This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-seglo-kafka-sharding
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 34595d0d50bc3c0edf9dad25e9bd73153627c79e
Author: Sean Glover <[email protected]>
AuthorDate: Thu Feb 13 17:41:23 2020 -0500

    WIP
---
 akka-sample-kafka-to-sharding-scala/README.md      | 25 ++++++--
 akka-sample-kafka-to-sharding-scala/build.sbt      | 13 ++++-
 .../kafka/src/main/resources/logback.xml           | 23 ++++++++
 .../sharding/embeddedkafka/KafkaBroker.scala       | 22 +++++++
 .../processor/src/main/resources/application.conf  |  3 +-
 .../akka/kafka/KafkaShardingMessageExtractor.scala | 67 ++++++++++++++++++++++
 .../sample/sharding/kafka/ProcessorConfig.scala    |  8 +--
 .../sample/sharding/kafka/TopicListener.scala      | 18 +++---
 .../scala/sample/sharding/kafka/UserEvents.scala   | 33 ++++++-----
 .../sharding/kafka/UserEventsKafkaProcessor.scala  |  6 +-
 .../producer/src/main/resources/application.conf   |  2 +-
 .../sharding/kafka/producer/ProducerConfig.scala   | 19 +-----
 .../kafka/producer/UserEventProducer.scala         | 19 +-----
 13 files changed, 188 insertions(+), 70 deletions(-)

diff --git a/akka-sample-kafka-to-sharding-scala/README.md 
b/akka-sample-kafka-to-sharding-scala/README.md
index af1433c..7194fe2 100644
--- a/akka-sample-kafka-to-sharding-scala/README.md
+++ b/akka-sample-kafka-to-sharding-scala/README.md
@@ -30,17 +30,32 @@ The sample is made up of three applications:
 * `processor` An Akka Cluster Sharding application that reads the Kafka topic 
and forwards the messages to a sharded
               entity that represents a user and a gRPC front end for accessing 
the sharded actors state
 * `client` A gRPC client for interacting with the cluster
+* `kafka` A local Kafka server
               
 The sample demonstrates how the external shard allocation strategy can be used 
so messages are processed locally.
 
 The sample depends on a Kafka broker running locally on port `9092` with a 
