Hi Samza community!

I'm trying to move from a POC to a full-speed production deployment and am
trying to find answers to the following questions:

1) What is the best way to handle partial outages? Let's say my YARN
cluster is deployed on Amazon across 3 availability zones. Is there a way to
guarantee operability if I lose a whole availability zone (one third of
capacity)? Will Samza just restart the failed containers on the available
nodes (which means some downtime), or is there a way to have passive task
instances that can take over? What will happen if the master dies?
2) What is the best way of deploying new code? I'm especially interested in
how to deploy new versions of tasks that maintain fairly large state without
interrupting streaming.
3) What is a good naming and versioning strategy for things like Kafka
topics, RocksDB stores, etc.?
4) What is the best way of configuring jobs? The hello-samza example bundles
configs with the tasks, so all the URLs are hidden inside a tar file. Is
there a better way to pass properties based on region, environment (prod,
test, dev), etc.? See the invocation below for what I do now.
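
Right now the only approach I see is the hello-samza one: point run-job.sh
at a properties file that was packaged into the tar (the paths and the
myjob.properties name below are just an example from my setup):

deploy/samza/bin/run-job.sh \
  --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
  --config-path=file://$PWD/deploy/samza/config/myjob.properties

I could keep one such file per region/environment, but that feels clumsy,
hence the question.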
5) I ran into a weird problem with Kafka partitioning. I created 2 Kafka
topics and 2 Samza jobs that were communicating like this:
topic1 -> samza1 -> topic2 -> samza2

samza2 kept state in RocksDB (let's say it was just storing the strings it
saw). The Kafka topics had 20 partitions. I found that messages sent by
samza1 and messages sent manually via org.apache.kafka.clients.producer.KafkaProducer
were landing on different samza2 instances even though they had the same
partition key (of type string).

Example:
samza1 sends a message with key "key1" to samza2 via topic2, and it is
stored in task 1 of samza2.
I send a message manually to topic2 with key "key1", and it is stored in
task 10 of samza2. The code I was using to send messages from samza1:

Config:

systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.key.serde=string
systems.kafka.samza.msg.serde=string

Code:

import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;

private static final SystemStream OUTPUT_STREAM =
        new SystemStream("kafka", "topic2");

// inside process(): key "key1", value serialized by the string serde above
messageCollector.send(new OutgoingMessageEnvelope(
        OUTPUT_STREAM, "key1", message));


And the code I was using to send manually (Scala):

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

val configs = new Properties()
// broker is the Kafka broker host, defined elsewhere
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, s"$broker:9092")
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  classOf[StringSerializer].getName)
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  classOf[StringSerializer].getName)

val kafkaProducer = new KafkaProducer[String, String](configs)
val record = new ProducerRecord[String, String]("topic2", "key1", message)
kafkaProducer.send(record).get()
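
For what it's worth, my understanding is that the new producer's default
partitioner picks the partition by murmur2-hashing the serialized key bytes.
A quick sketch I used to check where "key1" should land under that scheme
(assuming 20 partitions and the default partitioner; Utils.murmur2 is from
kafka-clients):

import org.apache.kafka.common.utils.Utils

// Default partitioner of the new producer: murmur2 over the key bytes,
// masked to a non-negative value, modulo the partition count.
val keyBytes = "key1".getBytes("UTF-8")
val partition = (Utils.murmur2(keyBytes) & 0x7fffffff) % 20
println(s"key1 -> partition $partition")

If Samza's producer hashes the key differently (e.g. on the string object
rather than its serialized bytes), that would explain the mismatch, but I
may be missing something.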

What could be going wrong?

And one crazy question:
has anyone thought about combining Samza and Spark? For example, allowing
Spark to use Samza's RocksDB/LevelDB storage as a state holder for
micro-batching?

Thank you,
Alex
