This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch wip-seglo-kafka-sharding in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 47869c4c264a033049fd9833ea57ffceb5ba1bf0 Author: Sean Glover <[email protected]> AuthorDate: Wed Feb 19 14:58:44 2020 -0500 Return rebalance listener as classic ActorRef --- akka-sample-kafka-to-sharding-scala/README.md | 12 ++- .../scala/akka/kafka/KafkaClusterSharding.scala | 106 ++++++++++++--------- .../sharding/kafka/UserEventsKafkaProcessor.scala | 27 +++--- 3 files changed, 81 insertions(+), 64 deletions(-) diff --git a/akka-sample-kafka-to-sharding-scala/README.md b/akka-sample-kafka-to-sharding-scala/README.md index fbbaf7d..20ccf8b 100644 --- a/akka-sample-kafka-to-sharding-scala/README.md +++ b/akka-sample-kafka-to-sharding-scala/README.md @@ -152,18 +152,20 @@ Each forwarding messaging is followed by log for the same entity on the current Using Akka management we can see the shard allocations and the number of entities per shard (uses `curl` and `jq`): ``` +# Node 1: +curl -v localhost:8551/cluster/shards/user-processing | jq -// Node 1: - curl -v localhost:8551/cluster/shards/user-processing | jq - -// Node 2: - curl -v localhost:8552/cluster/shards/user-processing | jq +# Node 2: +curl -v localhost:8552/cluster/shards/user-processing | jq ``` We can count the number of shards on each: ``` +# Node 1: curl -s localhost:8551/cluster/shards/user-processing | jq -r "." | grep shardId | wc -l +# Node 2: +curl -s localhost:8552/cluster/shards/user-processing | jq -r "." | grep shardId | wc -l ``` The number of shards will depend on which entities have received messages. diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala index 5bbd885..2fbaa7f 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala @@ -3,8 +3,10 @@ package akka.kafka import java.util.concurrent.atomic.AtomicInteger import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.Behaviors -import akka.actor.{ActorSystem, ExtendedActorSystem} +import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem} +import akka.annotation.{ApiMayChange, InternalApi} import akka.cluster.sharding.external.ExternalShardAllocation import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor} @@ -24,6 +26,8 @@ object KafkaClusterSharding { private val metadataActorCounter = new AtomicInteger /** + * API MAY CHANGE + * * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]]. * @@ -31,9 +35,10 @@ object KafkaClusterSharding { * cluster for the number of partitions of a user provided [[topic]]. Use the [[settings]] parameter to configure * the Kafka Consumer connection required to retrieve the number of partitions. * - * _Important_: All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure + * All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure * that entities are routed to the same Entity type. */ + @ApiMayChange def messageExtractor[M](system: ActorSystem, topic: String, timeout: FiniteDuration, @@ -42,6 +47,8 @@ object KafkaClusterSharding { .map(kafkaPartitions => new KafkaShardingMessageExtractor[M](kafkaPartitions))(system.dispatcher) /** + * API MAY CHANGE + * * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]]. * @@ -50,9 +57,10 @@ object KafkaClusterSharding { * the Kafka Consumer connection required to retrieve the number of partitions. Use the [[entityIdExtractor]] to pick * a field from the Entity to use as the entity id for the hashing strategy. * - * _Important_: All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure + * All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure * that entities are routed to the same Entity type. */ + @ApiMayChange def messageExtractorNoEnvelope[M](system: ActorSystem, topic: String, timeout: FiniteDuration, @@ -75,6 +83,7 @@ object KafkaClusterSharding { } } + @InternalApi sealed trait KafkaClusterSharding { def kafkaPartitions: Int def shardId(entityId: String): String = { @@ -84,62 +93,69 @@ object KafkaClusterSharding { } } - final class KafkaShardingMessageExtractor[M](val kafkaPartitions: Int) + @InternalApi + final class KafkaShardingMessageExtractor[M] private[kafka](val kafkaPartitions: Int) extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterSharding { override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message } - final class KafkaShardingNoEnvelopeExtractor[M](val kafkaPartitions: Int, entityIdExtractor: M => String) + @InternalApi + final class KafkaShardingNoEnvelopeExtractor[M] private[kafka](val kafkaPartitions: Int, entityIdExtractor: M => String) extends ShardingMessageExtractor[M, M] with KafkaClusterSharding { override def entityId(message: M): String = entityIdExtractor(message) override def unwrapMessage(message: M): M = message } - // TODO: - // - will require `akka-actors-typed` as another provided dep, or should we just return a classic actor? - // - returning a typed actor is more flexible for the user so that they can easy create it under the `user` guardian - // when running akka typed. an alternative would be to create the actor ourself as a system actor, like is done with - // the KafkaConsumerActor for the metadata client. + // TODO: will require `akka-actors-typed` as a provided dep /** - * The [[RebalanceListener]] handles [[TopicPartitionsAssigned]] events created by the Kafka consumer group rebalance - * listener. As partitions are assigned to this consumer group member we update the External Sharding strategy + * API MAY CHANGE + * + * Create an Alpakka Kafka rebalance listener that handles [[TopicPartitionsAssigned]] events. The [[typeKey]] is used + * to create the [[ExternalShardAllocation]] client. When partitions are assigned to this consumer group member the + * rebalance listener will use the [[ExternalShardAllocation]] client to update the External Sharding strategy * accordingly so that entities are (eventually) routed to the local Akka cluster member. + * + * Returns an Akka classic [[ActorRef]] that can be passed to an Alpakka Kafka [[ConsumerSettings]]. */ - object RebalanceListener { - def apply(typeKey: EntityTypeKey[_]): Behavior[ConsumerRebalanceEvent] = - Behaviors.setup { ctx => - val typeKeyName = typeKey.name - val shardAllocationClient = ExternalShardAllocation(ctx.system).clientFor(typeKeyName) - val address = Cluster(ctx.system).selfMember.address - Behaviors.receive[ConsumerRebalanceEvent] { - case (ctx, TopicPartitionsAssigned(_, partitions)) => - import ctx.executionContext - val partitionsList = partitions.mkString(",") - ctx.log.info("Consumer group '{}' is assigning topic partitions to cluster member '{}': [{}]", - typeKeyName, address, partitionsList) - val updates = partitions.map { tp => - val shardId = tp.partition().toString - // Kafka partition number becomes the akka shard id - // TODO: support assigning more than 1 shard id at once? - shardAllocationClient.updateShardLocation(shardId, address) + @ApiMayChange + def rebalanceListener(system: ActorSystem, typeKey: EntityTypeKey[_]): ActorRef = { + val actor: Behavior[ConsumerRebalanceEvent] = Behaviors.setup { ctx => + val typeKeyName = typeKey.name + val shardAllocationClient = ExternalShardAllocation(ctx.system).clientFor(typeKeyName) + val address = Cluster(ctx.system).selfMember.address + Behaviors.receive[ConsumerRebalanceEvent] { + case (ctx, TopicPartitionsAssigned(_, partitions)) => + import ctx.executionContext + val partitionsList = partitions.mkString(",") + ctx.log.info("Consumer group '{}' is assigning topic partitions to cluster member '{}': [{}]", + typeKeyName, address, partitionsList) + val updates = partitions.map { tp => + val shardId = tp.partition().toString + // Kafka partition number becomes the akka shard id + // TODO: support assigning more than 1 shard id at once? + shardAllocationClient.updateShardLocation(shardId, address) + } + Future + .sequence(updates) + // Each Future returns successfully once a majority of cluster nodes receive the update. There's no point + // blocking here because the rebalance listener is triggered asynchronously. If we want to block during + // rebalance then we should provide an implementation using the `PartitionAssignmentHandler` instead + .onComplete { + case Success(_) => + ctx.log.info("Completed consumer group '{}' assignment of topic partitions to cluster member '{}': [{}]", + typeKeyName, address, partitionsList) + case Failure(ex) => + ctx.log.error("A failure occurred while updating cluster shards", ex) } - // TODO: pipeToSelf since we're closing over local state? - Future - .sequence(updates) - // each Future returns successfully once a majority of cluster nodes receive the update. - // there's no point blocking here because the rebalance listener is triggered asynchronously. if we want - // to block rebalances then we should provide an implementing using the `PartitionAssignmentHandler` instead - .onComplete { - case Success(_) => - ctx.log.info("Completed consumer group '{}' assignment of topic partitions to cluster member '{}': [{}]", - typeKeyName, address, partitionsList) - case Failure(ex) => - ctx.log.error("A failure occurred while updating cluster shards", ex) - } - Behaviors.same - case (_, TopicPartitionsRevoked(_, _)) => Behaviors.same - } + Behaviors.same + case (_, TopicPartitionsRevoked(_, _)) => Behaviors.same } + } + + system + .toTyped + .systemActorOf(actor, "kafka-cluster-sharding-rebalance-listener") + .toClassic } } \ No newline at end of file diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala index 941f177..43ad4bb 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala @@ -10,10 +10,9 @@ import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ import akka.cluster.sharding.typed.ShardingMessageExtractor import akka.cluster.sharding.typed.scaladsl.EntityTypeKey -import akka.kafka.{ConsumerSettings, KafkaClusterSharding, Subscriptions} -import akka.kafka.scaladsl.Consumer -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source +import akka.kafka.{CommitterSettings, ConsumerMessage, ConsumerSettings, KafkaClusterSharding, Subscriptions} +import akka.kafka.scaladsl.{Committer, Consumer} +import akka.stream.scaladsl.SourceWithContext import akka.util.Timeout import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerRecord @@ -41,10 +40,10 @@ object UserEventsKafkaProcessor { // TODO config val timeout = Timeout(3.seconds) val typeKey = EntityTypeKey[UserEvents.Message](processorSettings.groupId) - val rebalanceListener = ctx.spawn(KafkaClusterSharding.RebalanceListener(typeKey), "kafka-cluster-sharding-rebalance-listener") + val rebalanceListener = KafkaClusterSharding.rebalanceListener(classic, typeKey) val shardRegion = UserEvents.init(ctx.system, extractor, processorSettings.groupId) val consumerSettings = - ConsumerSettings(ctx.system.toClassic, new StringDeserializer, new ByteArrayDeserializer) + ConsumerSettings(classic, new StringDeserializer, new ByteArrayDeserializer) .withBootstrapServers(processorSettings.bootstrapServers) .withGroupId(processorSettings.groupId) .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") @@ -52,12 +51,11 @@ object UserEventsKafkaProcessor { val subscription = Subscriptions .topics(processorSettings.topics: _*) - .withRebalanceListener(rebalanceListener.toClassic) + .withRebalanceListener(rebalanceListener) - val kafkaConsumer: Source[ConsumerRecord[String, Array[Byte]], Consumer.Control] = - Consumer.plainSource(consumerSettings, subscription) + val kafkaConsumer: SourceWithContext[ConsumerRecord[String, Array[Byte]], ConsumerMessage.CommittableOffset, Consumer.Control] = + Consumer.sourceWithOffsetContext(consumerSettings, subscription) - // TODO use committable source and reliable delivery (once released)? val stream: Future[Done] = kafkaConsumer .log("kafka-consumer") .filter(_.key() != null) // no entity id @@ -65,7 +63,7 @@ object UserEventsKafkaProcessor { // alternatively the user id could be in the message rather than use the kafka key ctx.log.info(s"entityId->partition ${record.key()}->${record.partition()}") ctx.log.info("Forwarding message for entity {} to cluster sharding", record.key()) - // idempotency? + // TODO idempotency? reliable delivery (once released)? retry( () => shardRegion.ask[Done](replyTo => { @@ -77,10 +75,11 @@ object UserEventsKafkaProcessor { purchaseProto.price, replyTo) })(timeout, ctx.system.scheduler), - 3, - 1.second) + attempts = 3, + delay = 1.second + ) } - .runWith(Sink.ignore) + .runWith(Committer.sinkWithOffsetContext(CommitterSettings(classic))) stream.onComplete { result => ctx.self ! KafkaConsumerStopped(result) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