topic with 128 partitions called `user-events`.
-[Kafka can be run in Docker](https://github.com/wurstmeister/kafka-docker) or 
run locally following [these instructions](https://kafka.apache.org/quickstart).
+[Kafka can be run in Docker](https://github.com/wurstmeister/kafka-docker) or 
run locally using the optional `kafka` project.
 
-Update the `applications.conf`s in each project to point to your Kafka broker 
if not running on `localhost:9092`
+* Run the local Kafka server. This project will also create the `user-events` 
topic.
+
+```
+sbt "kafka / run"
+```
+
+In the Kafka server window you'll see the following when the server is ready:
+
+```
+12:46:47.022 INFO  [run-main-0          ] sample.sharding.embeddedkafka.KafkaBroker$  
 Kafka running on port '9092'
+12:46:47.022 INFO  [run-main-0          ] sample.sharding.embeddedkafka.KafkaBroker$  
 Topic 'user-events' with '128' partitions created
+```
+
+If you want to use a different Kafka cluster then update the 
`application.conf`s in each project to point to your 
+Kafka broker if not running on `localhost:9092`.
 
 
-* Create a topic with 128 partitions, or update application.conf with the 
desired number of
-  partitions e.g. a command from your Kafka installation:
+* _(Optional)_ If you do not run the local Kafka server then you must create a 
topic with 128 partitions, or update 
+  application.conf with the desired number of partitions e.g. a command from 
your Kafka installation:
   
 ```
   bin/kafka-topics.sh --create --bootstrap-server localhost:9092 
--replication-factor 1 --partitions 128 --topic user-events
@@ -104,7 +119,7 @@ As there is only one node we get 100% locality, each 
forwarded message is proces
 Now let's see that remain true once we add more nodes to the Akka Cluster, add 
another with different ports:
 
 ```
- sbt "processor / run 2552 8552 8082"
+sbt "processor / run 2552 8552 8082"
 ```
 
 When this starts up we'll see Kafka assign partitions to the new node (it is 
in the same consumer group):
diff --git a/akka-sample-kafka-to-sharding-scala/build.sbt 
b/akka-sample-kafka-to-sharding-scala/build.sbt
index 10f8cbb..5a08bb7 100644
--- a/akka-sample-kafka-to-sharding-scala/build.sbt
+++ b/akka-sample-kafka-to-sharding-scala/build.sbt
@@ -1,8 +1,8 @@
 val AkkaVersion = "2.6.3"
-// TODO upgrade to 2.0.0
-val AlpakkaKafkaVersion = "1.1.0"
+val AlpakkaKafkaVersion = "2.0.1"
 val AkkaManagementVersion = "1.0.5"
 val AkkaHttpVersion = "10.1.11"
+val KafkaVersion = "2.4.0"
 val LogbackVersion = "1.2.3"
 
 ThisBuild / scalaVersion := "2.13.1"
@@ -22,6 +22,15 @@ Global / cancelable := true // ctrl-c
 
 lazy val `akka-sample-kafka-to-sharding` = 
project.in(file(".")).aggregate(producer, processor, client)
 
+lazy val kafka = project
+  .in(file("kafka"))
+  .settings(
+    libraryDependencies ++= Seq(
+      "ch.qos.logback" % "logback-classic" % LogbackVersion,
+      "org.slf4j" % "log4j-over-slf4j" % "1.7.26",
+      "io.github.embeddedkafka" %% "embedded-kafka" % KafkaVersion),
+    cancelable := false)
+
 lazy val client = project
   .in(file("client"))
   .enablePlugins(AkkaGrpcPlugin, JavaAgent)
diff --git 
a/akka-sample-kafka-to-sharding-scala/kafka/src/main/resources/logback.xml 
b/akka-sample-kafka-to-sharding-scala/kafka/src/main/resources/logback.xml
new file mode 100644
index 0000000..c747bc5
--- /dev/null
+++ b/akka-sample-kafka-to-sharding-scala/kafka/src/main/resources/logback.xml
@@ -0,0 +1,23 @@
+<configuration>
+    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+        <file>kafka/target/kafka.log</file>
+        <append>false</append>
+        <encoder>
+            <pattern>%d{ISO8601} %-5level [%-20.20thread] [%-36.36logger{36}]  
%msg%n%rEx</pattern>
+        </encoder>
+    </appender>
+
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} %-5level [%-20.20thread] 
%-36.36logger{36}  %msg%n%rEx</pattern>
+        </encoder>
+    </appender>
+
+    <logger name="sample.sharding.embeddedkafka" level="INFO">
+        <appender-ref ref="STDOUT"/>
+    </logger>
+
+    <root level="INFO">
+        <appender-ref ref="FILE" />
+    </root>
+</configuration>
diff --git 
a/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala
 
b/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala
new file mode 100644
index 0000000..90663a3
--- /dev/null
+++ 
b/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala
@@ -0,0 +1,22 @@
+package sample.sharding.embeddedkafka
+
+import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
+import org.slf4j.LoggerFactory
+
+object KafkaBroker extends App with EmbeddedKafka {
+  val log = LoggerFactory.getLogger(this.getClass)
+
+  val port = 9092
+  val topic = "user-events"
+  val partitions = 128
+
+  val config = EmbeddedKafkaConfig(kafkaPort = port)
+  val server = EmbeddedKafka.start()(config)
+
+  createCustomTopic(topic = topic, partitions = partitions)
+
+  log.info(s"Kafka running on port '$port'")
+  log.info(s"Topic '$topic' with '$partitions' partitions created")
+
+  server.broker.awaitShutdown()
+}
diff --git 
a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
 
b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
index 76f1266..ef7e721 100644
--- 
a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
+++ 
b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
@@ -1,8 +1,7 @@
 kafka-to-sharding-processor {
   bootstrap-servers = "localhost:9092"
-  topic = "user-events"
+  topics = ["user-events"]
   group = "group-1"
-  nr-partitions = 128
 }
 
 akka.http {
diff --git 
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala
 
b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala
new file mode 100644
index 0000000..f02eb41
--- /dev/null
+++ 
b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala
@@ -0,0 +1,67 @@
+package akka.kafka
+
+import akka.actor.{ActorSystem, ExtendedActorSystem}
+import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor}
+import akka.kafka.scaladsl.MetadataClient
+import akka.util.Timeout._
+import org.apache.kafka.clients.producer.Partitioner
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner
+import org.apache.kafka.common.{Node, PartitionInfo, Cluster => KafkaCluster}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContextExecutor}
+import scala.jdk.CollectionConverters._
+
+private[kafka] trait DefaultKafkaShardingMessageExtractor {
+  implicit val actorSystem: ActorSystem
+  implicit val timeout: FiniteDuration
+  implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher
+
+  val clientSettings: ConsumerSettings[_, _]
+  val groupId: String
+  val topic: String
+
+  private val CLUSTER_ID = "cluster-id"
+  private val kafkaPartitioner = partitioner()
+  private val kafkaCluster = cluster(partitions())
+
+  def shardId(entityId: String): String = {
+    val partition = kafkaPartitioner
+      .partition(topic, entityId, entityId.getBytes(), null, null, 
kafkaCluster)
+    s"$groupId-$partition"
+  }
+
+  def partitions(): List[PartitionInfo] = {
+    val consumerActor = 
actorSystem.actorOf(KafkaConsumerActor.props(clientSettings), 
"metadata-consumer-actor")
+    val metadataClient = MetadataClient.create(consumerActor, timeout)
+    val partitions = metadataClient.getPartitionsFor(topic)
+    partitions.foreach(p => actorSystem.log.info("Retrieved {} partitions for 
topic {} for group {}", p.length, topic, groupId))
+    Await.result(partitions, timeout)
+  }
+
+  def cluster(partitions: List[PartitionInfo]): KafkaCluster =
+    new KafkaCluster(CLUSTER_ID, List.empty[Node].asJavaCollection, 
partitions.asJavaCollection, Set.empty[String].asJava, Set.empty[String].asJava)
+
+  def partitioner(): Partitioner = new DefaultPartitioner()
+}
+
+final class KafkaShardingMessageExtractor[M](val clientSettings: 
ConsumerSettings[_,_], val groupId: String, val topic: String)
+                                            (implicit val actorSystem: 
ActorSystem, val timeout: FiniteDuration)
+  extends ShardingMessageExtractor[ShardingEnvelope[M], M] with 
DefaultKafkaShardingMessageExtractor {
+  override def entityId(envelope: ShardingEnvelope[M]): String = 
envelope.entityId
+  override def unwrapMessage(envelope: ShardingEnvelope[M]): M = 
envelope.message
+}
+
+/**
+ * Caveats
+ * - If Consumer subscription contains multiple topics, each topic has the 
exact same number of partitions.
+ * - Values are passed as `null` to the partitioner.
+ * - A fake [[org.apache.kafka.common.Cluster]] is passed to the 
[[org.apache.kafka.clients.producer.Partitioner]] that
+ *   only contains partitions for the provided topic. If you choose to reuse a 
different partitioner then make sure your
+ *   partitioner doesn't make use of any other Kafka Cluster metadata.
+ */
+abstract class KafkaShardingNoEnvelopeExtractor[M](val clientSettings: 
ConsumerSettings[_,_], val groupId: String, val topic: String)
+                                                  (implicit val actorSystem: 
ActorSystem, val timeout: FiniteDuration)
+  extends ShardingMessageExtractor[M, M] with 
DefaultKafkaShardingMessageExtractor {
+  override def unwrapMessage(message: M): M = message
+}
diff --git 
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorConfig.scala
 
b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorConfig.scala
index fd07ac4..1749aee 100644
--- 
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorConfig.scala
+++ 
b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorConfig.scala
@@ -1,14 +1,14 @@
 package sample.sharding.kafka
 
 import com.typesafe.config.Config
+import scala.jdk.CollectionConverters._
 
 case object ProcessorConfig {
   def apply(config: Config): ProcessorConfig =
     new ProcessorConfig(
       config.getString("bootstrap-servers"),
-      config.getString("topic"),
-      config.getString("group"),
-      config.getInt("nr-partitions"))
+      config.getStringList("topics").asScala.toList,
+      config.getString("group"))
 }
 
-final class ProcessorConfig(val bootstrapServers: String, val topic: String, 
val groupId: String, val nrPartitions: Int)
+final class ProcessorConfig(val bootstrapServers: String, val topics: 
List[String], val groupId: String)
diff --git 
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala
 
b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala
index ac62b4c..79be5e3 100644
--- 
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala
+++ 
b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala
@@ -14,7 +14,7 @@ import scala.util.Failure
 import scala.util.Success
 
 object TopicListener {
-  def apply(typeKey: EntityTypeKey[_]): Behavior[ConsumerRebalanceEvent] =
+  def apply(groupId: String, typeKey: EntityTypeKey[_]): 
Behavior[ConsumerRebalanceEvent] =
     Behaviors.setup { ctx =>
       import ctx.executionContext
       val shardAllocationClient = 
ExternalShardAllocation(ctx.system).clientFor(typeKey.name)
@@ -28,21 +28,23 @@ object TopicListener {
       }
       val address = Cluster(ctx.system).selfMember.address
       Behaviors.receiveMessage[ConsumerRebalanceEvent] {
-        case TopicPartitionsAssigned(_, partitions) =>
-          partitions.foreach(partition => {
-            ctx.log.info("Partition [{}] assigned to current node. Updating 
shard allocation", partition.partition())
+        case TopicPartitionsAssigned(sub, partitions) =>
+          partitions.foreach(tp => {
+            val shardId = s"$groupId-${tp.partition()}"
+            ctx.log.info("Partition [{}] assigned to current node. Updating 
shard allocation", shardId)
             // kafka partition becomes the akka shard
-            val done = 
shardAllocationClient.updateShardLocation(partition.partition().toString, 
address)
+            val done = shardAllocationClient.updateShardLocation(shardId, 
address)
             done.onComplete { result =>
-              ctx.log.info("Result for updating shard {}: {}", partition, 
result)
+              ctx.log.info("Result for updating shard {}: {}", shardId, result)
             }
 
           })
           Behaviors.same
         case TopicPartitionsRevoked(_, topicPartitions) =>
           ctx.log.info(
-            "Partitions [{}] revoked from current node. New location will 
update shard allocation",
-            topicPartitions.mkString(","))
+            "Partitions [{}] of group [{}] revoked from current node. New 
location will update shard allocation",
+            topicPartitions.mkString(","),
+            groupId)
           Behaviors.same
       }
     }
diff --git 
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
 
b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index f093fff..f0f4f90 100644
--- 
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++ 
b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -1,16 +1,16 @@
 package sample.sharding.kafka
 
 import akka.Done
-import akka.actor.typed.ActorRef
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.Behavior
 import akka.actor.typed.scaladsl.Behaviors
+import akka.actor.typed.scaladsl.adapter._
+import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
 import akka.cluster.sharding.external.ExternalShardAllocationStrategy
 import akka.cluster.sharding.typed.ClusterShardingSettings
-import akka.cluster.sharding.typed.Murmur2NoEnvelopeMessageExtractor
-import akka.cluster.sharding.typed.scaladsl.ClusterSharding
-import akka.cluster.sharding.typed.scaladsl.Entity
-import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
+import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, 
EntityTypeKey}
+import akka.kafka.{ConsumerSettings, KafkaShardingNoEnvelopeExtractor}
+import org.apache.kafka.common.serialization.StringDeserializer
+
+import scala.concurrent.duration._
 
 object UserEvents {
 
@@ -54,20 +54,27 @@ object UserEvents {
   }
 
   /*
-   * The murmur2 message extractor matches kafka's default partitioning when 
messages
-   * have keys that are strings
+   * The KafkaShardingMessageExtractor uses the KafkaProducer's underlying 
DefaultPartitioner so that the same murmur2
+   * hashing algorithm is used for Kafka and Akka Cluster Sharding
    */
-  class UserIdMessageExtractor(nrKafkaPartitions: Int)
-      extends Murmur2NoEnvelopeMessageExtractor[Message](nrKafkaPartitions) {
-    override def entityId(message: Message): String = message.userId
+  class UserIdMessageExtractor(clientSettings: ConsumerSettings[_,_], topic: 
String, groupId: String)
+                              (implicit actorSystem: akka.actor.ActorSystem, 
timeout: FiniteDuration)
+      extends KafkaShardingNoEnvelopeExtractor[Message](clientSettings, 
groupId, topic) {
+    def entityId(message: Message): String = message.userId
   }
 
   def init(system: ActorSystem[_]): ActorRef[Message] = {
     val processorConfig = 
ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor"))
+    implicit val classic: akka.actor.ActorSystem = system.toClassic
+    implicit val timeout: FiniteDuration = 10.seconds
+    val clientSettings = ConsumerSettings(classic, new StringDeserializer, new 
StringDeserializer)
+    val topic = processorConfig.topics.head
+    val groupId = processorConfig.groupId
+    val messageExtractor = new UserIdMessageExtractor(clientSettings, topic, 
groupId)
     ClusterSharding(system).init(
       Entity(TypeKey)(createBehavior = _ => UserEvents())
         .withAllocationStrategy(new ExternalShardAllocationStrategy(system, 
TypeKey.name))
-        .withMessageExtractor(new 
UserIdMessageExtractor(processorConfig.nrPartitions))
+        .withMessageExtractor(messageExtractor)
         .withSettings(ClusterShardingSettings(system)))
   }
 
diff --git 
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
 
b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
index 8c69c83..511c3bc 100644
--- 
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
+++ 
b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
@@ -39,7 +39,7 @@ object UserEventsKafkaProcessor {
         implicit val scheduler: Scheduler = classic.scheduler
         // TODO config
         val timeout = Timeout(3.seconds)
-        val rebalancerRef = ctx.spawn(TopicListener(UserEvents.TypeKey), 
"rebalancerRef")
+        val rebalancerRef = ctx.spawn(TopicListener(processorSettings.groupId, 
UserEvents.TypeKey), "rebalancerRef")
         val shardRegion = UserEvents.init(ctx.system)
         val consumerSettings =
           ConsumerSettings(ctx.system.toClassic, new StringDeserializer, new 
ByteArrayDeserializer)
@@ -48,7 +48,9 @@ object UserEventsKafkaProcessor {
             .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
             .withStopTimeout(0.seconds)
 
-        val subscription = 
Subscriptions.topics(processorSettings.topic).withRebalanceListener(rebalancerRef.toClassic)
+        val subscription = Subscriptions
+          .topics(processorSettings.topics: _*)
+          .withRebalanceListener(rebalancerRef.toClassic)
 
         val kafkaConsumer: Source[ConsumerRecord[String, Array[Byte]], 
Consumer.Control] =
           Consumer.plainSource(consumerSettings, subscription)
diff --git 
a/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf
 
b/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf
index 2025bb6..38d51c0 100644
--- 
a/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf
+++ 
b/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf
@@ -1,5 +1,5 @@
 kafka-to-sharding-producer {
-  bootstrap-servers = "localhost:9092"
+  bootstrap-servers = "localhost:9092"
   topic = "user-events"
 
   # can be one of:
diff --git 
a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/ProducerConfig.scala
 
b/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/ProducerConfig.scala
index f11b2b7..9d4c374 100644
--- 
a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/ProducerConfig.scala
+++ 
b/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/ProducerConfig.scala
@@ -2,28 +2,13 @@ package sharding.kafka.producer
 
 import com.typesafe.config.Config
 
-sealed trait Partitioning
-case object Default extends Partitioning
-case object Explicit extends Partitioning
-
-object Partitioning {
-  def valueOf(input: String): Partitioning = input.toLowerCase match {
-    case "explicit" => Explicit
-    case _          => Default
-  }
-}
-
 case object ProducerConfig {
   def apply(config: Config): ProducerConfig =
     new ProducerConfig(
       config.getString("bootstrap-servers"),
-      config.getString("topic"),
-      Partitioning.valueOf(config.getString("partitioning")),
-      config.getInt("nr-partitions"))
+      config.getString("topic"))
 }
 
 final class ProducerConfig(
     val bootstrapServers: String,
-    val topic: String,
-    val partitioning: Partitioning,
-    val nrPartitions: Int)
+    val topic: String)
diff --git 
a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
 
b/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
index c7d2869..846dee7 100644
--- 
a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
+++ 
b/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
@@ -52,23 +52,10 @@ object UserEventProducer extends App {
         val product = products(Random.nextInt(products.size))
         val message = UserPurchaseProto(randomEntityId, product, quantity, 
price).toByteArray
         log.info("Sending message to user {}", randomEntityId)
-        producerRecord(randomEntityId, message)
-
-      })
-      .runWith(Producer.plainSink(producerSettings))
-
-  def producerRecord(entityId: String, message: Array[Byte]): 
ProducerRecord[String, Array[Byte]] = {
-    producerConfig.partitioning match {
-      case Default =>
         // rely on the default kafka partitioner to hash the key and 
distribute among shards
         // the logic of the default partitioner must be replicated in 
MessageExtractor entityId -> shardId function
-        new ProducerRecord[String, Array[Byte]](producerConfig.topic, 
entityId, message)
-      case Explicit =>
-        // this logic MUST be replicated in the MessageExtractor entityId -> 
shardId function!
-        val shardAndPartition = 
(Utils.toPositive(Utils.murmur2(entityId.getBytes(StandardCharsets.UTF_8))) % 
producerConfig.nrPartitions)
-        log.info(s"entityId->partition ${entityId}->${shardAndPartition}")
-        new ProducerRecord[String, Array[Byte]](producerConfig.topic, 
shardAndPartition, entityId, message)
-    }
-  }
+        new ProducerRecord[String, Array[Byte]](producerConfig.topic, 
randomEntityId, message)
 
+      })
+      .runWith(Producer.plainSink(producerSettings))
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to