This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch wip-seglo-kafka-sharding in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 34595d0d50bc3c0edf9dad25e9bd73153627c79e Author: Sean Glover <[email protected]> AuthorDate: Thu Feb 13 17:41:23 2020 -0500 WIP --- akka-sample-kafka-to-sharding-scala/README.md | 25 ++++++-- akka-sample-kafka-to-sharding-scala/build.sbt | 13 ++++- .../kafka/src/main/resources/logback.xml | 23 ++++++++ .../sharding/embeddedkafka/KafkaBroker.scala | 22 +++++++ .../processor/src/main/resources/application.conf | 3 +- .../akka/kafka/KafkaShardingMessageExtractor.scala | 67 ++++++++++++++++++++++ .../sample/sharding/kafka/ProcessorConfig.scala | 8 +-- .../sample/sharding/kafka/TopicListener.scala | 18 +++--- .../scala/sample/sharding/kafka/UserEvents.scala | 33 ++++++----- .../sharding/kafka/UserEventsKafkaProcessor.scala | 6 +- .../producer/src/main/resources/application.conf | 2 +- .../sharding/kafka/producer/ProducerConfig.scala | 19 +----- .../kafka/producer/UserEventProducer.scala | 19 +----- 13 files changed, 188 insertions(+), 70 deletions(-) diff --git a/akka-sample-kafka-to-sharding-scala/README.md b/akka-sample-kafka-to-sharding-scala/README.md index af1433c..7194fe2 100644 --- a/akka-sample-kafka-to-sharding-scala/README.md +++ b/akka-sample-kafka-to-sharding-scala/README.md @@ -30,17 +30,32 @@ The sample is made up of three applications: * `processor` An Akka Cluster Sharding application that reads the Kafka topic and forwards the messages to a sharded entity that represents a user and a gRPC front end for accessing the sharded actors state * `client` A gRPC client for interacting with the cluster +* `kafka` A local Kafka server The sample demonstrates how the external shard allocation strategy can be used so messages are processed locally. The sample depends on a Kafka broker running locally on port `9092` with a topic with 128 partitions called `user-events.` -[Kafka can be run in Docker](https://github.com/wurstmeister/kafka-docker) or run locally following [these instructions](https://kafka.apache.org/quickstart). 
+[Kafka can be run in Docker](https://github.com/wurstmeister/kafka-docker) or run locally using the optional `kafka` project. -Update the `applications.conf`s in each project to point to your Kafka broker if not running on `localhost:9092` +* Run the local Kafka server. This project will also create the `user-events` topic. + +``` +sbt "kafka / run" +``` + +In the Kafka server window you'll see the following when the server is ready: + +``` +12:46:47.022 INFO [run-main-0 ] sample.sharding.embeddedkafka.Main$ Kafka running on port '9092' +12:46:47.022 INFO [run-main-0 ] sample.sharding.embeddedkafka.Main$ Topic 'user-events' with '128' partitions created +``` + +If you want to use a different Kafka cluster then update the `applications.conf`s in each project to point to your +Kafka broker if not running on `localhost:9092`. -* Create a topic with 128 partitions, or update application.conf with the desired number of - partitions e.g. a command from your Kafka installation: +* _(Optional)_ If you do not run the local Kafka server then you must create a topic with 128 partitions, or update + application.conf with the desired number of partitions e.g. 
a command from your Kafka installation: ``` bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 128 --topic user-events @@ -104,7 +119,7 @@ As there is only one node we get 100% locality, each forwarded message is proces Now let's see that remain true once we add more nodes to the Akka Cluster, add another with different ports: ``` - sbt "processor / run 2552 8552 8082" +sbt "processor / run 2552 8552 8082" ``` When this starts up we'll see Kafka assign partitions to the new node (it is in the same consumer group): diff --git a/akka-sample-kafka-to-sharding-scala/build.sbt b/akka-sample-kafka-to-sharding-scala/build.sbt index 10f8cbb..5a08bb7 100644 --- a/akka-sample-kafka-to-sharding-scala/build.sbt +++ b/akka-sample-kafka-to-sharding-scala/build.sbt @@ -1,8 +1,8 @@ val AkkaVersion = "2.6.3" -// TODO upgrade to 2.0.0 -val AlpakkaKafkaVersion = "1.1.0" +val AlpakkaKafkaVersion = "2.0.1" val AkkaManagementVersion = "1.0.5" val AkkaHttpVersion = "10.1.11" +val KafkaVersion = "2.4.0" val LogbackVersion = "1.2.3" ThisBuild / scalaVersion := "2.13.1" @@ -22,6 +22,15 @@ Global / cancelable := true // ctrl-c lazy val `akka-sample-kafka-to-sharding` = project.in(file(".")).aggregate(producer, processor, client) +lazy val kafka = project + .in(file("kafka")) + .settings( + libraryDependencies ++= Seq( + "ch.qos.logback" % "logback-classic" % LogbackVersion, + "org.slf4j" % "log4j-over-slf4j" % "1.7.26", + "io.github.embeddedkafka" %% "embedded-kafka" % KafkaVersion), + cancelable := false) + lazy val client = project .in(file("client")) .enablePlugins(AkkaGrpcPlugin, JavaAgent) diff --git a/akka-sample-kafka-to-sharding-scala/kafka/src/main/resources/logback.xml b/akka-sample-kafka-to-sharding-scala/kafka/src/main/resources/logback.xml new file mode 100644 index 0000000..c747bc5 --- /dev/null +++ b/akka-sample-kafka-to-sharding-scala/kafka/src/main/resources/logback.xml @@ -0,0 +1,23 @@ +<configuration> + <appender name="FILE" 
class="ch.qos.logback.core.FileAppender"> + <file>kafka/target/kafka.log</file> + <append>false</append> + <encoder> + <pattern>%d{ISO8601} %-5level [%-20.20thread] [%-36.36logger{36}] %msg%n%rEx</pattern> + </encoder> + </appender> + + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} %-5level [%-20.20thread] %-36.36logger{36} %msg%n%rEx</pattern> + </encoder> + </appender> + + <logger name="sample.sharding.embeddedkafka" level="INFO"> + <appender-ref ref="STDOUT"/> + </logger> + + <root level="INFO"> + <appender-ref ref="FILE" /> + </root> +</configuration> diff --git a/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala b/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala new file mode 100644 index 0000000..90663a3 --- /dev/null +++ b/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala @@ -0,0 +1,22 @@ +package sample.sharding.embeddedkafka + +import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} +import org.slf4j.LoggerFactory + +object KafkaBroker extends App with EmbeddedKafka { + val log = LoggerFactory.getLogger(this.getClass) + + val port = 9092 + val topic = "user-events" + val partitions = 128 + + val config = EmbeddedKafkaConfig(kafkaPort = port) + val server = EmbeddedKafka.start()(config) + + createCustomTopic(topic = topic, partitions = partitions) + + log.info(s"Kafka running on port '$port'") + log.info(s"Topic '$topic' with '$partitions' partitions created") + + server.broker.awaitShutdown() +} diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf index 76f1266..ef7e721 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf +++ 
b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf @@ -1,8 +1,7 @@ kafka-to-sharding-processor { bootstrap-servers = "localhost:9092" - topic = "user-events" + topics = ["user-events"] group = "group-1" - nr-partitions = 128 } akka.http { diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala new file mode 100644 index 0000000..f02eb41 --- /dev/null +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala @@ -0,0 +1,67 @@ +package akka.kafka + +import akka.actor.{ActorSystem, ExtendedActorSystem} +import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor} +import akka.kafka.scaladsl.MetadataClient +import akka.util.Timeout._ +import org.apache.kafka.clients.producer.Partitioner +import org.apache.kafka.clients.producer.internals.DefaultPartitioner +import org.apache.kafka.common.{Node, PartitionInfo, Cluster => KafkaCluster} + +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContextExecutor} +import scala.jdk.CollectionConverters._ + +private[kafka] trait DefaultKafkaShardingMessageExtractor { + implicit val actorSystem: ActorSystem + implicit val timeout: FiniteDuration + implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher + + val clientSettings: ConsumerSettings[_, _] + val groupId: String + val topic: String + + private val CLUSTER_ID = "cluster-id" + private val kafkaPartitioner = partitioner() + private val kafkaCluster = cluster(partitions()) + + def shardId(entityId: String): String = { + val partition = kafkaPartitioner + .partition(topic, entityId, entityId.getBytes(), null, null, kafkaCluster) + s"$groupId-$partition" + } + + def partitions(): List[PartitionInfo] = { + val consumerActor = 
actorSystem.actorOf(KafkaConsumerActor.props(clientSettings), "metadata-consumer-actor") + val metadataClient = MetadataClient.create(consumerActor, timeout) + val partitions = metadataClient.getPartitionsFor(topic) + partitions.foreach(p => actorSystem.log.info("Retrieved %s partitions for topic %s for group %s", p.length, topic, groupId)) + Await.result(partitions, timeout) + } + + def cluster(partitions: List[PartitionInfo]): KafkaCluster = + new KafkaCluster(CLUSTER_ID, List.empty[Node].asJavaCollection, partitions.asJavaCollection, Set.empty[String].asJava, Set.empty[String].asJava) + + def partitioner(): Partitioner = new DefaultPartitioner() +} + +final class KafkaShardingMessageExtractor[M](val clientSettings: ConsumerSettings[_,_], val groupId: String, val topic: String) + (implicit val actorSystem: ActorSystem, val timeout: FiniteDuration) + extends ShardingMessageExtractor[ShardingEnvelope[M], M] with DefaultKafkaShardingMessageExtractor { + override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId + override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message +} + +/** + * Caveats + * - If Consumer subscription contains multiple topics, each topic has the exact same number of partitions. + * - Values are passed as `null` to the partitioner. + * - A fake [[org.apache.kafka.common.Cluster]] is passed to the [[org.apache.kafka.clients.producer.Partitioner]] that + * only contains partitions for the provided topic. If you choose to reuse a different partitioner then make sure your + * partitioner doesn't make use of any other Kafka Cluster metadata. 
+ */ +abstract class KafkaShardingNoEnvelopeExtractor[M](val clientSettings: ConsumerSettings[_,_], val groupId: String, val topic: String) + (implicit val actorSystem: ActorSystem, val timeout: FiniteDuration) + extends ShardingMessageExtractor[M, M] with DefaultKafkaShardingMessageExtractor { + override def unwrapMessage(message: M): M = message +} diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorConfig.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorConfig.scala index fd07ac4..1749aee 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorConfig.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorConfig.scala @@ -1,14 +1,14 @@ package sample.sharding.kafka import com.typesafe.config.Config +import scala.jdk.CollectionConverters._ case object ProcessorConfig { def apply(config: Config): ProcessorConfig = new ProcessorConfig( config.getString("bootstrap-servers"), - config.getString("topic"), - config.getString("group"), - config.getInt("nr-partitions")) + config.getStringList("topics").asScala.toList, + config.getString("group")) } -final class ProcessorConfig(val bootstrapServers: String, val topic: String, val groupId: String, val nrPartitions: Int) +final class ProcessorConfig(val bootstrapServers: String, val topics: List[String], val groupId: String) diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala index ac62b4c..79be5e3 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala @@ -14,7 +14,7 @@ import 
scala.util.Failure import scala.util.Success object TopicListener { - def apply(typeKey: EntityTypeKey[_]): Behavior[ConsumerRebalanceEvent] = + def apply(groupId: String, typeKey: EntityTypeKey[_]): Behavior[ConsumerRebalanceEvent] = Behaviors.setup { ctx => import ctx.executionContext val shardAllocationClient = ExternalShardAllocation(ctx.system).clientFor(typeKey.name) @@ -28,21 +28,23 @@ object TopicListener { } val address = Cluster(ctx.system).selfMember.address Behaviors.receiveMessage[ConsumerRebalanceEvent] { - case TopicPartitionsAssigned(_, partitions) => - partitions.foreach(partition => { - ctx.log.info("Partition [{}] assigned to current node. Updating shard allocation", partition.partition()) + case TopicPartitionsAssigned(sub, partitions) => + partitions.foreach(tp => { + val shardId = s"$groupId-${tp.partition()}" + ctx.log.info("Partition [{}] assigned to current node. Updating shard allocation", shardId) // kafka partition becomes the akka shard - val done = shardAllocationClient.updateShardLocation(partition.partition().toString, address) + val done = shardAllocationClient.updateShardLocation(shardId, address) done.onComplete { result => - ctx.log.info("Result for updating shard {}: {}", partition, result) + ctx.log.info("Result for updating shard {}: {}", shardId, result) } }) Behaviors.same case TopicPartitionsRevoked(_, topicPartitions) => ctx.log.info( - "Partitions [{}] revoked from current node. New location will update shard allocation", - topicPartitions.mkString(",")) + "Partitions [{}] of group [{}] revoked from current node. 
New location will update shard allocation", + topicPartitions.mkString(","), + groupId) Behaviors.same } } diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala index f093fff..f0f4f90 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala @@ -1,16 +1,16 @@ package sample.sharding.kafka import akka.Done -import akka.actor.typed.ActorRef -import akka.actor.typed.ActorSystem -import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.adapter._ +import akka.actor.typed.{ActorRef, ActorSystem, Behavior} import akka.cluster.sharding.external.ExternalShardAllocationStrategy import akka.cluster.sharding.typed.ClusterShardingSettings -import akka.cluster.sharding.typed.Murmur2NoEnvelopeMessageExtractor -import akka.cluster.sharding.typed.scaladsl.ClusterSharding -import akka.cluster.sharding.typed.scaladsl.Entity -import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey} +import akka.kafka.{ConsumerSettings, KafkaShardingNoEnvelopeExtractor} +import org.apache.kafka.common.serialization.StringDeserializer + +import scala.concurrent.duration._ object UserEvents { @@ -54,20 +54,27 @@ object UserEvents { } /* - * The murmur2 message extractor matches kafka's default partitioning when messages - * have keys that are strings + * The KafkaShardingMessageExtractor uses the KafkaProducer's underlying DefaultPartitioner so that the same murmur2 + * hashing algorithm is used for Kafka and Akka Cluster Sharding */ - class UserIdMessageExtractor(nrKafkaPartitions: Int) - extends 
Murmur2NoEnvelopeMessageExtractor[Message](nrKafkaPartitions) { - override def entityId(message: Message): String = message.userId + class UserIdMessageExtractor(clientSettings: ConsumerSettings[_,_], topic: String, groupId: String) + (implicit actorSystem: akka.actor.ActorSystem, timeout: FiniteDuration) + extends KafkaShardingNoEnvelopeExtractor[Message](clientSettings, topic, groupId) { + def entityId(message: Message): String = message.userId } def init(system: ActorSystem[_]): ActorRef[Message] = { val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor")) + implicit val classic: akka.actor.ActorSystem = system.toClassic + implicit val timeout: FiniteDuration = 10.seconds + val clientSettings = ConsumerSettings(classic, new StringDeserializer, new StringDeserializer) + val topic = processorConfig.topics.head + val groupId = processorConfig.groupId + val messageExtractor = new UserIdMessageExtractor(clientSettings, topic, groupId) ClusterSharding(system).init( Entity(TypeKey)(createBehavior = _ => UserEvents()) .withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name)) - .withMessageExtractor(new UserIdMessageExtractor(processorConfig.nrPartitions)) + .withMessageExtractor(messageExtractor) .withSettings(ClusterShardingSettings(system))) } diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala index 8c69c83..511c3bc 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala @@ -39,7 +39,7 @@ object UserEventsKafkaProcessor { implicit val scheduler: Scheduler = classic.scheduler // TODO config val timeout = Timeout(3.seconds) 
- val rebalancerRef = ctx.spawn(TopicListener(UserEvents.TypeKey), "rebalancerRef") + val rebalancerRef = ctx.spawn(TopicListener(processorSettings.groupId, UserEvents.TypeKey), "rebalancerRef") val shardRegion = UserEvents.init(ctx.system) val consumerSettings = ConsumerSettings(ctx.system.toClassic, new StringDeserializer, new ByteArrayDeserializer) @@ -48,7 +48,9 @@ object UserEventsKafkaProcessor { .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") .withStopTimeout(0.seconds) - val subscription = Subscriptions.topics(processorSettings.topic).withRebalanceListener(rebalancerRef.toClassic) + val subscription = Subscriptions + .topics(processorSettings.topics: _*) + .withRebalanceListener(rebalancerRef.toClassic) val kafkaConsumer: Source[ConsumerRecord[String, Array[Byte]], Consumer.Control] = Consumer.plainSource(consumerSettings, subscription) diff --git a/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf b/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf index 2025bb6..38d51c0 100644 --- a/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf +++ b/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf @@ -1,5 +1,5 @@ kafka-to-sharding-producer { - bootstrap-servers = "localhost:9092" + bootstrap-servers = "localhost:9094" topic = "user-events" # can be one of: diff --git a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/ProducerConfig.scala b/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/ProducerConfig.scala index f11b2b7..9d4c374 100644 --- a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/ProducerConfig.scala +++ b/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/ProducerConfig.scala @@ -2,28 +2,13 @@ package sharding.kafka.producer import com.typesafe.config.Config -sealed trait 
Partitioning -case object Default extends Partitioning -case object Explicit extends Partitioning - -object Partitioning { - def valueOf(input: String): Partitioning = input.toLowerCase match { - case "explicit" => Explicit - case _ => Default - } -} - case object ProducerConfig { def apply(config: Config): ProducerConfig = new ProducerConfig( config.getString("bootstrap-servers"), - config.getString("topic"), - Partitioning.valueOf(config.getString("partitioning")), - config.getInt("nr-partitions")) + config.getString("topic")) } final class ProducerConfig( val bootstrapServers: String, - val topic: String, - val partitioning: Partitioning, - val nrPartitions: Int) + val topic: String) diff --git a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala b/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala index c7d2869..846dee7 100644 --- a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala +++ b/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala @@ -52,23 +52,10 @@ object UserEventProducer extends App { val product = products(Random.nextInt(products.size)) val message = UserPurchaseProto(randomEntityId, product, quantity, price).toByteArray log.info("Sending message to user {}", randomEntityId) - producerRecord(randomEntityId, message) - - }) - .runWith(Producer.plainSink(producerSettings)) - - def producerRecord(entityId: String, message: Array[Byte]): ProducerRecord[String, Array[Byte]] = { - producerConfig.partitioning match { - case Default => // rely on the default kafka partitioner to hash the key and distribute among shards // the logic of the default partitioner must be replicated in MessageExtractor entityId -> shardId function - new ProducerRecord[String, Array[Byte]](producerConfig.topic, entityId, message) - case Explicit => - // 
this logic MUST be replicated in the MessageExtractor entityId -> shardId function! - val shardAndPartition = (Utils.toPositive(Utils.murmur2(entityId.getBytes(StandardCharsets.UTF_8))) % producerConfig.nrPartitions) - log.info(s"entityId->partition ${entityId}->${shardAndPartition}") - new ProducerRecord[String, Array[Byte]](producerConfig.topic, shardAndPartition, entityId, message) - } - } + new ProducerRecord[String, Array[Byte]](producerConfig.topic, randomEntityId, message) + }) + .runWith(Producer.plainSink(producerSettings)) } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
