This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch wip-seglo-kafka-sharding in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit fb0f3f28a5a8b5eb71658342d03dc0f77c5d78c9 Author: Sean Glover <[email protected]> AuthorDate: Fri Feb 14 15:58:05 2020 -0500 WIP- async message extractor --- akka-sample-kafka-to-sharding-scala/README.md | 8 +-- .../sharding/embeddedkafka/KafkaBroker.scala | 8 +-- .../scala/akka/kafka/KafkaClusterSharding.scala | 78 ++++++++++++++++++++ .../akka/kafka/KafkaShardingMessageExtractor.scala | 75 ------------------- .../main/scala/sample/sharding/kafka/Main.scala | 84 ++++++++++++++-------- .../sample/sharding/kafka/TopicListener.scala | 3 + .../scala/sample/sharding/kafka/UserEvents.scala | 48 ++++++------- .../sharding/kafka/UserEventsKafkaProcessor.scala | 5 +- .../sample/sharding/kafka/UserGrpcService.scala | 5 +- .../producer/src/main/resources/application.conf | 11 +-- .../kafka/producer/UserEventProducer.scala | 7 +- 11 files changed, 174 insertions(+), 158 deletions(-) diff --git a/akka-sample-kafka-to-sharding-scala/README.md b/akka-sample-kafka-to-sharding-scala/README.md index 7194fe2..4913792 100644 --- a/akka-sample-kafka-to-sharding-scala/README.md +++ b/akka-sample-kafka-to-sharding-scala/README.md @@ -154,16 +154,16 @@ Using Akka management we can see the shard allocations and the number of entitie ``` // Node 1: - curl -v localhost:8551/cluster/shards/user-processing | jq + curl -v localhost:8551/cluster/shards/user-processing | jq // Node 2: - curl -v localhost:8552/cluster/shards/user-processing | jq + curl -v localhost:8552/cluster/shards/user-processing | jq ``` We can count the number of shards on each: ``` -curl -v localhost:8551/cluster/shards/user-processing | jq -r "." | grep shardId | wc +curl -s localhost:8551/cluster/shards/user-processing | jq -r "." | grep shardId | wc -l ``` The number of shards will depend on which entities have received messages. @@ -181,7 +181,7 @@ the correct node even if that moves due to a kafka rebalance. A gRPC client is included which can be started with... ``` - sbt "client/run" +sbt "client/run" ``` It assumes there is one of the nodes running its front end port on 8081. diff --git a/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala b/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala index 90663a3..7e5c81c 100644 --- a/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala +++ b/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala @@ -10,13 +10,13 @@ object KafkaBroker extends App with EmbeddedKafka { val topic = "user-events" val partitions = 128 - val config = EmbeddedKafkaConfig(kafkaPort = port) - val server = EmbeddedKafka.start()(config) + implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort = port) + val server = EmbeddedKafka.start() createCustomTopic(topic = topic, partitions = partitions) - log.info(s"Kafka running on port '$port'") - log.info(s"Topic '$topic' with '$partitions' partitions created") + log.info(s"Kafka running: localhost:$port") + log.info(s"Topic '$topic' with $partitions partitions created") server.broker.awaitShutdown() } diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala new file mode 100644 index 0000000..5562c42 --- /dev/null +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala @@ -0,0 +1,78 @@ +package akka.kafka + +import java.util.concurrent.atomic.AtomicInteger + +import akka.actor.{ActorSystem, ExtendedActorSystem} +import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor} +import akka.kafka.scaladsl.MetadataClient +import akka.util.Timeout._ +import org.apache.kafka.common.utils.Utils + +import scala.concurrent.{ExecutionContextExecutor, Future} +import scala.concurrent.duration._ + +object KafkaClusterSharding { + private val metadataActorCounter = new AtomicInteger + + def messageExtractor[M](system: ActorSystem, + groupId: String, + topic: String, + timeout: FiniteDuration, + settings: ConsumerSettings[_,_]): Future[KafkaShardingMessageExtractor[M]] = + getPartitionCount(system, topic, timeout, settings) + .map(kafkaPartitions => new KafkaShardingMessageExtractor[M](groupId, kafkaPartitions))(system.dispatcher) + + def messageExtractorNoEnvelope[M](system: ActorSystem, + groupId: String, + topic: String, + timeout: FiniteDuration, + entityIdExtractor: M => String, + settings: ConsumerSettings[_,_]): Future[KafkaShardingNoEnvelopeExtractor[M]] = + getPartitionCount(system, topic, timeout, settings) + .map(kafkaPartitions => new KafkaShardingNoEnvelopeExtractor[M](groupId, kafkaPartitions, entityIdExtractor))(system.dispatcher) + + private def getPartitionCount[M](system: ActorSystem, topic: String, timeout: FiniteDuration, settings: ConsumerSettings[_, _]): Future[Int] = { + implicit val ec: ExecutionContextExecutor = system.dispatcher + val actorNum = metadataActorCounter.getAndIncrement() + val consumerActor = system + .asInstanceOf[ExtendedActorSystem] + .systemActorOf(KafkaConsumerActor.props(settings), s"metadata-consumer-actor-$actorNum") + val metadataClient = MetadataClient.create(consumerActor, timeout) + val numPartitions = metadataClient.getPartitionsFor(topic).map(_.length) + numPartitions.map { count => + system.log.info("Retrieved {} partitions for topic '{}'", count, topic) + count + } + } +} + +trait KafkaClusterSharding { + def groupId: String + def kafkaPartitions: Int + + def shardId(entityId: String): String = { + // simplified version of Kafka's `DefaultPartitioner` implementation + val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions + s"$groupId-$partition" + } +} + +class KafkaShardingMessageExtractor[M](val groupId: String, val kafkaPartitions: Int) + extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterSharding { + override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId + override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message +} + +/** + * Caveats + * - If Consumer subscription contains multiple topics, each topic has the exact same number of partitions. + * - Values are passed as `null` to the partitioner. + * - A fake [[org.apache.kafka.common.Cluster]] is passed to the [[org.apache.kafka.clients.producer.Partitioner]] that + * only contains partitions for the provided topic. If you choose to reuse a different partitioner then make sure your + * partitioner doesn't make use of any other Kafka Cluster metadata. + */ +class KafkaShardingNoEnvelopeExtractor[M](val groupId: String, val kafkaPartitions: Int, entityIdExtractor: M => String) + extends ShardingMessageExtractor[M, M] with KafkaClusterSharding { + override def entityId(message: M): String = entityIdExtractor(message) + override def unwrapMessage(message: M): M = message +} diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala deleted file mode 100644 index 14353ab..0000000 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala +++ /dev/null @@ -1,75 +0,0 @@ -package akka.kafka - -import java.util.concurrent.atomic.AtomicInteger - -import akka.actor.{ActorSystem, ExtendedActorSystem} -import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor} -import akka.kafka.DefaultKafkaShardingMessageExtractor.PartitionCountStrategy -import akka.kafka.scaladsl.MetadataClient -import akka.util.Timeout._ -import org.apache.kafka.common.utils.Utils - -import scala.concurrent.duration._ -import scala.concurrent.{Await, ExecutionContext} - -object DefaultKafkaShardingMessageExtractor { - sealed trait PartitionCountStrategy { - def groupId: String - def partitions: Int - } - final case class Provided(groupId: String, partitions: Int) extends PartitionCountStrategy - final case class RetrieveFromKafka( - system: ActorSystem, - timeout: FiniteDuration, - groupId: String, - topic: String, - settings: ConsumerSettings[_,_]) - extends PartitionCountStrategy { - import RetrieveFromKafka._ - private implicit val ec: ExecutionContext = system.dispatcher - lazy val partitions: Int = { - val actorNum = metadataActorCounter.getAndIncrement() - val consumerActor = system - .asInstanceOf[ExtendedActorSystem] - .systemActorOf(KafkaConsumerActor.props(settings), s"metadata-consumer-actor-$actorNum") - val metadataClient = MetadataClient.create(consumerActor, timeout) - val numPartitions = metadataClient.getPartitionsFor(topic).map(_.length) - numPartitions.foreach(num => system.log.info("Retrieved {} partitions for topic '{}' for group '{}'", num, topic, groupId)) - Await.result(numPartitions, timeout) - } - } - object RetrieveFromKafka { - private val metadataActorCounter = new AtomicInteger - } -} - -private[kafka] trait DefaultKafkaShardingMessageExtractor { - val strategy: PartitionCountStrategy - private val groupId: String = strategy.groupId - private val kafkaPartitions: Int = strategy.partitions - - def shardId(entityId: String): String = { - // simplified version of Kafka's `DefaultPartitioner` implementation - val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions - s"$groupId-$partition" - } -} - -final class KafkaShardingMessageExtractor[M](val strategy: PartitionCountStrategy) - extends ShardingMessageExtractor[ShardingEnvelope[M], M] with DefaultKafkaShardingMessageExtractor { - override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId - override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message -} - -/** - * Caveats - * - If Consumer subscription contains multiple topics, each topic has the exact same number of partitions. - * - Values are passed as `null` to the partitioner. - * - A fake [[org.apache.kafka.common.Cluster]] is passed to the [[org.apache.kafka.clients.producer.Partitioner]] that - * only contains partitions for the provided topic. If you choose to reuse a different partitioner then make sure your - * partitioner doesn't make use of any other Kafka Cluster metadata. - */ -abstract class KafkaShardingNoEnvelopeExtractor[M](val strategy: PartitionCountStrategy) - extends ShardingMessageExtractor[M, M] with DefaultKafkaShardingMessageExtractor { - override def unwrapMessage(message: M): M = message -} diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala index 96fce26..a695853 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala @@ -1,21 +1,24 @@ package sample.sharding.kafka -import akka.actor.typed.ActorSystem -import akka.actor.typed.Terminated import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ +import akka.actor.typed.{ActorRef, ActorSystem, Behavior, Terminated} import akka.cluster.ClusterEvent.MemberUp -import akka.cluster.typed.Cluster -import akka.cluster.typed.Subscribe +import akka.cluster.sharding.typed.ShardingMessageExtractor +import akka.cluster.typed.{Cluster, Subscribe} import akka.http.scaladsl._ -import akka.http.scaladsl.model.HttpRequest -import akka.http.scaladsl.model.HttpResponse +import akka.http.scaladsl.model.{HttpRequest, HttpResponse} import akka.management.scaladsl.AkkaManagement import akka.stream.Materializer -import com.typesafe.config.Config -import com.typesafe.config.ConfigFactory +import com.typesafe.config.{Config, ConfigFactory} import scala.concurrent.Future +import scala.util.{Failure, Success} + +sealed trait Command +case object NodeMemberUp extends Command +case object StartProcessor extends Command +final case class MessageExtractor(strategy: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]) extends Command object Main { def main(args: Array[String]): Unit = { @@ -25,45 +28,69 @@ object Main { args.toList match { case portString :: managementPort :: frontEndPort :: Nil if isInt(portString) && isInt(managementPort) && isInt(frontEndPort) => - startNode(portString.toInt, managementPort.toInt, frontEndPort.toInt) + init(portString.toInt, managementPort.toInt, frontEndPort.toInt) case _ => throw new IllegalArgumentException("usage: <remotingPort> <managementPort> <frontEndPort>") } } - def startNode(remotingPort: Int, akkaManagementPort: Int, frontEndPort: Int): Unit = { - ActorSystem(Behaviors.setup[MemberUp] { + def init(remotingPort: Int, akkaManagementPort: Int, frontEndPort: Int): Unit = { + ActorSystem(Behaviors.setup[Command] { ctx => - implicit val mat = Materializer.createMaterializer(ctx.system.toClassic) - implicit val ec = ctx.executionContext AkkaManagement(ctx.system.toClassic).start() - // maybe don't start until part of the cluster, or add health check - val binding = startGrpc(ctx.system, mat, frontEndPort) + val cluster = Cluster(ctx.system) - cluster.subscriptions.tell(Subscribe(ctx.self, classOf[MemberUp])) + val subscriber = ctx.spawn(clusterUpSubscriber(cluster, ctx.self), "cluster-subscriber") + cluster.subscriptions.tell(Subscribe(subscriber, classOf[MemberUp])) + + ctx.pipeToSelf(UserEvents.messageExtractor(ctx.system)) { + case Success(extractor) => MessageExtractor(extractor) + case Failure(ex) => throw new Exception(ex) + } + + starting() + }, "KafkaToSharding", config(remotingPort, akkaManagementPort)) + + def starting(extractor: Option[ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]] = None): Behavior[Command] = Behaviors + .receive[Command] { + case (ctx, MessageExtractor(extractor)) => + ctx.self.tell(StartProcessor) + starting(Some(extractor)) + case (ctx, StartProcessor) if extractor.isDefined => + UserEvents.init(ctx.system, extractor.get) + val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(extractor.get), "kafka-event-processor") + ctx.watch(eventProcessor) + ctx.log.info("Processor started.") + val binding: Future[Http.ServerBinding] = startGrpc(ctx.system, frontEndPort, extractor.get) + running(binding, eventProcessor) + } + + def running(binding: Future[Http.ServerBinding], processor: ActorRef[Nothing]): Behavior[Command] = Behaviors + .receiveSignal { + case (ctx, Terminated(`processor`)) => + ctx.log.warn("Kafka event processor stopped. Shutting down") + binding.map(_.unbind())(ctx.executionContext) + Behaviors.stopped + } + + def clusterUpSubscriber(cluster: Cluster, parent: ActorRef[Command]): Behavior[MemberUp] = Behaviors.setup[MemberUp] { + ctx => Behaviors .receiveMessage[MemberUp] { case MemberUp(member) if member.uniqueAddress == cluster.selfMember.uniqueAddress => ctx.log.info("Joined the cluster. Starting sharding and kafka processor") - UserEvents.init(ctx.system) - val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(), "kafka-event-processor") - ctx.watch(eventProcessor) + parent.tell(StartProcessor) Behaviors.same case MemberUp(member) => ctx.log.info("Member up {}", member) Behaviors.same } - .receiveSignal { - case (ctx, Terminated(_)) => - ctx.log.warn("Kafka event processor stopped. Shutting down") - binding.map(_.unbind()) - Behaviors.stopped - } - }, "KafkaToSharding", config(remotingPort, akkaManagementPort)) + } - def startGrpc(system: ActorSystem[_], mat: Materializer, frontEndPort: Int): Future[Http.ServerBinding] = { + def startGrpc(system: ActorSystem[_], frontEndPort: Int, extractor: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]): Future[Http.ServerBinding] = { + val mat = Materializer.createMaterializer(system.toClassic) val service: HttpRequest => Future[HttpResponse] = - UserServiceHandler(new UserGrpcService(system))(mat, system.toClassic) + UserServiceHandler(new UserGrpcService(system, extractor))(mat, system.toClassic) Http()(system.toClassic).bindAndHandleAsync( service, interface = "127.0.0.1", @@ -79,5 +106,4 @@ object Main { """).withFallback(ConfigFactory.load()) } - } diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala index 79be5e3..3b59790 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala @@ -29,6 +29,9 @@ object TopicListener { val address = Cluster(ctx.system).selfMember.address Behaviors.receiveMessage[ConsumerRebalanceEvent] { case TopicPartitionsAssigned(sub, partitions) => + // TODO + // - log all partitions assigned in one log line + // - block for shard allocation to complete, add configurable timeout partitions.foreach(tp => { val shardId = s"$groupId-${tp.partition()}" ctx.log.info("Partition [{}] assigned to current node. Updating shard allocation", shardId) diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala index 5cf3321..69d815d 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala @@ -5,12 +5,12 @@ import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.{ActorRef, ActorSystem, Behavior} import akka.cluster.sharding.external.ExternalShardAllocationStrategy -import akka.cluster.sharding.typed.ClusterShardingSettings +import akka.cluster.sharding.typed.{ClusterShardingSettings, ShardingMessageExtractor} import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey} -import akka.kafka.DefaultKafkaShardingMessageExtractor.{PartitionCountStrategy, RetrieveFromKafka} -import akka.kafka.{ConsumerSettings, KafkaShardingNoEnvelopeExtractor} +import akka.kafka.{ConsumerSettings, KafkaClusterSharding, KafkaShardingNoEnvelopeExtractor} import org.apache.kafka.common.serialization.StringDeserializer +import scala.concurrent.Future import scala.concurrent.duration._ object UserEvents { @@ -47,7 +47,8 @@ object UserEvents { runningTotal.copy( totalPurchases = runningTotal.totalPurchases + 1, amountSpent = runningTotal.amountSpent + (quantity * price))) - case GetRunningTotal(_, replyTo) => + case GetRunningTotal(id, replyTo) => + ctx.log.info("user {} running total queried", id) replyTo ! runningTotal Behaviors.same } @@ -55,35 +56,30 @@ object UserEvents { } /** - * Passing in a [[RetrieveFromKafka]] strategy will automatically retrieve the number of partitions of a topic for - * use with the same hashing algorithm used by the Apache Kafka [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]] - * (murmur2) with Akka Cluster Sharding. + * Asynchronously get the Akka Cluster Sharding [[ShardingMessageExtractor]]. Given a topic we can automatically + * retrieve the number of partitions and use the same hashing algorithm used by the Apache Kafka + * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]] (murmur2) with Akka Cluster Sharding. */ - class UserIdMessageExtractor(strategy: PartitionCountStrategy) - extends KafkaShardingNoEnvelopeExtractor[Message](strategy) { - def entityId(message: Message): String = message.userId - } - - def init(system: ActorSystem[_]): ActorRef[Message] = { + def messageExtractor(system: ActorSystem[_]): Future[KafkaShardingNoEnvelopeExtractor[Message]] = { val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor")) - val messageExtractor = new UserIdMessageExtractor( - strategy = RetrieveFromKafka( - system = system.toClassic, - timeout = 10.seconds, - groupId = processorConfig.groupId, - topic = processorConfig.topics.head, - settings = ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer) - .withBootstrapServers(processorConfig.bootstrapServers) - ) + KafkaClusterSharding.messageExtractorNoEnvelope( + system = system.toClassic, + timeout = 10.seconds, + groupId = processorConfig.groupId, + topic = processorConfig.topics.head, + entityIdExtractor = (msg: Message) => msg.userId, + settings = ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer) + .withBootstrapServers(processorConfig.bootstrapServers) ) + } + + def init(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message]): ActorRef[Message] = ClusterSharding(system).init( Entity(TypeKey)(createBehavior = _ => UserEvents()) .withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name)) .withMessageExtractor(messageExtractor) .withSettings(ClusterShardingSettings(system))) - } - def querySide(system: ActorSystem[_]): ActorRef[UserQuery] = { - init(system).narrow[UserQuery] - } + def querySide(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message]): ActorRef[UserQuery] = + init(system, messageExtractor).narrow[UserQuery] } diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala index 511c3bc..155a51c 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala @@ -8,6 +8,7 @@ import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ +import akka.cluster.sharding.typed.ShardingMessageExtractor import akka.kafka.ConsumerSettings import akka.kafka.Subscriptions import akka.kafka.scaladsl.Consumer @@ -30,7 +31,7 @@ object UserEventsKafkaProcessor { sealed trait Command private case class KafkaConsumerStopped(reason: Try[Any]) extends Command - def apply(): Behavior[Nothing] = { + def apply(extractor: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]): Behavior[Nothing] = { Behaviors .setup[Command] { ctx => val processorSettings = ProcessorConfig(ctx.system.settings.config.getConfig("kafka-to-sharding-processor")) @@ -40,7 +41,7 @@ object UserEventsKafkaProcessor { // TODO config val timeout = Timeout(3.seconds) val rebalancerRef = ctx.spawn(TopicListener(processorSettings.groupId, UserEvents.TypeKey), "rebalancerRef") - val shardRegion = UserEvents.init(ctx.system) + val shardRegion = UserEvents.init(ctx.system, extractor) val consumerSettings = ConsumerSettings(ctx.system.toClassic, new StringDeserializer, new ByteArrayDeserializer) .withBootstrapServers(processorSettings.bootstrapServers) diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala index adfaf67..f3acb7c 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala @@ -3,6 +3,7 @@ package sample.sharding.kafka import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.AskPattern._ +import akka.cluster.sharding.typed.ShardingMessageExtractor import akka.util.Timeout import sample.sharding.kafka.UserEvents.GetRunningTotal import sample.sharding.kafka.UserEvents.RunningTotal @@ -10,13 +11,13 @@ import sample.sharding.kafka.UserEvents.RunningTotal import scala.concurrent.Future import scala.concurrent.duration._ -class UserGrpcService(system: ActorSystem[_]) extends UserService { +class UserGrpcService(system: ActorSystem[_], extractor: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]) extends UserService { implicit val timeout = Timeout(5.seconds) implicit val sched = system.scheduler implicit val ec = system.executionContext - private val shardRegion: ActorRef[UserEvents.UserQuery] = UserEvents.querySide(system) + private val shardRegion: ActorRef[UserEvents.UserQuery] = UserEvents.querySide(system, extractor) override def userStats(in: UserStatsRequest): Future[UserStatsResponse] = { shardRegion diff --git a/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf b/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf index 38d51c0..0c08ab1 100644 --- a/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf +++ b/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf @@ -1,13 +1,4 @@ kafka-to-sharding-producer { - bootstrap-servers = "localhost:9094" + bootstrap-servers = "localhost:9092" topic = "user-events" - - # can be one of: - # - default: to put the entity id in the key and use the default kafka partitioner - # - explicit: to specify the partition explicitly, based on the entity id - # if you have control of both the producer and consumer it is better to be explicit to make sure - # that both sides align - partitioning = "explicit" - - nr-partitions = 128 } diff --git a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala b/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala index 846dee7..a20418c 100644 --- a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala +++ b/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala @@ -1,7 +1,5 @@ package sharding.kafka.producer -import java.nio.charset.StandardCharsets - import akka.Done import akka.actor.ActorSystem import akka.event.Logging @@ -10,9 +8,7 @@ import akka.kafka.scaladsl.Producer import akka.stream.scaladsl.Source import com.typesafe.config.ConfigFactory import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.serialization.ByteArraySerializer -import org.apache.kafka.common.serialization.StringSerializer -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer} import sample.sharding.kafka.serialization.user_events.UserPurchaseProto import scala.concurrent.Future @@ -55,7 +51,6 @@ object UserEventProducer extends App { // rely on the default kafka partitioner to hash the key and distribute among shards // the logic of the default partitioner must be replicated in MessageExtractor entityId -> shardId function new ProducerRecord[String, Array[Byte]](producerConfig.topic, randomEntityId, message) - }) .runWith(Producer.plainSink(producerSettings)) } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
