Hi Alex, I apologize for the late reply. Let me try to give some feedback/comments below:
On Thu, Jan 7, 2016 at 3:59 PM, Alexander Filipchik <afilipc...@gmail.com> wrote:
>
> 1) What is the best way to handle partial outages? Let's say my yarn
> cluster is deployed on amazon among 3 availability zones. Is there a way to
> guarantee operability if I lose a whole availability zone (one third of
> capacity)? Will samza just restart failed containers on available nodes
> (which means some downtime) or is there a way to have passive task
> instances that can take over? What will happen if master dies?
>
When a node dies in YARN, there are the following situations:
a. The RM dies. Without RM HA, the whole cluster will be unavailable in this case, and we have to rely on ops to restart the whole YARN cluster (see the yarn-site.xml sketch below).
b. One NM dies. In this case, there are two sub-cases: b.1 the NM only runs a SamzaContainer; b.2 the NM runs the SamzaAppMaster. In b.1, the SamzaAppMaster will re-request a new container from the RM and start the SamzaContainer in it. In b.2, the whole Samza job will fail and the YARN RM will restart the job.
As of now, there are no "passive" task instances that are on standby.

> 2) What is the best way of deploying new code? I'm especially interested in
> how to deploy new tasks that maintain pretty big state without interrupting
> streaming?
>
Since the configuration of a Samza job is still immutable, right now the way to deploy new Samza code is still to re-push the binary and restart the job. That used to take a long time if your job has big state. With the host-affinity feature in Samza 0.10, the restarted Samza job will try its best to run the same containers on the previous hosts and re-use the local state stores (see the host-affinity sketch below). At LinkedIn, we have tested this feature with big stateful jobs and successfully cut down the re-bootstrap time.

> 3) What is the good naming and versioning strategy for things like kafka
> topics, RocksDB stores, etc
>
Samza does not restrict the naming mechanism an application chooses for Kafka topics and RocksDB stores. Whatever makes sense and can uniquely identify state stores and application input/output streams in the deployment environment would be good enough (one illustrative convention is sketched below).

> 4) What is the best way of configuring jobs? Hello samza example bundles
> configs with the tasks so all the urls are hidden inside a tar file. Is
> there a better way to pass properties based on region, environment (prod,
> test, dev), etc?
>
Depending on the deployment system, the way to pass the configuration can be very different. The property-file-based configuration is just one simple example we used in hello-samza; it may not make sense for a complex deployment environment. At LinkedIn, we package the binary and the configuration as two different packages, and LinkedIn's deployment system identifies them separately and deploys them to specific locations on the target host. Then the start script uses the configuration location determined by the deployment system as the path to the configuration when starting the Samza job (see the run-job.sh sketch below).
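A few rough sketches for the points above. First, for 1): whether losing a zone is survivable mostly depends on how YARN itself is deployed. Below is a minimal sketch, assuming a Hadoop 2.4+ cluster with a ZooKeeper ensemble spread across zones; the yarn-site.xml properties are the standard RM-HA ones, and the two Samza properties control how the AM re-requests containers for dead NMs. Please double-check the exact names against the config docs of the versions you run; the host names are made up.

  <!-- yarn-site.xml: ResourceManager HA, so losing the zone that hosts the
       active RM only causes a fail-over instead of a full cluster outage -->
  <property><name>yarn.resourcemanager.ha.enabled</name><value>true</value></property>
  <property><name>yarn.resourcemanager.cluster-id</name><value>my-yarn-cluster</value></property>
  <property><name>yarn.resourcemanager.ha.rm-ids</name><value>rm1,rm2</value></property>
  <property><name>yarn.resourcemanager.hostname.rm1</name><value>rm1.zone-a.example.com</value></property>
  <property><name>yarn.resourcemanager.hostname.rm2</name><value>rm2.zone-b.example.com</value></property>
  <property><name>yarn.resourcemanager.zk-address</name><value>zk1:2181,zk2:2181,zk3:2181</value></property>

  # Samza job properties: how many times the AM may restart a failed
  # container within the retry window before failing the whole job
  yarn.container.retry.count=8
  yarn.container.retry.window.ms=300000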
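For 2), host-affinity is just job configuration. A minimal sketch, assuming Samza 0.10 on YARN; the flag name below is the one in the 0.10 config table (it may be renamed in later releases), and the store/changelog names are made up:

  # ask the AM to place containers back on the hosts they ran on before the
  # restart, so the RocksDB files on local disk can be re-opened instead of
  # being re-bootstrapped from Kafka
  yarn.samza.host-affinity.enabled=true

  # host-affinity is a best-effort optimization; the changelog is still the
  # source of truth if the old host is gone
  stores.my-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
  stores.my-store.changelog=kafka.my-job-my-store-changelog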
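For 3), one convention that works reasonably well is to embed the owning job and a version suffix in stream and store names, so an incompatible change can be rolled out as a new set of topics/stores next to the old ones. Purely an illustrative sketch (all names are made up); Samza itself does not mandate any of this:

  job.name=page-view-counter
  # input stream name carries the producing domain and a schema version
  task.inputs=kafka.pageviews-v2
  # store and changelog names carry job name, store name and version
  stores.counts-v2.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
  stores.counts-v2.changelog=kafka.page-view-counter-counts-v2-changelog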
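For 4), with the open-source distribution the start script is just run-job.sh plus a config factory, so nothing stops you from keeping one properties file per region/environment outside the tar and pointing the script at whichever file the deployment system dropped on the host. A minimal sketch, assuming the PropertiesConfigFactory used in hello-samza (the file layout is made up):

  # the deployment system materializes the right file for this host, e.g.
  #   /export/config/prod-us-east-1/my-job.properties
  # and the start wrapper passes that path to Samza:
  deploy/samza/bin/run-job.sh \
    --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
    --config-path=file:///export/config/prod-us-east-1/my-job.properties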
> 5) I faced a weird problem with kafka partitioning. I created 2 kafka queues
> and 2 samza jobs that were communicating like:
> topic1 -> samza1 -> topic2 -> samza2
>
> samza2 had a state in RocksDB (let's say it was just storing strings it saw).
> Kafka topics had 20 partitions. I found that messages that were sent by
> samza1 and manually using org.apache.kafka.clients.producer.KafkaProducer
> were landing on different samza2 instances even though they had the same
> partition key (of type string).
>
> Example:
> samza1 sends a message with key "key1" to samza2 via topic2, and it is
> stored in task1 of samza2.
> I send messages manually to topic2 with key "key1" and it is stored in task
> 10 of samza2. Code that I was using to send messages from samza1:
>
> Config:
>
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.samza.key.serde=string
> systems.kafka.samza.msg.serde=string
>
> Code:
>
> private static final SystemStream OUTPUT_STREAM =
>     new SystemStream("kafka", "topic2");
>
> messageCollector.send(new OutgoingMessageEnvelope(
>     OUTPUT_STREAM, "key1", message));
>
> Manually:
>
> configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, s"$broker:9092")
> configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>     classOf[StringSerializer].getName)
> configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>     classOf[StringSerializer].getName)
>
> val kafkaProducer = new KafkaProducer[String, String](configs)
> val record = new ProducerRecord[String, String]("topic2", "key1", message)
> kafkaProducer.send(record).get()
>
> What can be wrong?
>
This might be related to SAMZA-839 (see also the work-around sketch at the end of this mail).

> and one crazy question:
> has anyone thought about combining samza and spark? Like allowing spark to
> use Samza's RocksDB/LevelDB storage as a state holder for micro batching?
>
I think a better question would be: can we implement micro-batching (i.e. windowing) in Samza and provide RDDs to allow Spark Streaming programs to run on top of Samza? That's an interesting thought, which would allow a unified programming model in both the online and offline worlds. However, micro-batching as in the Spark Streaming APIs also introduces issues such as session windows being split across batch boundaries, out-of-order arrivals across batch boundaries, etc. We can certainly ponder it more.

> Thank you,
> Alex
>
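P.S. The work-around sketch mentioned under 5): my reading of the 0.10 KafkaSystemProducer (worth verifying against the source of the version you run) is that Samza computes the partition itself from the partition key's hashCode(), while the plain Java producer's default partitioner murmur2-hashes the serialized key bytes, so the same string key can land on different partitions. Until that is reconciled, you can pin the partition explicitly on the manual-producer side. A rough Java sketch; the samzaStylePartition helper is hypothetical and only mirrors my understanding of the Samza behaviour:

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerConfig;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.common.serialization.StringSerializer;

  public class ManualSend {
    // Hypothetical helper: abs(hashCode()) % numPartitions, i.e. what I
    // believe Samza's Kafka producer does with a string partition key.
    static int samzaStylePartition(String partitionKey, int numPartitions) {
      int h = partitionKey.hashCode();
      return (h == Integer.MIN_VALUE ? 0 : Math.abs(h)) % numPartitions;
    }

    public static void main(String[] args) {
      Properties configs = new Properties();
      configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
      configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

      try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs)) {
        int numPartitions = producer.partitionsFor("topic2").size();
        // pin the partition instead of relying on the default partitioner,
        // so the manual message lands where samza1's output for "key1" lands
        int partition = samzaStylePartition("key1", numPartitions);
        producer.send(new ProducerRecord<>("topic2", partition, "key1", "some message"));
      }
    }
  }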