This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-seglo-kafka-sharding
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 47869c4c264a033049fd9833ea57ffceb5ba1bf0
Author: Sean Glover <[email protected]>
AuthorDate: Wed Feb 19 14:58:44 2020 -0500

    Return rebalance listener as classic ActorRef
---
 akka-sample-kafka-to-sharding-scala/README.md      |  12 ++-
 .../scala/akka/kafka/KafkaClusterSharding.scala    | 106 ++++++++++++---------
 .../sharding/kafka/UserEventsKafkaProcessor.scala  |  27 +++---
 3 files changed, 81 insertions(+), 64 deletions(-)
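
For orientation, a minimal usage sketch (not part of the commit): `KafkaClusterSharding.rebalanceListener` now returns a classic `ActorRef`, so it can be handed straight to an Alpakka Kafka subscription without adapting it. Everything except the calls shown in the diff below (`rebalanceListener`, `Subscriptions.topics(...).withRebalanceListener`) is illustrative.

```
import akka.actor.ActorSystem
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.kafka.{AutoSubscription, KafkaClusterSharding, Subscriptions}

object RebalanceListenerUsageSketch {
  // Build a subscription whose rebalance events update the external shard
  // allocation for the given entity type key. The listener is already a
  // classic ActorRef, so no .toClassic adaptation is needed by the caller.
  def subscriptionFor(system: ActorSystem, typeKey: EntityTypeKey[_], topic: String): AutoSubscription = {
    val listener = KafkaClusterSharding.rebalanceListener(system, typeKey)
    Subscriptions.topics(topic).withRebalanceListener(listener)
  }
}
```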

diff --git a/akka-sample-kafka-to-sharding-scala/README.md b/akka-sample-kafka-to-sharding-scala/README.md
index fbbaf7d..20ccf8b 100644
--- a/akka-sample-kafka-to-sharding-scala/README.md
+++ b/akka-sample-kafka-to-sharding-scala/README.md
@@ -152,18 +152,20 @@ Each forwarding messaging is followed by log for the same entity on the current
 Using Akka management we can see the shard allocations and the number of entities per shard (uses `curl` and `jq`):
 
 ```
+# Node 1:
+curl -v localhost:8551/cluster/shards/user-processing | jq
 
-// Node 1:
- curl -v localhost:8551/cluster/shards/user-processing | jq
-
-// Node 2:
- curl -v localhost:8552/cluster/shards/user-processing | jq
+# Node 2:
+curl -v localhost:8552/cluster/shards/user-processing | jq
 ```
 
 We can count the number of shards on each:
 
 ```
+# Node 1:
 curl -s localhost:8551/cluster/shards/user-processing | jq -r "." | grep shardId | wc -l
+# Node 2:
+curl -s localhost:8552/cluster/shards/user-processing | jq -r "." | grep shardId | wc -l
 ```
 
 The number of shards will depend on which entities have received messages.
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
index 5bbd885..2fbaa7f 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
@@ -3,8 +3,10 @@ package akka.kafka
 import java.util.concurrent.atomic.AtomicInteger
 
 import akka.actor.typed.Behavior
+import akka.actor.typed.scaladsl.adapter._
 import akka.actor.typed.scaladsl.Behaviors
-import akka.actor.{ActorSystem, ExtendedActorSystem}
+import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
+import akka.annotation.{ApiMayChange, InternalApi}
 import akka.cluster.sharding.external.ExternalShardAllocation
 import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
 import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor}
@@ -24,6 +26,8 @@ object KafkaClusterSharding {
   private val metadataActorCounter = new AtomicInteger
 
   /**
+   * API MAY CHANGE
+   *
    * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's
    * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
    *
@@ -31,9 +35,10 @@ object KafkaClusterSharding {
    * cluster for the number of partitions of a user provided [[topic]]. Use the [[settings]] parameter to configure
    * the Kafka Consumer connection required to retrieve the number of partitions.
    *
-   * _Important_: All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
+   * All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
    * that entities are routed to the same Entity type.
    */
+  @ApiMayChange
   def messageExtractor[M](system: ActorSystem,
                           topic: String,
                           timeout: FiniteDuration,
@@ -42,6 +47,8 @@ object KafkaClusterSharding {
       .map(kafkaPartitions => new KafkaShardingMessageExtractor[M](kafkaPartitions))(system.dispatcher)
 
   /**
+   * API MAY CHANGE
+   *
    * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's
    * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
    *
@@ -50,9 +57,10 @@ object KafkaClusterSharding {
    * the Kafka Consumer connection required to retrieve the number of partitions. Use the [[entityIdExtractor]] to pick
    * a field from the Entity to use as the entity id for the hashing strategy.
    *
-   * _Important_: All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
+   * All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
    * that entities are routed to the same Entity type.
    */
+  @ApiMayChange
   def messageExtractorNoEnvelope[M](system: ActorSystem,
                                     topic: String,
                                     timeout: FiniteDuration,
@@ -75,6 +83,7 @@ object KafkaClusterSharding {
     }
   }
 
+  @InternalApi
   sealed trait KafkaClusterSharding {
     def kafkaPartitions: Int
     def shardId(entityId: String): String = {
@@ -84,62 +93,69 @@ object KafkaClusterSharding {
     }
   }
 
-  final class KafkaShardingMessageExtractor[M](val kafkaPartitions: Int)
+  @InternalApi
+  final class KafkaShardingMessageExtractor[M] private[kafka](val kafkaPartitions: Int)
     extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterSharding {
     override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
     override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
   }
 
-  final class KafkaShardingNoEnvelopeExtractor[M](val kafkaPartitions: Int, entityIdExtractor: M => String)
+  @InternalApi
+  final class KafkaShardingNoEnvelopeExtractor[M] private[kafka](val kafkaPartitions: Int, entityIdExtractor: M => String)
     extends ShardingMessageExtractor[M, M] with KafkaClusterSharding {
     override def entityId(message: M): String = entityIdExtractor(message)
     override def unwrapMessage(message: M): M = message
   }
 
-  // TODO:
-  // - will require `akka-actors-typed` as another provided dep, or should we just return a classic actor?
-  // - returning a typed actor is more flexible for the user so that they can easy create it under the `user` guardian
-  //   when running akka typed. an alternative would be to create the actor ourself as a system actor, like is done with
-  //   the KafkaConsumerActor for the metadata client.
+  // TODO: will require `akka-actors-typed` as a provided dep
   /**
-   * The [[RebalanceListener]] handles [[TopicPartitionsAssigned]] events created by the Kafka consumer group rebalance
-   * listener. As partitions are assigned to this consumer group member we update the External Sharding strategy
+   * API MAY CHANGE
+   *
+   * Create an Alpakka Kafka rebalance listener that handles [[TopicPartitionsAssigned]] events. The [[typeKey]] is used
+   * to create the [[ExternalShardAllocation]] client. When partitions are assigned to this consumer group member the
+   * rebalance listener will use the [[ExternalShardAllocation]] client to update the External Sharding strategy
    * accordingly so that entities are (eventually) routed to the local Akka cluster member.
+   *
+   * Returns an Akka classic [[ActorRef]] that can be passed to an Alpakka Kafka [[Subscription]].
    */
-  object RebalanceListener {
-    def apply(typeKey: EntityTypeKey[_]): Behavior[ConsumerRebalanceEvent] =
-      Behaviors.setup { ctx =>
-        val typeKeyName = typeKey.name
-        val shardAllocationClient = ExternalShardAllocation(ctx.system).clientFor(typeKeyName)
-        val address = Cluster(ctx.system).selfMember.address
-        Behaviors.receive[ConsumerRebalanceEvent] {
-          case (ctx, TopicPartitionsAssigned(_, partitions)) =>
-            import ctx.executionContext
-            val partitionsList = partitions.mkString(",")
-            ctx.log.info("Consumer group '{}' is assigning topic partitions to cluster member '{}': [{}]",
-              typeKeyName, address, partitionsList)
-            val updates = partitions.map { tp =>
-              val shardId = tp.partition().toString
-              // Kafka partition number becomes the akka shard id
-              // TODO: support assigning more than 1 shard id at once?
-              shardAllocationClient.updateShardLocation(shardId, address)
+  @ApiMayChange
+  def rebalanceListener(system: ActorSystem, typeKey: EntityTypeKey[_]): ActorRef = {
+    val actor: Behavior[ConsumerRebalanceEvent] = Behaviors.setup { ctx =>
+      val typeKeyName = typeKey.name
+      val shardAllocationClient = ExternalShardAllocation(ctx.system).clientFor(typeKeyName)
+      val address = Cluster(ctx.system).selfMember.address
+      Behaviors.receive[ConsumerRebalanceEvent] {
+        case (ctx, TopicPartitionsAssigned(_, partitions)) =>
+          import ctx.executionContext
+          val partitionsList = partitions.mkString(",")
+          ctx.log.info("Consumer group '{}' is assigning topic partitions to cluster member '{}': [{}]",
+            typeKeyName, address, partitionsList)
+          val updates = partitions.map { tp =>
+            val shardId = tp.partition().toString
+            // Kafka partition number becomes the akka shard id
+            // TODO: support assigning more than 1 shard id at once?
+            shardAllocationClient.updateShardLocation(shardId, address)
+          }
+          Future
+            .sequence(updates)
+            // Each Future returns successfully once a majority of cluster nodes receive the update. There's no point
+            // blocking here because the rebalance listener is triggered asynchronously. If we want to block during
+            // rebalance then we should provide an implementation using the `PartitionAssignmentHandler` instead
+            .onComplete {
+              case Success(_) =>
+                ctx.log.info("Completed consumer group '{}' assignment of topic partitions to cluster member '{}': [{}]",
+                  typeKeyName, address, partitionsList)
+              case Failure(ex) =>
+                ctx.log.error("A failure occurred while updating cluster shards", ex)
             }
-            // TODO: pipeToSelf since we're closing over local state?
-            Future
-              .sequence(updates)
-              // each Future returns successfully once a majority of cluster nodes receive the update.
-              // there's no point blocking here because the rebalance listener is triggered asynchronously.  if we want
-              // to block rebalances then we should provide an implementing using the `PartitionAssignmentHandler` instead
-              .onComplete {
-                case Success(_) =>
-                  ctx.log.info("Completed consumer group '{}' assignment of topic partitions to cluster member '{}': [{}]",
-                    typeKeyName, address, partitionsList)
-                case Failure(ex) =>
-                  ctx.log.error("A failure occurred while updating cluster shards", ex)
-              }
-            Behaviors.same
-          case (_, TopicPartitionsRevoked(_, _)) => Behaviors.same
-        }
+          Behaviors.same
+        case (_, TopicPartitionsRevoked(_, _)) => Behaviors.same
       }
+    }
+
+    system
+      .toTyped
+      .systemActorOf(actor, "kafka-cluster-sharding-rebalance-listener")
+      .toClassic
   }
 }
\ No newline at end of file
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
index 941f177..43ad4bb 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
@@ -10,10 +10,9 @@ import akka.actor.typed.scaladsl.Behaviors
 import akka.actor.typed.scaladsl.adapter._
 import akka.cluster.sharding.typed.ShardingMessageExtractor
 import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
-import akka.kafka.{ConsumerSettings, KafkaClusterSharding, Subscriptions}
-import akka.kafka.scaladsl.Consumer
-import akka.stream.scaladsl.Sink
-import akka.stream.scaladsl.Source
+import akka.kafka.{CommitterSettings, ConsumerMessage, ConsumerSettings, KafkaClusterSharding, Subscriptions}
+import akka.kafka.scaladsl.{Committer, Consumer}
+import akka.stream.scaladsl.SourceWithContext
 import akka.util.Timeout
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -41,10 +40,10 @@ object UserEventsKafkaProcessor {
         // TODO config
         val timeout = Timeout(3.seconds)
         val typeKey = EntityTypeKey[UserEvents.Message](processorSettings.groupId)
-        val rebalanceListener = ctx.spawn(KafkaClusterSharding.RebalanceListener(typeKey), "kafka-cluster-sharding-rebalance-listener")
+        val rebalanceListener = KafkaClusterSharding.rebalanceListener(classic, typeKey)
         val shardRegion = UserEvents.init(ctx.system, extractor, processorSettings.groupId)
         val consumerSettings =
-          ConsumerSettings(ctx.system.toClassic, new StringDeserializer, new ByteArrayDeserializer)
+          ConsumerSettings(classic, new StringDeserializer, new ByteArrayDeserializer)
             .withBootstrapServers(processorSettings.bootstrapServers)
             .withGroupId(processorSettings.groupId)
             .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
@@ -52,12 +51,11 @@ object UserEventsKafkaProcessor {
 
         val subscription = Subscriptions
           .topics(processorSettings.topics: _*)
-          .withRebalanceListener(rebalanceListener.toClassic)
+          .withRebalanceListener(rebalanceListener)
 
-        val kafkaConsumer: Source[ConsumerRecord[String, Array[Byte]], Consumer.Control] =
-          Consumer.plainSource(consumerSettings, subscription)
+        val kafkaConsumer: SourceWithContext[ConsumerRecord[String, Array[Byte]], ConsumerMessage.CommittableOffset, Consumer.Control] =
+          Consumer.sourceWithOffsetContext(consumerSettings, subscription)
 
-        // TODO use committable source and reliable delivery (once released)?
         val stream: Future[Done] = kafkaConsumer
           .log("kafka-consumer")
           .filter(_.key() != null) // no entity id
@@ -65,7 +63,7 @@ object UserEventsKafkaProcessor {
             // alternatively the user id could be in the message rather than use the kafka key
             ctx.log.info(s"entityId->partition ${record.key()}->${record.partition()}")
             ctx.log.info("Forwarding message for entity {} to cluster sharding", record.key())
-            // idempotency?
+            // TODO idempotency? reliable delivery (once released)?
             retry(
               () =>
                 shardRegion.ask[Done](replyTo => {
@@ -77,10 +75,11 @@ object UserEventsKafkaProcessor {
                     purchaseProto.price,
                     replyTo)
                 })(timeout, ctx.system.scheduler),
-              3,
-              1.second)
+              attempts = 3,
+              delay = 1.second
+            )
           }
-          .runWith(Sink.ignore)
+          .runWith(Committer.sinkWithOffsetContext(CommitterSettings(classic)))
 
         stream.onComplete { result =>
           ctx.self ! KafkaConsumerStopped(result)
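
As a side note on the processor change: the stream now uses Alpakka Kafka's offset-context source and committer sink instead of `Consumer.plainSource` with `Sink.ignore`. A rough sketch of that pattern, with a placeholder `handle` function standing in for the per-record processing (only the Alpakka Kafka calls mirror the diff above; the rest is illustrative):

```
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscription}
import org.apache.kafka.clients.consumer.ConsumerRecord

import scala.concurrent.Future

object CommittableFlowSketch {
  // Consume with the committable offset carried in the stream context, process
  // each record, then commit the offset only after processing has completed.
  def run(settings: ConsumerSettings[String, Array[Byte]],
          subscription: Subscription,
          handle: ConsumerRecord[String, Array[Byte]] => Future[Done])(implicit system: ActorSystem): Future[Done] =
    Consumer
      .sourceWithOffsetContext(settings, subscription)
      .mapAsync(1)(handle)
      .runWith(Committer.sinkWithOffsetContext(CommitterSettings(system)))
}
```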

