This is an automated email from the ASF dual-hosted git repository.
mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
The following commit(s) were added to refs/heads/main by this push:
new b75565b #243: Convert sharding-kafka to Pekko (#26)
b75565b is described below
commit b75565b208d592e3c7cae46b4996d8a85ba72a0e
Author: Sam Byng <[email protected]>
AuthorDate: Mon Apr 3 15:27:11 2023 +0100
#243: Convert sharding-kafka to Pekko (#26)
Co-authored-by: sb5 <[email protected]>
---
.github/workflows/build-test.yml | 4 +-
akka-sample-kafka-to-sharding-scala/build.sbt | 70 ---------------------
.../project/plugins.sbt | 5 --
docs-gen/build.sbt | 4 +-
.../README.md | 66 ++++++++++----------
pekko-sample-kafka-to-sharding-scala/build.sbt | 71 ++++++++++++++++++++++
.../client/src/main/protobuf/users.proto | 0
.../client/src/main/resources/application.conf | 2 +-
.../client/src/main/scala/client/ClientApp.scala | 6 +-
.../kafka/src/main/resources/logback.xml | 0
.../sharding/embeddedkafka/KafkaBroker.scala | 0
.../processor/src/main/protobuf/user-events.proto | 0
.../processor/src/main/protobuf/users.proto | 0
.../processor/src/main/resources/application.conf | 12 ++--
.../processor/src/main/resources/logback.xml | 4 +-
.../sample/sharding/kafka/CborSerializable.scala | 0
.../main/scala/sample/sharding/kafka/Main.scala | 28 ++++-----
.../sample/sharding/kafka/ProcessorSettings.scala | 8 +--
.../scala/sample/sharding/kafka/UserEvents.scala | 12 ++--
.../sharding/kafka/UserEventsKafkaProcessor.scala | 28 ++++-----
.../sample/sharding/kafka/UserGrpcService.scala | 6 +-
.../producer/src/main/protobuf/user-events.proto | 0
.../producer/src/main/resources/application.conf | 0
.../producer/src/main/resources/logback.xml | 2 +-
.../sharding/kafka/producer/ProducerConfig.scala | 0
.../kafka/producer/UserEventProducer.scala | 18 +++---
.../project/build.properties | 0
.../project/plugins.sbt | 7 +++
28 files changed, 178 insertions(+), 175 deletions(-)
diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml
index 45e4370..2788f6a 100644
--- a/.github/workflows/build-test.yml
+++ b/.github/workflows/build-test.yml
@@ -68,8 +68,8 @@ jobs:
- name: Test akka-sample-sharding-scala
run: cd akka-sample-sharding-scala && sbt test
- - name: Test akka-sample-kafka-to-sharding-scala
- run: cd akka-sample-kafka-to-sharding-scala && sbt test
+ - name: Test pekko-sample-kafka-to-sharding-scala
+ run: cd pekko-sample-kafka-to-sharding-scala && sbt test
- name: Test docs gen
run: cd docs-gen && sbt paradox
diff --git a/akka-sample-kafka-to-sharding-scala/build.sbt
b/akka-sample-kafka-to-sharding-scala/build.sbt
deleted file mode 100644
index c720c1c..0000000
--- a/akka-sample-kafka-to-sharding-scala/build.sbt
+++ /dev/null
@@ -1,70 +0,0 @@
-val AkkaVersion = "2.6.20"
-val AlpakkaKafkaVersion = "3.0.1"
-val AkkaManagementVersion = "1.1.4"
-val AkkaHttpVersion = "10.2.10"
-val EmbeddedKafkaVersion = "2.4.1.1"
-val LogbackVersion = "1.2.11"
-
-ThisBuild / scalaVersion := "2.13.8"
-ThisBuild / organization := "com.lightbend.akka.samples"
-ThisBuild / Compile / scalacOptions ++= Seq(
- "-deprecation",
- "-feature",
- "-unchecked",
- "-Xlog-reflective-calls",
- "-Xlint")
-ThisBuild / Compile / javacOptions ++= Seq("-Xlint:unchecked",
"-Xlint:deprecation")
-ThisBuild / Test / testOptions += Tests.Argument("-oDF")
-ThisBuild / licenses := Seq(("CC0",
url("http://creativecommons.org/publicdomain/zero/1.0")))
-ThisBuild / resolvers ++= Seq(
- "Akka Snapshots" at "https://repo.akka.io/snapshots",
- Resolver.bintrayRepo("akka", "snapshots"))
-
-Global / cancelable := true // ctrl-c
-
-lazy val `akka-sample-kafka-to-sharding` =
project.in(file(".")).aggregate(producer, processor, client)
-
-lazy val kafka = project
- .in(file("kafka"))
- .settings(
- libraryDependencies ++= Seq(
- "ch.qos.logback" % "logback-classic" % LogbackVersion,
- "org.slf4j" % "log4j-over-slf4j" % "1.7.26",
- "io.github.embeddedkafka" %% "embedded-kafka" % EmbeddedKafkaVersion),
- cancelable := false)
-
-lazy val client = project
- .in(file("client"))
- .enablePlugins(AkkaGrpcPlugin, JavaAgent)
- .settings(
- libraryDependencies ++= Seq(
- "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
- "com.typesafe.akka" %% "akka-discovery" % AkkaVersion))
-
-lazy val processor = project
- .in(file("processor"))
- .enablePlugins(AkkaGrpcPlugin, JavaAgent)
- .settings(javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" %
"2.0.9" % "runtime;test")
- .settings(libraryDependencies ++= Seq(
- "com.typesafe.akka" %% "akka-stream-kafka" % AlpakkaKafkaVersion,
- "com.typesafe.akka" %% "akka-stream-kafka-cluster-sharding" %
AlpakkaKafkaVersion,
- "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
- "com.typesafe.akka" %% "akka-discovery" % AkkaVersion,
- "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion,
- "com.typesafe.akka" %% "akka-stream-typed" % AkkaVersion,
- "com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,
- "com.lightbend.akka.management" %% "akka-management" %
AkkaManagementVersion,
- "com.lightbend.akka.management" %% "akka-management-cluster-http" %
AkkaManagementVersion,
- "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion,
- "ch.qos.logback" % "logback-classic" % LogbackVersion,
- "com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion % Test,
- "org.scalatest" %% "scalatest" % "3.0.8" % Test))
-
-lazy val producer = project
- .in(file("producer"))
- .settings(Compile / PB.targets := Seq(scalapb.gen() -> (Compile /
sourceManaged).value))
- .settings(libraryDependencies ++= Seq(
- "com.typesafe.akka" %% "akka-stream-kafka" % AlpakkaKafkaVersion,
- "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
- "ch.qos.logback" % "logback-classic" % "1.2.11",
- "org.scalatest" %% "scalatest" % "3.0.8" % Test))
diff --git a/akka-sample-kafka-to-sharding-scala/project/plugins.sbt
b/akka-sample-kafka-to-sharding-scala/project/plugins.sbt
deleted file mode 100644
index f23b824..0000000
--- a/akka-sample-kafka-to-sharding-scala/project/plugins.sbt
+++ /dev/null
@@ -1,5 +0,0 @@
-addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "0.7.3")
-addSbtPlugin("com.lightbend.sbt" % "sbt-javaagent" % "0.1.4") // ALPN agent
-addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.25")
-
-libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.9.0"
diff --git a/docs-gen/build.sbt b/docs-gen/build.sbt
index 74ce6dd..9eebd37 100644
--- a/docs-gen/build.sbt
+++ b/docs-gen/build.sbt
@@ -64,8 +64,8 @@ lazy val `akka-sample-sharding-scala` = project
name := "Akka Cluster Sharding with Scala",
baseProject := "akka-sample-sharding-scala")
-lazy val `akka-sample-kafka-to-sharding-scala` = project
+lazy val `pekko-sample-kafka-to-sharding-scala` = project
.enablePlugins(AkkaSamplePlugin)
.settings(
name := "Akka Kafka to Sharding with Scala",
- baseProject := "akka-sample-kafka-to-sharding-scala")
+ baseProject := "pekko-sample-kafka-to-sharding-scala")
diff --git a/akka-sample-kafka-to-sharding-scala/README.md
b/pekko-sample-kafka-to-sharding-scala/README.md
similarity index 60%
rename from akka-sample-kafka-to-sharding-scala/README.md
rename to pekko-sample-kafka-to-sharding-scala/README.md
index 5b4f9da..eddf2e3 100644
--- a/akka-sample-kafka-to-sharding-scala/README.md
+++ b/pekko-sample-kafka-to-sharding-scala/README.md
@@ -1,4 +1,4 @@
-# Aligning Kafka Partitions with Akka Cluster Sharding
+# Aligning Kafka Partitions with Apache Pekko Cluster Sharding
It is common to consume a Kafka topic and forward the messages to sharded
actors.
@@ -19,18 +19,18 @@ Imagine a scenario that processes all events for users with
following constraint
* The key of the kafka message is the user id which is in turn the entity id
in sharding
* All messages for the same user id end up in the same partition
-Then we can enforce that the kafka partition == the akka cluster shard id and
use the external
+Then we can enforce that the kafka partition == the Apache Pekko cluster shard
id and use the external
sharding allocation strategy to move shards to the node that is consuming that
partition, resulting
in no cross node traffic.
-Read the following documentation to learn more about [Akka Cluster External
Shard
Allocation](https://pekko.apache.org/docs/pekko/current//typed/cluster-sharding.html#external-shard-allocation)
-and its support for Kafka in [Alpakka Kafka Cluster
Sharding](https://doc.akka.io/docs/alpakka-kafka/current/cluster-sharding.html).
+Read the following documentation to learn more about [Apache Pekko Cluster
External Shard
Allocation](https://pekko.apache.org/docs/pekko/current//typed/cluster-sharding.html#external-shard-allocation)
+and its support for Kafka in [Apache Pekko Connectors Kafka Cluster
Sharding](https://pekko.apache.org/docs/pekko-connectors-kafka/current/cluster-sharding.html)
.
# Running the sample
The sample is made up of three applications:
* `producer` A Kafka producer, that produces events about users
-* `processor` An Akka Cluster Sharding application that reads the Kafka topic
and forwards the messages to a sharded
+* `processor` An Apache Pekko Cluster Sharding application that reads the
Kafka topic and forwards the messages to a sharded
entity that represents a user and a gRPC front end for accessing
the sharded actors state
* `client` A gRPC client for interacting with the cluster
* `kafka` A local Kafka server
@@ -65,21 +65,21 @@ Kafka broker if not running on `localhost:9092`.
```
* Start a single processor, this will consume the messages from the topic and
distribute them to sharding,
- three arguments are required, the akka remoting port, the akka management
port, and the gRPC port for the front end.
-* If you run on different ports the first two akka remoting ports should be
2551/2552 as they are configured as seeds.
+ three arguments are required, the pekko remoting port, the pekko management
port, and the gRPC port for the front end.
+* If you run on different ports the first two pekko remoting ports should be
7345/7355 as they are configured as seeds.
* As there is a single consumer, all partitions will initially be assigned to
this node.
```
-sbt "processor / run 2551 8551 8081"
+sbt "processor / run 7345 8551 8081"
```
The processor starts a KafkaConsumer, as it is the only consumer in the group
it will be assigned every single Kafka partition
and shards for each partition will be assigned to the current node. You will
see logs like:
```
-[info] [2020-01-16 09:48:51,040] [INFO]
[akka://KafkaToSharding/user/kafka-event-processor/rebalancerRef] - Partition
[1] assigned to current node. Updating shard allocation
-[info] [2020-01-16 09:48:51,040] [INFO]
[akka://KafkaToSharding/user/kafka-event-processor/rebalancerRef] - Partition
[25] assigned to current node. Updating shard allocation
-[info] [2020-01-16 09:48:51,043] [INFO]
[akka://KafkaToSharding/user/kafka-event-processor/rebalancerRef] - Partition
[116] assigned to current node. Updating shard allocation
+[info] [2020-01-16 09:48:51,040] [INFO]
[pekko://KafkaToSharding/user/kafka-event-processor/rebalancerRef] - Partition
[1] assigned to current node. Updating shard allocation
+[info] [2020-01-16 09:48:51,040] [INFO]
[pekko://KafkaToSharding/user/kafka-event-processor/rebalancerRef] - Partition
[25] assigned to current node. Updating shard allocation
+[info] [2020-01-16 09:48:51,043] [INFO]
[pekko://KafkaToSharding/user/kafka-event-processor/rebalancerRef] - Partition
[116] assigned to current node. Updating shard allocation
```
If there are existing messages on the topic they will all be processed locally
as there is a single node.
@@ -93,23 +93,23 @@ sbt "producer / run"
In the producer window you'll see:
```
-[INFO] [01/16/2020 09:51:38.639] [UserEventProducer(akka://UserEventProducer)]
Sending message to user 29
-[INFO] [01/16/2020 09:51:39.660] [UserEventProducer(akka://UserEventProducer)]
Sending message to user 60
-[INFO] [01/16/2020 09:51:40.680] [UserEventProducer(akka://UserEventProducer)]
Sending message to user 75
+[INFO] [01/16/2020 09:51:38.639]
[UserEventProducer(pekko://UserEventProducer)] Sending message to user 29
+[INFO] [01/16/2020 09:51:39.660]
[UserEventProducer(pekko://UserEventProducer)] Sending message to user 60
+[INFO] [01/16/2020 09:51:40.680]
[UserEventProducer(pekko://UserEventProducer)] Sending message to user 75
```
In the single processor node the messages will start flowing:
```
-[info] [2020-01-16 09:51:38,672] [INFO]
[sample.sharding.kafka.UserEventsKafkaProcessor$]
[KafkaToSharding-akka.actor.default-dispatcher-26]
[akka://KafkaToSharding/user/kafka-event-processor] - entityId->partition 29->45
-[info] [2020-01-16 09:51:38,672] [INFO]
[sample.sharding.kafka.UserEventsKafkaProcessor$]
[KafkaToSharding-akka.actor.default-dispatcher-26]
[akka://KafkaToSharding/user/kafka-event-processor] - Forwarding message for
entity 29 to cluster sharding
-[info] [2020-01-16 09:51:38,673] [INFO] [sample.sharding.kafka.UserEvents$]
[KafkaToSharding-akka.actor.default-dispatcher-26]
[akka://KafkaToSharding/system/sharding/user-processing/75/29] - user 29
purchase cat t-shirt, quantity 0, price 8874
-[info] [2020-01-16 09:51:39,702] [INFO]
[sample.sharding.kafka.UserEventsKafkaProcessor$]
[KafkaToSharding-akka.actor.default-dispatcher-17]
[akka://KafkaToSharding/user/kafka-event-processor] - entityId->partition
60->111
-[info] [2020-01-16 09:51:39,703] [INFO]
[sample.sharding.kafka.UserEventsKafkaProcessor$]
[KafkaToSharding-akka.actor.default-dispatcher-17]
[akka://KafkaToSharding/user/kafka-event-processor] - Forwarding message for
entity 60 to cluster sharding
-[info] [2020-01-16 09:51:39,706] [INFO] [sample.sharding.kafka.UserEvents$]
[KafkaToSharding-akka.actor.default-dispatcher-17]
[akka://KafkaToSharding/system/sharding/user-processing/2/60] - user 60
purchase cat t-shirt, quantity 2, price 9375
-[info] [2020-01-16 09:51:40,732] [INFO]
[sample.sharding.kafka.UserEventsKafkaProcessor$]
[KafkaToSharding-akka.actor.default-dispatcher-17]
[akka://KafkaToSharding/user/kafka-event-processor] - entityId->partition 75->1
-[info] [2020-01-16 09:51:40,732] [INFO]
[sample.sharding.kafka.UserEventsKafkaProcessor$]
[KafkaToSharding-akka.actor.default-dispatcher-17]
[akka://KafkaToSharding/user/kafka-event-processor] - Forwarding message for
entity 75 to cluster sharding
+[info] [2020-01-16 09:51:38,672] [INFO]
[sample.sharding.kafka.UserEventsKafkaProcessor$]
[KafkaToSharding-pekko.actor.default-dispatcher-26]
[pekko://KafkaToSharding/user/kafka-event-processor] - entityId->partition
29->45
+[info] [2020-01-16 09:51:38,672] [INFO]
[sample.sharding.kafka.UserEventsKafkaProcessor$]
[KafkaToSharding-pekko.actor.default-dispatcher-26]
[pekko://KafkaToSharding/user/kafka-event-processor] - Forwarding message for
entity 29 to cluster sharding
+[info] [2020-01-16 09:51:38,673] [INFO] [sample.sharding.kafka.UserEvents$]
[KafkaToSharding-pekko.actor.default-dispatcher-26]
[pekko://KafkaToSharding/system/sharding/user-processing/75/29] - user 29
purchase cat t-shirt, quantity 0, price 8874
+[info] [2020-01-16 09:51:39,702] [INFO]
[sample.sharding.kafka.UserEventsKafkaProcessor$]
[KafkaToSharding-pekko.actor.default-dispatcher-17]
[pekko://KafkaToSharding/user/kafka-event-processor] - entityId->partition
60->111
+[info] [2020-01-16 09:51:39,703] [INFO]
[sample.sharding.kafka.UserEventsKafkaProcessor$]
[KafkaToSharding-pekko.actor.default-dispatcher-17]
[pekko://KafkaToSharding/user/kafka-event-processor] - Forwarding message for
entity 60 to cluster sharding
+[info] [2020-01-16 09:51:39,706] [INFO] [sample.sharding.kafka.UserEvents$]
[KafkaToSharding-pekko.actor.default-dispatcher-17]
[pekko://KafkaToSharding/system/sharding/user-processing/2/60] - user 60
purchase cat t-shirt, quantity 2, price 9375
+[info] [2020-01-16 09:51:40,732] [INFO]
[sample.sharding.kafka.UserEventsKafkaProcessor$]
[KafkaToSharding-pekko.actor.default-dispatcher-17]
[pekko://KafkaToSharding/user/kafka-event-processor] - entityId->partition 75->1
+[info] [2020-01-16 09:51:40,732] [INFO]
[sample.sharding.kafka.UserEventsKafkaProcessor$]
[KafkaToSharding-pekko.actor.default-dispatcher-17]
[pekko://KafkaToSharding/user/kafka-event-processor] - Forwarding message for
entity 75 to cluster sharding
```
The first log line is just after the message has been taken from Kafka.
@@ -119,10 +119,10 @@ Kafka partition is being consumed.
As there is only one node we get 100% locality, each forwarded message is
processed on the same node
-Now let's see that remain true once we add more nodes to the Akka Cluster, add
another with different ports:
+Now let's see that remain true once we add more nodes to the Apache Pekko
Cluster, add another with different ports:
```
-sbt "processor / run 2552 8552 8082"
+sbt "processor / run 7355 8552 8082"
```
When this starts up we'll see Kafka assign partitions to the new node (it is
in the same consumer group):
@@ -134,7 +134,7 @@ Partition [29] assigned to current node. Updating shard
allocation
On one of the nodes, where the ShardCoordinator runs, we'll see the rebalance
happening:
```
-[info] [2020-01-16 09:59:39,923] [INFO]
[akka://[email protected]:2551/system/sharding/user-processingCoordinator/singleton/coordinator]
- Starting rebalance for shards [45,33,16,2,3,15,11,6,36]. Current shards
rebalancing: []
+[info] [2020-01-16 09:59:39,923] [INFO]
[pekko://[email protected]:7345/system/sharding/user-processingCoordinator/singleton/coordinator]
- Starting rebalance for shards [45,33,16,2,3,15,11,6,36]. Current shards
rebalancing: []
```
Both nodes now have roughly 64 shards / partitions, all co-located with the
Kafka Consuemer.
@@ -142,17 +142,17 @@ You can verify this by the logs showing that when a
message is received by the K
cluster sharding the entity logs receiving the event on the same node.
```
-[info] [2020-01-17 08:27:58,199] [INFO]
[akka://KafkaToSharding/user/kafka-event-processor] - Forwarding message for
entity 29 to cluster sharding
-[info] [2020-01-17 08:27:58,204] [INFO]
[akka://KafkaToSharding/system/sharding/user-processing/45/29] - user 29
purchase cat t-shirt, quantity 1, price 2093
-[info] [2020-01-17 08:28:08,218] [INFO]
[akka://KafkaToSharding/user/kafka-event-processor] - Forwarding message for
entity 56 to cluster sharding
-[info] [2020-01-17 08:28:08,218] [INFO]
[akka://KafkaToSharding/system/sharding/user-processing/6/56] - user 56
purchase akka t-shirt, quantity 3, price 8576
-[info] [2020-01-17 08:28:28,288] [INFO]
[akka://KafkaToSharding/user/kafka-event-processor] - Forwarding message for
entity 44 to cluster sharding
-[info] [2020-01-17 08:28:28,296] [INFO]
[akka://KafkaToSharding/system/sharding/user-processing/59/44] - user 44
purchase cat t-shirt, quantity 3, price 9716
+[info] [2020-01-17 08:27:58,199] [INFO]
[pekko://KafkaToSharding/user/kafka-event-processor] - Forwarding message for
entity 29 to cluster sharding
+[info] [2020-01-17 08:27:58,204] [INFO]
[pekko://KafkaToSharding/system/sharding/user-processing/45/29] - user 29
purchase cat t-shirt, quantity 1, price 2093
+[info] [2020-01-17 08:28:08,218] [INFO]
[pekko://KafkaToSharding/user/kafka-event-processor] - Forwarding message for
entity 56 to cluster sharding
+[info] [2020-01-17 08:28:08,218] [INFO]
[pekko://KafkaToSharding/system/sharding/user-processing/6/56] - user 56
purchase pekko t-shirt, quantity 3, price 8576
+[info] [2020-01-17 08:28:28,288] [INFO]
[pekko://KafkaToSharding/user/kafka-event-processor] - Forwarding message for
entity 44 to cluster sharding
+[info] [2020-01-17 08:28:28,296] [INFO]
[pekko://KafkaToSharding/system/sharding/user-processing/59/44] - user 44
purchase cat t-shirt, quantity 3, price 9716
```
Each forwarding messaging is followed by log for the same entity on the
current node.
-Using Akka management we can see the shard allocations and the number of
entities per shard (uses `curl` and `jq`):
+Using Apache Pekko Management we can see the shard allocations and the number
of entities per shard (uses `curl` and `jq`):
```
# Node 1:
@@ -173,7 +173,7 @@ curl -s localhost:8552/cluster/shards/user-processing | jq
-r "." | grep shardId
The number of shards will depend on which entities have received messages.
-We now have a 2 node Akka Cluster with a Kafka Consumer running on each where
the kafka partitions align
+We now have a 2 node Apache Pekko Cluster with a Kafka Consumer running on
each where the kafka partitions align
with Cluster shards.
A use case for sending the processing to sharding is it allows each entity to
be queried from any where in the cluster
diff --git a/pekko-sample-kafka-to-sharding-scala/build.sbt
b/pekko-sample-kafka-to-sharding-scala/build.sbt
new file mode 100644
index 0000000..8b0d15a
--- /dev/null
+++ b/pekko-sample-kafka-to-sharding-scala/build.sbt
@@ -0,0 +1,71 @@
+val pekkoVersion = "0.0.0+26623-85c2a469-SNAPSHOT"
+val pekkoHttpVersion = "0.0.0+4335-81a9800e-SNAPSHOT"
+
+val pekkoConnectorsKafkaVersion = "0.0.0+1717-267012de-SNAPSHOT"
+val pekkoManagementVersion = "0.0.0+710-b49055bd-SNAPSHOT"
+val EmbeddedKafkaVersion = "2.4.1.1"
+val LogbackVersion = "1.2.11"
+
+ThisBuild / scalaVersion := "2.13.8"
+ThisBuild / organization := "org.apache.pekko"
+ThisBuild / Compile / scalacOptions ++= Seq(
+ "-deprecation",
+ "-feature",
+ "-unchecked",
+ "-Xlog-reflective-calls",
+ "-Xlint")
+ThisBuild / Compile / javacOptions ++= Seq("-Xlint:unchecked",
"-Xlint:deprecation")
+ThisBuild / Test / testOptions += Tests.Argument("-oDF")
+ThisBuild / licenses := Seq(("CC0",
url("http://creativecommons.org/publicdomain/zero/1.0")))
+
+// allow access to snapshots
+ThisBuild / resolvers += "Apache Nexus
Snapshots".at("https://repository.apache.org/content/groups/snapshots/")
+
+Global / cancelable := true // ctrl-c
+
+lazy val `pekko-sample-kafka-to-sharding` =
project.in(file(".")).aggregate(producer, processor, client)
+
+lazy val kafka = project
+ .in(file("kafka"))
+ .settings(
+ libraryDependencies ++= Seq(
+ "ch.qos.logback" % "logback-classic" % LogbackVersion,
+ "org.slf4j" % "log4j-over-slf4j" % "1.7.26",
+ "io.github.embeddedkafka" %% "embedded-kafka" % EmbeddedKafkaVersion),
+ cancelable := false)
+
+lazy val client = project
+ .in(file("client"))
+ .enablePlugins(PekkoGrpcPlugin, JavaAgent)
+ .settings(
+ libraryDependencies ++= Seq(
+ "org.apache.pekko" %% "pekko-stream" % pekkoVersion,
+ "org.apache.pekko" %% "pekko-discovery" % pekkoVersion))
+
+lazy val processor = project
+ .in(file("processor"))
+ .enablePlugins(PekkoGrpcPlugin, JavaAgent)
+ .settings(javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" %
"2.0.9" % "runtime;test")
+ .settings(libraryDependencies ++= Seq(
+ "org.apache.pekko" %% "pekko-connectors-kafka" %
pekkoConnectorsKafkaVersion,
+ "org.apache.pekko" %% "pekko-connectors-kafka-cluster-sharding" %
pekkoConnectorsKafkaVersion,
+ "org.apache.pekko" %% "pekko-stream" % pekkoVersion,
+ "org.apache.pekko" %% "pekko-discovery" % pekkoVersion,
+ "org.apache.pekko" %% "pekko-cluster-sharding-typed" % pekkoVersion,
+ "org.apache.pekko" %% "pekko-stream-typed" % pekkoVersion,
+ "org.apache.pekko" %% "pekko-serialization-jackson" % pekkoVersion,
+ "org.apache.pekko" %% "pekko-management" % pekkoManagementVersion,
+ "org.apache.pekko" %% "pekko-management-cluster-http" %
pekkoManagementVersion,
+ "org.apache.pekko" %% "pekko-http-spray-json" % pekkoHttpVersion,
+ "ch.qos.logback" % "logback-classic" % LogbackVersion,
+ "org.apache.pekko" %% "pekko-actor-testkit-typed" % pekkoVersion % Test,
+ "org.scalatest" %% "scalatest" % "3.0.8" % Test))
+
+lazy val producer = project
+ .in(file("producer"))
+ .settings(Compile / PB.targets := Seq(scalapb.gen() -> (Compile /
sourceManaged).value))
+ .settings(libraryDependencies ++= Seq(
+ "org.apache.pekko" %% "pekko-connectors-kafka" %
pekkoConnectorsKafkaVersion,
+ "org.apache.pekko" %% "pekko-stream" % pekkoVersion,
+ "ch.qos.logback" % "logback-classic" % "1.2.11",
+ "org.scalatest" %% "scalatest" % "3.0.8" % Test))
diff --git
a/akka-sample-kafka-to-sharding-scala/client/src/main/protobuf/users.proto
b/pekko-sample-kafka-to-sharding-scala/client/src/main/protobuf/users.proto
similarity index 100%
rename from
akka-sample-kafka-to-sharding-scala/client/src/main/protobuf/users.proto
rename to
pekko-sample-kafka-to-sharding-scala/client/src/main/protobuf/users.proto
diff --git
a/akka-sample-kafka-to-sharding-scala/client/src/main/resources/application.conf
b/pekko-sample-kafka-to-sharding-scala/client/src/main/resources/application.conf
similarity index 52%
rename from
akka-sample-kafka-to-sharding-scala/client/src/main/resources/application.conf
rename to
pekko-sample-kafka-to-sharding-scala/client/src/main/resources/application.conf
index e73c73c..f374f57 100644
---
a/akka-sample-kafka-to-sharding-scala/client/src/main/resources/application.conf
+++
b/pekko-sample-kafka-to-sharding-scala/client/src/main/resources/application.conf
@@ -1,4 +1,4 @@
-akka.grpc.client {
+pekko.grpc.client {
"UserService" {
}
diff --git
a/akka-sample-kafka-to-sharding-scala/client/src/main/scala/client/ClientApp.scala
b/pekko-sample-kafka-to-sharding-scala/client/src/main/scala/client/ClientApp.scala
similarity index 88%
rename from
akka-sample-kafka-to-sharding-scala/client/src/main/scala/client/ClientApp.scala
rename to
pekko-sample-kafka-to-sharding-scala/client/src/main/scala/client/ClientApp.scala
index 8254f0b..8049506 100644
---
a/akka-sample-kafka-to-sharding-scala/client/src/main/scala/client/ClientApp.scala
+++
b/pekko-sample-kafka-to-sharding-scala/client/src/main/scala/client/ClientApp.scala
@@ -1,8 +1,8 @@
package client
-import akka.actor.ActorSystem
-import akka.grpc.GrpcClientSettings
-import akka.stream.Materializer
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.grpc.GrpcClientSettings
+import org.apache.pekko.stream.Materializer
import sample.sharding.kafka.UserServiceClient
import sample.sharding.kafka.UserStatsRequest
diff --git
a/akka-sample-kafka-to-sharding-scala/kafka/src/main/resources/logback.xml
b/pekko-sample-kafka-to-sharding-scala/kafka/src/main/resources/logback.xml
similarity index 100%
rename from
akka-sample-kafka-to-sharding-scala/kafka/src/main/resources/logback.xml
rename to
pekko-sample-kafka-to-sharding-scala/kafka/src/main/resources/logback.xml
diff --git
a/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala
b/pekko-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala
similarity index 100%
rename from
akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala
rename to
pekko-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala
diff --git
a/akka-sample-kafka-to-sharding-scala/processor/src/main/protobuf/user-events.proto
b/pekko-sample-kafka-to-sharding-scala/processor/src/main/protobuf/user-events.proto
similarity index 100%
rename from
akka-sample-kafka-to-sharding-scala/processor/src/main/protobuf/user-events.proto
rename to
pekko-sample-kafka-to-sharding-scala/processor/src/main/protobuf/user-events.proto
diff --git
a/akka-sample-kafka-to-sharding-scala/processor/src/main/protobuf/users.proto
b/pekko-sample-kafka-to-sharding-scala/processor/src/main/protobuf/users.proto
similarity index 100%
rename from
akka-sample-kafka-to-sharding-scala/processor/src/main/protobuf/users.proto
rename to
pekko-sample-kafka-to-sharding-scala/processor/src/main/protobuf/users.proto
diff --git
a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
b/pekko-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
similarity index 73%
rename from
akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
rename to
pekko-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
index 9b31192..bcbdfa5 100644
---
a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
+++
b/pekko-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
@@ -5,11 +5,11 @@ kafka-to-sharding-processor {
ask-timeout = 10s
}
-akka.http {
+pekko.http {
server.preview.enable-http2 = on
}
-akka {
+pekko {
actor {
provider = "cluster"
@@ -26,10 +26,10 @@ akka {
cluster {
seed-nodes = [
- "akka://[email protected]:2551"
- "akka://[email protected]:2552"
+ "pekko://[email protected]:7345"
+ "pekko://[email protected]:7355"
]
- downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
+ downing-provider-class = "pekko.cluster.sbr.SplitBrainResolverProvider"
sharding {
retry-interval = 200ms
waiting-for-state-timeout = 200ms
@@ -37,7 +37,7 @@ akka {
}
}
-akka.management {
+pekko.management {
http {
hostname = "127.0.0.1"
port = 8558
diff --git
a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/logback.xml
b/pekko-sample-kafka-to-sharding-scala/processor/src/main/resources/logback.xml
similarity index 74%
rename from
akka-sample-kafka-to-sharding-scala/processor/src/main/resources/logback.xml
rename to
pekko-sample-kafka-to-sharding-scala/processor/src/main/resources/logback.xml
index 31d2e1d..0851949 100644
---
a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/logback.xml
+++
b/pekko-sample-kafka-to-sharding-scala/processor/src/main/resources/logback.xml
@@ -3,13 +3,13 @@
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
- <pattern>[%date{ISO8601}] [%level] [%logger] [%thread]
[%X{akkaSource}] - %msg%n</pattern>
+ <pattern>[%date{ISO8601}] [%level] [%logger] [%thread]
[%X{pekkoSource}] - %msg%n</pattern>
</encoder>
</appender>
<logger name="org.apache.kafka" level="WARN" />
- <logger name="akka.cluster.sharding" level="DEBUG" />
+ <logger name="org.apache.pekko.cluster.sharding" level="DEBUG" />
<root level="INFO">
<appender-ref ref="STDOUT"/>
diff --git
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/CborSerializable.scala
b/pekko-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/CborSerializable.scala
similarity index 100%
rename from
akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/CborSerializable.scala
rename to
pekko-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/CborSerializable.scala
diff --git
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
b/pekko-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
similarity index 83%
rename from
akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
rename to
pekko-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
index 2f5c2f3..d5d97a7 100644
---
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
+++
b/pekko-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
@@ -1,13 +1,13 @@
package sample.sharding.kafka
-import akka.actor.typed.scaladsl.adapter._
-import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
-import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Terminated }
-import akka.cluster.typed.{ Cluster, SelfUp, Subscribe }
-import akka.http.scaladsl._
-import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
-import akka.management.scaladsl.AkkaManagement
-import akka.stream.Materializer
+import org.apache.pekko.actor.typed.scaladsl.adapter._
+import org.apache.pekko.actor.typed.scaladsl.{ ActorContext, Behaviors }
+import org.apache.pekko.actor.typed.{ ActorRef, ActorSystem, Behavior,
Terminated }
+import org.apache.pekko.cluster.typed.{ Cluster, SelfUp, Subscribe }
+import org.apache.pekko.http.scaladsl._
+import org.apache.pekko.http.scaladsl.model.{ HttpRequest, HttpResponse }
+import org.apache.pekko.management.scaladsl.PekkoManagement
+import org.apache.pekko.stream.Materializer
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.Future
@@ -35,10 +35,10 @@ object Main {
}
}
- def init(remotingPort: Int, akkaManagementPort: Int, frontEndPort: Int):
Unit = {
+ def init(remotingPort: Int, pekkoManagementPort: Int, frontEndPort: Int):
Unit = {
ActorSystem(Behaviors.setup[Command] {
ctx =>
- AkkaManagement(ctx.system.toClassic).start()
+ PekkoManagement(ctx.system.toClassic).start()
val cluster = Cluster(ctx.system)
val upAdapter = ctx.messageAdapter[SelfUp](_ => NodeMemberUp)
cluster.subscriptions ! Subscribe(upAdapter, classOf[SelfUp])
@@ -48,7 +48,7 @@ object Main {
case Failure(ex) => throw ex
}
starting(ctx, None, joinedCluster = false, settings)
- }, "KafkaToSharding", config(remotingPort, akkaManagementPort))
+ }, "KafkaToSharding", config(remotingPort, pekkoManagementPort))
def start(ctx: ActorContext[Command], region:
ActorRef[UserEvents.Command], settings: ProcessorSettings)
: Behavior[Command] = {
@@ -98,7 +98,7 @@ object Main {
system: ActorSystem[_], frontEndPort: Int, region:
ActorRef[UserEvents.Command]): Future[Http.ServerBinding] = {
val mat = Materializer.createMaterializer(system.toClassic)
val service: HttpRequest => Future[HttpResponse] =
- UserServiceHandler(new UserGrpcService(system, region))(mat,
system.toClassic)
+ UserServiceHandler(new UserGrpcService(system,
region))(system.toClassic)
Http()(system.toClassic).bindAndHandleAsync(
service,
interface = "127.0.0.1",
@@ -109,8 +109,8 @@ object Main {
def config(port: Int, managementPort: Int): Config =
ConfigFactory.parseString(s"""
- akka.remote.artery.canonical.port = $port
- akka.management.http.port = $managementPort
+ pekko.remote.artery.canonical.port = $port
+ pekko.management.http.port = $managementPort
""").withFallback(ConfigFactory.load())
}
diff --git
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorSettings.scala
b/pekko-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorSettings.scala
similarity index 88%
rename from
akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorSettings.scala
rename to
pekko-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorSettings.scala
index e5796b1..078de5d 100644
---
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorSettings.scala
+++
b/pekko-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorSettings.scala
@@ -1,9 +1,9 @@
package sample.sharding.kafka
-import akka.actor.ActorSystem
-import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
-import akka.kafka.ConsumerSettings
-import akka.util.Timeout
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.cluster.sharding.typed.scaladsl.EntityTypeKey
+import org.apache.pekko.kafka.ConsumerSettings
+import org.apache.pekko.util.Timeout
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ ByteArrayDeserializer,
StringDeserializer }
diff --git
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
b/pekko-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
similarity index 84%
rename from
akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
rename to
pekko-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index db29080..f26c1b0 100644
---
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++
b/pekko-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -1,11 +1,11 @@
package sample.sharding.kafka
-import akka.Done
-import akka.actor.typed.scaladsl.Behaviors
-import akka.actor.typed.{ ActorRef, ActorSystem, Behavior }
-import akka.cluster.sharding.external.ExternalShardAllocationStrategy
-import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity }
-import akka.kafka.cluster.sharding.KafkaClusterSharding
+import org.apache.pekko.Done
+import org.apache.pekko.actor.typed.scaladsl.Behaviors
+import org.apache.pekko.actor.typed.{ ActorRef, ActorSystem, Behavior }
+import
org.apache.pekko.cluster.sharding.external.ExternalShardAllocationStrategy
+import org.apache.pekko.cluster.sharding.typed.scaladsl.{ ClusterSharding,
Entity }
+import org.apache.pekko.kafka.cluster.sharding.KafkaClusterSharding
import scala.concurrent.Future
import scala.concurrent.duration._
diff --git
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
b/pekko-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
similarity index 78%
rename from
akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
rename to
pekko-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
index 4fe5f48..71adfa3 100644
---
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
+++
b/pekko-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
@@ -5,20 +5,20 @@ import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Try
-import akka.Done
-import akka.actor.Scheduler
-import akka.actor.typed.scaladsl.AskPattern._
-import akka.actor.typed.scaladsl.Behaviors
-import akka.actor.typed.scaladsl.adapter._
-import akka.actor.typed.ActorRef
-import akka.actor.typed.Behavior
-import akka.actor.typed.{ ActorSystem => TypedActorSystem }
-import akka.kafka.cluster.sharding.KafkaClusterSharding
-import akka.kafka.scaladsl.Committer
-import akka.kafka.scaladsl.Consumer
-import akka.kafka.CommitterSettings
-import akka.kafka.Subscriptions
-import akka.pattern.retry
+import org.apache.pekko.Done
+import org.apache.pekko.actor.Scheduler
+import org.apache.pekko.actor.typed.scaladsl.AskPattern._
+import org.apache.pekko.actor.typed.scaladsl.Behaviors
+import org.apache.pekko.actor.typed.scaladsl.adapter._
+import org.apache.pekko.actor.typed.ActorRef
+import org.apache.pekko.actor.typed.Behavior
+import org.apache.pekko.actor.typed.{ ActorSystem => TypedActorSystem }
+import org.apache.pekko.kafka.cluster.sharding.KafkaClusterSharding
+import org.apache.pekko.kafka.scaladsl.Committer
+import org.apache.pekko.kafka.scaladsl.Consumer
+import org.apache.pekko.kafka.CommitterSettings
+import org.apache.pekko.kafka.Subscriptions
+import org.apache.pekko.pattern.retry
import org.slf4j.LoggerFactory
import sample.sharding.kafka.serialization.UserPurchaseProto
diff --git
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
b/pekko-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
similarity index 81%
rename from
akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
rename to
pekko-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
index cf96a47..c65556b 100644
---
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
+++
b/pekko-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
@@ -1,8 +1,8 @@
package sample.sharding.kafka
-import akka.actor.typed.scaladsl.AskPattern._
-import akka.actor.typed.{ ActorRef, ActorSystem, Scheduler }
-import akka.util.Timeout
+import org.apache.pekko.actor.typed.scaladsl.AskPattern._
+import org.apache.pekko.actor.typed.{ ActorRef, ActorSystem, Scheduler }
+import org.apache.pekko.util.Timeout
import sample.sharding.kafka.UserEvents.{ Command, GetRunningTotal,
RunningTotal }
import scala.concurrent.duration._
diff --git
a/akka-sample-kafka-to-sharding-scala/producer/src/main/protobuf/user-events.proto
b/pekko-sample-kafka-to-sharding-scala/producer/src/main/protobuf/user-events.proto
similarity index 100%
rename from
akka-sample-kafka-to-sharding-scala/producer/src/main/protobuf/user-events.proto
rename to
pekko-sample-kafka-to-sharding-scala/producer/src/main/protobuf/user-events.proto
diff --git
a/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf
b/pekko-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf
similarity index 100%
rename from
akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf
rename to
pekko-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf
diff --git
a/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/logback.xml
b/pekko-sample-kafka-to-sharding-scala/producer/src/main/resources/logback.xml
similarity index 90%
rename from
akka-sample-kafka-to-sharding-scala/producer/src/main/resources/logback.xml
rename to
pekko-sample-kafka-to-sharding-scala/producer/src/main/resources/logback.xml
index d631ec1..9dc5a05 100644
---
a/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/logback.xml
+++
b/pekko-sample-kafka-to-sharding-scala/producer/src/main/resources/logback.xml
@@ -6,7 +6,7 @@
<level>INFO</level>
</filter>
<encoder>
- <pattern>[%date{ISO8601}] [%level] [%logger] [%thread]
[%X{akkaSource}] - %msg%n</pattern>
+ <pattern>[%date{ISO8601}] [%level] [%logger] [%thread]
[%X{pekkoSource}] - %msg%n</pattern>
</encoder>
</appender>
diff --git
a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/ProducerConfig.scala
b/pekko-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/ProducerConfig.scala
similarity index 100%
rename from
akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/ProducerConfig.scala
rename to
pekko-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/ProducerConfig.scala
diff --git
a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
b/pekko-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
similarity index 80%
rename from
akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
rename to
pekko-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
index 872f28a..74621c4 100644
---
a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
+++
b/pekko-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
@@ -1,11 +1,11 @@
package sharding.kafka.producer
-import akka.Done
-import akka.actor.ActorSystem
-import akka.event.Logging
-import akka.kafka.ProducerSettings
-import akka.kafka.scaladsl.Producer
-import akka.stream.scaladsl.Source
+import org.apache.pekko.Done
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.event.Logging
+import org.apache.pekko.kafka.ProducerSettings
+import org.apache.pekko.kafka.scaladsl.Producer
+import org.apache.pekko.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ ByteArraySerializer,
StringSerializer }
@@ -20,12 +20,12 @@ object UserEventProducer extends App {
implicit val system: ActorSystem = ActorSystem(
"UserEventProducer",
ConfigFactory.parseString("""
- akka.actor.provider = "local"
+ pekko.actor.provider = "local"
""".stripMargin).withFallback(ConfigFactory.load()).resolve())
val log = Logging(system, "UserEventProducer")
- val config = system.settings.config.getConfig("akka.kafka.producer")
+ val config = system.settings.config.getConfig("pekko.kafka.producer")
val producerConfig =
ProducerConfig(system.settings.config.getConfig("kafka-to-sharding-producer"))
@@ -36,7 +36,7 @@ object UserEventProducer extends App {
val nrUsers = 200
val maxPrice = 10000
val maxQuantity = 5
- val products = List("cat t-shirt", "akka t-shirt", "skis", "climbing shoes",
"rope")
+ val products = List("cat t-shirt", "pekko t-shirt", "skis", "climbing
shoes", "rope")
val done: Future[Done] =
Source
diff --git a/akka-sample-kafka-to-sharding-scala/project/build.properties
b/pekko-sample-kafka-to-sharding-scala/project/build.properties
similarity index 100%
rename from akka-sample-kafka-to-sharding-scala/project/build.properties
rename to pekko-sample-kafka-to-sharding-scala/project/build.properties
diff --git a/pekko-sample-kafka-to-sharding-scala/project/plugins.sbt
b/pekko-sample-kafka-to-sharding-scala/project/plugins.sbt
new file mode 100644
index 0000000..c8a1ae7
--- /dev/null
+++ b/pekko-sample-kafka-to-sharding-scala/project/plugins.sbt
@@ -0,0 +1,7 @@
+resolvers += "Apache
Snapshots".at("https://repository.apache.org/content/repositories/snapshots/")
+addSbtPlugin("org.apache.pekko" % "sbt-pekko-grpc" %
"0.0.0-15-3d8bff9d-SNAPSHOT")
+
+addSbtPlugin("com.lightbend.sbt" % "sbt-javaagent" % "0.1.4") // ALPN agent
+addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.25")
+
+libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.11"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]