This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors-samples.git


The following commit(s) were added to refs/heads/main by this push:
     new adec5c6  [kafka-to-elasticsearch] Migrate akka dependencies to pekko (#18)
adec5c6 is described below

commit adec5c6e7cae82dccd513378ea4c3fde30573a79
Author: Laglangyue <[email protected]>
AuthorDate: Sun Dec 17 17:01:19 2023 +0800

    [kafka-to-elasticsearch] Migrate akka dependencies to pekko (#18)
    
    * [kafka-to-elasticsearch] Migrate akka dependencies to pekko
    
    * [kafka-to-elasticsearch] Migrate akka dependencies to pekko
    
    * use confluentinc/cp-kafka:5.4.1
    
    * try to fix doc build
    
    ---------
    
    Co-authored-by: laglang <[email protected]>
    Co-authored-by: PJ Fanning <[email protected]>
---
 docs/build.sbt                                     |  40 ++--
 docs/project/Dependencies.scala                    |   8 +-
 docs/src/main/paradox/index.md                     |   2 +-
 .../project/Dependencies.scala                     |   4 +-
 .../step_003_parse_csv/README.md                   |   2 +-
 .../step_007_produce_to_kafka/README.md            |   2 +-
 .../README.md                                      |   2 +-
 .../build.sbt                                      |   2 +-
 .../project/Dependencies.scala                     |  31 ++-
 .../src/main/java/samples/javadsl/Helper.java      | 140 +++++++-------
 .../src/main/java/samples/javadsl/JsonMappers.java |   9 +-
 .../src/main/java/samples/javadsl/Main.java        | 215 +++++++++++----------
 .../src/main/java/samples/javadsl/Movie.java       |  27 ++-
 .../src/main/resources/application.conf            |   6 +-
 .../src/main/resources/logback.xml                 |   2 +-
 .../src/main/scala/samples/scaladsl/Helper.scala   |  35 ++--
 .../src/main/scala/samples/scaladsl/Main.scala     |  24 +--
 .../README.md                                      |   2 +-
 .../docs/src/main/paradox/example.md               |   2 +-
 .../docs/src/main/paradox/full-source.md           |   2 +-
 .../project/Dependencies.scala                     |   4 +-
 pekko-connectors-sample-mqtt-to-kafka/README.md    |   2 +-
 .../project/Dependencies.scala                     |   4 +-
 .../README.md                                      |   2 +-
 24 files changed, 283 insertions(+), 286 deletions(-)
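
The heart of the change is the coordinate rename visible in the Dependencies.scala
diffs below: Akka artifacts under com.typesafe.akka / com.lightbend.akka move to
org.apache.pekko, and the version constants are renamed to match. As a minimal sbt
sketch of the pattern (the coordinates and versions are exactly the ones this commit
pins in pekko-connectors-sample-kafka-to-elasticsearch, nothing beyond the diff
itself):

    // Before: "com.typesafe.akka"  %% "akka-stream"                       % "2.6.19"
    //         "com.typesafe.akka"  %% "akka-stream-kafka"                 % "3.0.1"
    //         "com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" % "4.0.0"
    // After (Apache Pekko):
    val PekkoVersion = "1.0.2"
    val PekkoConnectorsVersion = "1.0.1"
    val PekkoConnectorsKafkaVersion = "1.0.0"
    libraryDependencies ++= Seq(
      "org.apache.pekko" %% "pekko-stream" % PekkoVersion,
      "org.apache.pekko" %% "pekko-connectors-kafka" % PekkoConnectorsKafkaVersion,
      "org.apache.pekko" %% "pekko-connectors-elasticsearch" % PekkoConnectorsVersion)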

diff --git a/docs/build.sbt b/docs/build.sbt
index 9d9ad40..55111b7 100644
--- a/docs/build.sbt
+++ b/docs/build.sbt
@@ -28,11 +28,11 @@ FtpToFile / paradoxProperties ++= Map(
   // Pekko Connectors
   "scaladoc.org.apache.pekko.stream.connectors.base_url" -> 
s"https://pekko.apache.org/api/pekko-connectors/${Dependencies.FtpToFile.PekkoConnectorsVersion}";,
   "javadoc.org.apache.pekko.stream.connectors.base_url" -> "",
-  "extref.org.apache.pekko.stream.connectors.base_url" -> 
s"https://pekko.apache.org/docs/pekko-connectors/${Dependencies.FtpToFile.PekkoConnectorsVersion}/%s";,
+  "extref.pekko-connectors.base_url" -> 
s"https://pekko.apache.org/docs/pekko-connectors/${Dependencies.FtpToFile.PekkoConnectorsVersion}/%s";,
   // Pekko
   "scaladoc.org.apache.pekko.base_url" -> 
s"https://pekko.apache.org/api/pekko/${Dependencies.FtpToFile.PekkoVersion}";,
   "javadoc.org.apache.pekko.base_url" -> 
s"https://pekko.apache.org/japi/pekko/${Dependencies.FtpToFile.PekkoVersion}";,
-  "extref.org.apache.pekko.base_url" -> 
s"https://pekko.apache.org/docs/pekko/${Dependencies.FtpToFile.PekkoVersion}/%s";,
+  "extref.pekko.base_url" -> 
s"https://pekko.apache.org/docs/pekko/${Dependencies.FtpToFile.PekkoVersion}/%s";,
 )
 FtpToFile / paradoxGroups := Map("Language" -> Seq("Java", "Scala"))
 
@@ -52,9 +52,9 @@ HttpCsvToKafka / paradoxProperties ++= Map(
   "javadoc.akka.base_url" -> "",
   "extref.alpakka.base_url" -> 
s"https://doc.akka.io/docs/alpakka/${Dependencies.HttpCsvToKafka.PekkoConnectorsVersion}/%s";,
   // Pekko Connectors Kafka
-  "scaladoc.akka.kafka.base_url" -> 
s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.HttpCsvToKafka.AlpakkaKafkaVersion}";,
+  "scaladoc.akka.kafka.base_url" -> 
s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.HttpCsvToKafka.PekkoConnectorsKafkaVersion}";,
   "javadoc.akka.kafka.base_url" -> "",
-  "extref.alpakka-kafka.base_url" -> 
s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.HttpCsvToKafka.AlpakkaKafkaVersion}/%s";,
+  "extref.pekko-connectors-kafka.base_url" -> 
s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.HttpCsvToKafka.PekkoConnectorsKafkaVersion}/%s";,
   // Pekko
   "scaladoc.akka.base_url" -> 
s"https://doc.akka.io/api/akka/${Dependencies.HttpCsvToKafka.PekkoVersion}";,
   "javadoc.akka.base_url" -> 
s"https://doc.akka.io/japi/akka/${Dependencies.HttpCsvToKafka.PekkoVersion}";,
@@ -126,17 +126,17 @@ KafkaToElasticsearch / paradoxProperties ++= Map(
   "snip.build.base_dir" -> 
s"${baseDirectory.value}/../pekko-connectors-sample-${KafkaToElasticsearch.name}",
   "github.root.base_dir" -> s"${baseDirectory.value}/..",
   // Pekko Connectors
-  "scaladoc.akka.stream.alpakka.base_url" -> 
s"https://doc.akka.io/api/alpakka/${Dependencies.KafkaToElasticsearch.PekkoConnectorsVersion}";,
-  "javadoc.akka.base_url" -> "",
-  "extref.alpakka.base_url" -> 
s"https://doc.akka.io/docs/alpakka/${Dependencies.KafkaToElasticsearch.PekkoConnectorsVersion}/%s";,
+  "scaladoc.org.apache.pekko.stream.connectors.base_url" -> 
s"https://pekko.apache.org/api/pekko-connectors/${Dependencies.KafkaToElasticsearch.PekkoConnectorsVersion}";,
+  "javadoc.org.apache.pekko.stream.connectors.base_url" -> "",
+  "extref.pekko-connectors.base_url" -> 
s"https://pekko.apache.org/docs/pekko-connectors/${Dependencies.KafkaToElasticsearch.PekkoConnectorsVersion}/%s";,
   // Pekko Connectors Kafka
-  "scaladoc.akka.kafka.base_url" -> 
s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.KafkaToElasticsearch.AlpakkaKafkaVersion}";,
-  "javadoc.akka.kafka.base_url" -> "",
-  "extref.alpakka-kafka.base_url" -> 
s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.KafkaToElasticsearch.AlpakkaKafkaVersion}/%s";,
+  "scaladoc.org.apache.pekko.kafka.base_url" -> 
s"https://pekko.apache.org/api/alpakka-kafka/${Dependencies.KafkaToElasticsearch.PekkoConnectorsKafkaVersion}";,
+  "javadoc.org.apache.pekko.kafka.base_url" -> "",
+  "extref.pekko-connectors-kafka.base_url" -> 
s"https://pekko.apache.org/docs/alpakka-kafka/${Dependencies.KafkaToElasticsearch.PekkoConnectorsKafkaVersion}/%s";,
   // Pekko
-  "scaladoc.akka.base_url" -> 
s"https://doc.akka.io/api/akka/${Dependencies.KafkaToElasticsearch.PekkoVersion}";,
-  "javadoc.akka.base_url" -> 
s"https://doc.akka.io/japi/akka/${Dependencies.KafkaToElasticsearch.PekkoVersion}";,
-  "extref.akka.base_url" -> 
s"https://doc.akka.io/docs/akka/${Dependencies.KafkaToElasticsearch.PekkoVersion}/%s";,
+  "scaladoc.org.apache.pekko.base_url" -> 
s"https://pekko.apache.org/api/pekko/${Dependencies.KafkaToElasticsearch.PekkoVersion}";,
+  "javadoc.org.apache.pekko.base_url" -> 
s"https://pekko.apache.org/japi/pekko/${Dependencies.KafkaToElasticsearch.PekkoVersion}";,
+  "extref.pekko.base_url" -> 
s"https://pekko.apache.org/docs/pekko/${Dependencies.KafkaToElasticsearch.PekkoVersion}/%s";,
 )
 KafkaToElasticsearch / paradoxGroups := Map("Language" -> Seq("Java", "Scala"))
 
@@ -156,9 +156,9 @@ KafkaToWebsocketClients / paradoxProperties ++= Map(
 //  "javadoc.akka.base_url" -> "",
 //  "extref.alpakka.base_url" -> 
s"https://doc.akka.io/docs/alpakka/${Dependencies.KafkaToWebsocketClients.PekkoConnectorsVersion}/%s";,
   // Pekko Connectors Kafka
-  "scaladoc.akka.kafka.base_url" -> 
s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.KafkaToWebsocketClients.AlpakkaKafkaVersion}";,
+  "scaladoc.akka.kafka.base_url" -> 
s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.KafkaToWebsocketClients.PekkoConnectorsKafkaVersion}";,
   "javadoc.akka.kafka.base_url" -> "",
-  "extref.alpakka-kafka.base_url" -> 
s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.KafkaToWebsocketClients.AlpakkaKafkaVersion}/%s";,
+  "extref.pekko-connectors-kafka.base_url" -> 
s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.KafkaToWebsocketClients.PekkoConnectorsKafkaVersion}/%s";,
   // Pekko
   "scaladoc.akka.base_url" -> 
s"https://doc.akka.io/api/akka/${Dependencies.KafkaToWebsocketClients.PekkoVersion}";,
   "javadoc.akka.base_url" -> 
s"https://doc.akka.io/japi/akka/${Dependencies.KafkaToWebsocketClients.PekkoVersion}";,
@@ -186,9 +186,9 @@ MqttToKafka / paradoxProperties ++= Map(
   "javadoc.akka.base_url" -> "",
   "extref.alpakka.base_url" -> 
s"https://doc.akka.io/docs/alpakka/${Dependencies.MqttToKafka.PekkoConnectorsVersion}/%s";,
   // Pekko Connectors Kafka
-  "scaladoc.akka.kafka.base_url" -> 
s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.MqttToKafka.AlpakkaKafkaVersion}";,
+  "scaladoc.akka.kafka.base_url" -> 
s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.MqttToKafka.PekkoConnectorsKafkaVersion}";,
   "javadoc.akka.kafka.base_url" -> "",
-  "extref.alpakka-kafka.base_url" -> 
s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.MqttToKafka.AlpakkaKafkaVersion}/%s";,
+  "extref.pekko-connectors-kafka.base_url" -> 
s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.MqttToKafka.PekkoConnectorsKafkaVersion}/%s";,
   // Pekko
   "scaladoc.akka.base_url" -> 
s"https://doc.akka.io/api/akka/${Dependencies.MqttToKafka.PekkoVersion}";,
   "javadoc.akka.base_url" -> 
s"https://doc.akka.io/japi/akka/${Dependencies.MqttToKafka.PekkoVersion}";,
@@ -211,11 +211,11 @@ FileToElasticsearch / paradoxProperties ++= Map(
   // Pekko Connectors
   "scaladoc.org.apache.pekko.stream.connectors.base_url" -> 
s"https://pekko.apache.org/api/pekko-connectors/${Dependencies.FileToElasticsearch.PekkoConnectorsVersion}";,
   "javadoc.org.apache.pekko.stream.connectors.base_url" -> "",
-  "extref.org.apache.pekko.stream.connectors.base_url" -> 
s"https://pekko.apache.org/docs/pekko-connectors/${Dependencies.FileToElasticsearch.PekkoConnectorsVersion}/%s";,
+  "extref.pekko-connectors.base_url" -> 
s"https://pekko.apache.org/docs/pekko-connectors/${Dependencies.FileToElasticsearch.PekkoConnectorsVersion}/%s";,
   // Pekko
   "scaladoc.org.apache.pekko.base_url" -> 
s"https://pekko.apache.org/api/pekko/${Dependencies.FileToElasticsearch.PekkoVersion}";,
   "javadoc.org.apache.pekko.base_url" -> 
s"https://pekko.apache.org/japi/pekko/${Dependencies.FileToElasticsearch.PekkoVersion}";,
-  "extref.org.apache.pekko.base_url" -> 
s"https://pekko.apache.org/docs/pekko/${Dependencies.FileToElasticsearch.PekkoVersion}/%s";,
+  "extref.pekko.base_url" -> 
s"https://pekko.apache.org/docs/pekko/${Dependencies.FileToElasticsearch.PekkoVersion}/%s";,
 )
 FileToElasticsearch / paradoxGroups := Map("Language" -> Seq("Java", "Scala"))
 
@@ -246,7 +246,7 @@ Paradox / siteSubdirName := ""
 paradoxProperties ++= Map(
   "extref.akka.base_url" -> "https://doc.akka.io/docs/akka/current/";,
   "extref.alpakka.base_url" -> "https://doc.akka.io/docs/alpakka/current/";,
-  "extref.alpakka-kafka.base_url" -> 
"https://doc.akka.io/docs/alpakka-kafka/current/";,
+  "extref.pekko-connectors-kafka.base_url" -> 
"https://doc.akka.io/docs/alpakka-kafka/current/";,
   "extref.ftp-to-file.base_url" -> s"${(FtpToFile / siteSubdirName).value}/",
   "extref.http-csv-to-kafka.base_url" -> s"${(HttpCsvToKafka / 
siteSubdirName).value}/",
   "extref.jdbc-to-elasticsearch.base_url" -> s"${(JdbcToElasticsearch / 
siteSubdirName).value}/",
diff --git a/docs/project/Dependencies.scala b/docs/project/Dependencies.scala
index 2fcd09b..af5360b 100644
--- a/docs/project/Dependencies.scala
+++ b/docs/project/Dependencies.scala
@@ -31,7 +31,7 @@ object Dependencies {
     val PekkoVersion = versions("PekkoVersion")
     val AkkaHttpVersion = versions("AkkaHttpVersion")
     val PekkoConnectorsVersion = versions("PekkoConnectorsVersion")
-    val AlpakkaKafkaVersion = versions("AlpakkaKafkaVersion")
+    val PekkoConnectorsKafkaVersion = versions("PekkoConnectorsKafkaVersion")
   }
 
   object JdbcToElasticsearch {
@@ -75,7 +75,7 @@ object Dependencies {
     val ScalaVersion = versions("scalaVer")
     val PekkoVersion = versions("PekkoVersion")
     val PekkoConnectorsVersion = versions("PekkoConnectorsVersion")
-    val AlpakkaKafkaVersion = versions("AlpakkaKafkaVersion")
+    val PekkoConnectorsKafkaVersion = versions("PekkoConnectorsKafkaVersion")
   }
   
   object KafkaToWebsocketClients {
@@ -90,7 +90,7 @@ object Dependencies {
     val ScalaVersion = versions("scalaVer")
     val PekkoVersion = versions("PekkoVersion")
     val AkkaHttpVersion = versions("AkkaHttpVersion")
-    val AlpakkaKafkaVersion = versions("AlpakkaKafkaVersion")
+    val PekkoConnectorsKafkaVersion = versions("PekkoConnectorsKafkaVersion")
   }
 
   object MqttToKafka {
@@ -105,7 +105,7 @@ object Dependencies {
     val ScalaVersion = versions("scalaVer")
     val PekkoVersion = versions("PekkoVersion")
     val PekkoConnectorsVersion = versions("PekkoConnectorsVersion")
-    val AlpakkaKafkaVersion = versions("AlpakkaKafkaVersion")
+    val PekkoConnectorsKafkaVersion = versions("PekkoConnectorsKafkaVersion")
   }
 
   object FileToElasticsearch {
diff --git a/docs/src/main/paradox/index.md b/docs/src/main/paradox/index.md
index c410318..e4a3d4c 100644
--- a/docs/src/main/paradox/index.md
+++ b/docs/src/main/paradox/index.md
@@ -25,7 +25,7 @@ This example uses Pekko Connectors File to watch for new files created in a dire
 Read a stream of data and store it zipped in rotating files on an SFTP server.
 
 ### @extref:[Subscribe to MQTT and produce to Kafka](mqtt-to-kafka:index.html)
-Subscribe to an MQTT topic with @extref[Pekko Connectors MQTT](pekko-connectors:/mqtt.html), group a few values and publish the aggregate to a Kafka topic with @extref[Pekko Connectors Kafka](alpakka-kafka:).
+Subscribe to an MQTT topic with @extref[Pekko Connectors MQTT](pekko-connectors:/mqtt.html), group a few values and publish the aggregate to a Kafka topic with @extref[Pekko Connectors Kafka](pekko-connectors-kafka:).
 
 ### @link:[Amazon SQS](https://github.com/apache/incubator-pekko-connectors-samples/tree/main/pekko-connectors-sample-sqs-java) { open=new }
 Listen to an Amazon SQS topic, enrich the message via calling an actor, publish a new message to SQS and acknowledge/delete the original message. (Java only)
diff --git a/pekko-connectors-sample-http-csv-to-kafka/project/Dependencies.scala b/pekko-connectors-sample-http-csv-to-kafka/project/Dependencies.scala
index cc5be29..5277491 100644
--- a/pekko-connectors-sample-http-csv-to-kafka/project/Dependencies.scala
+++ b/pekko-connectors-sample-http-csv-to-kafka/project/Dependencies.scala
@@ -7,11 +7,11 @@ object Dependencies {
   val PekkoVersion = "2.6.19"
   val AkkaHttpVersion = "10.1.12"
   val PekkoConnectorsVersion = "4.0.0"
-  val AlpakkaKafkaVersion = "3.0.1"
+  val PekkoConnectorsKafkaVersion = "3.0.1"
 
   val dependencies = List(
     "com.lightbend.akka" %% "akka-stream-alpakka-csv" % PekkoConnectorsVersion,
-    "com.typesafe.akka" %% "akka-stream-kafka" % AlpakkaKafkaVersion,
+    "com.typesafe.akka" %% "akka-stream-kafka" % PekkoConnectorsKafkaVersion,
     "com.typesafe.akka" %% "akka-actor-typed" % PekkoVersion,
     "com.typesafe.akka" %% "akka-stream" % PekkoVersion,
     "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_003_parse_csv/README.md b/pekko-connectors-sample-http-csv-to-kafka/step_003_parse_csv/README.md
index ca8e0b6..8f85b07 100644
--- a/pekko-connectors-sample-http-csv-to-kafka/step_003_parse_csv/README.md
+++ b/pekko-connectors-sample-http-csv-to-kafka/step_003_parse_csv/README.md
@@ -2,4 +2,4 @@
 
 ### Description
 
-The binary data in @scaladoc:[ByteString](akka.util.ByteString)s is passed into @extref:[Alpakka CSV](alpakka:data-transformations/csv.html) to be parsed and converted per line into a Map. The stream elements becomes a @scala[`Map[String, ByteString]`]@java[`Map<String, ByteString>`], one entry per column using the column headers as keys.
+The binary data in @scaladoc:[ByteString](akka.util.ByteString)s is passed into @extref:[Pekko-Connectors CSV](alpakka:data-transformations/csv.html) to be parsed and converted per line into a Map. The stream elements becomes a @scala[`Map[String, ByteString]`]@java[`Map<String, ByteString>`], one entry per column using the column headers as keys.
diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_007_produce_to_kafka/README.md b/pekko-connectors-sample-http-csv-to-kafka/step_007_produce_to_kafka/README.md
index e6b6510..d220156 100644
--- a/pekko-connectors-sample-http-csv-to-kafka/step_007_produce_to_kafka/README.md
+++ b/pekko-connectors-sample-http-csv-to-kafka/step_007_produce_to_kafka/README.md
@@ -4,6 +4,6 @@
 
 [Testcontainers](https://www.testcontainers.org/) starts a Kafka broker in Docker.
 
-@extref:[Alpakka Kafka](alpakka-kafka:producer.html) producer settings specify the broker address and the data types for Kafka's key and value.
+@extref:[Pekko-Connectors Kafka](pekko-connectors-kafka:producer.html) producer settings specify the broker address and the data types for Kafka's key and value.
 
 @scaladoc:[Producer.plainSink](akka.kafka.scaladsl.Producer$) sends the `ProducerRecord`s stream elements to the specified Kafka topic.
diff --git a/pekko-connectors-sample-kafka-to-elasticsearch/README.md b/pekko-connectors-sample-kafka-to-elasticsearch/README.md
index 865e3ff..bdab891 100644
--- a/pekko-connectors-sample-kafka-to-elasticsearch/README.md
+++ b/pekko-connectors-sample-kafka-to-elasticsearch/README.md
@@ -2,7 +2,7 @@
 
 ## Read from a Kafka topic and publish to Elasticsearch
 
-This example uses @extref[Alpakka Kafka](alpakka-kafka:) to subscribe to a Kafka topic, parses JSON into a data class and stores the object in Elasticsearch. After storing the Kafka offset is committed back to Kafka. This gives at-least-once semantics.
+This example uses @extref[Pekko-Connectors Kafka](pekko-connectors-kafka:) to subscribe to a Kafka topic, parses JSON into a data class and stores the object in Elasticsearch. After storing the Kafka offset is committed back to Kafka. This gives at-least-once semantics.
 
 Browse the sources at @link:[Github](https://github.com/apache/incubator-pekko-connectors-samples/tree/main/pekko-connectors-sample-kafka-to-elasticsearch) { open=new }.
 
diff --git a/pekko-connectors-sample-kafka-to-elasticsearch/build.sbt b/pekko-connectors-sample-kafka-to-elasticsearch/build.sbt
index d5eb377..49f241e 100644
--- a/pekko-connectors-sample-kafka-to-elasticsearch/build.sbt
+++ b/pekko-connectors-sample-kafka-to-elasticsearch/build.sbt
@@ -1,4 +1,4 @@
-organization := "com.lightbend.akka"
+organization := "org.apache.pekko"
 version := "1.0.0"
 scalaVersion := Dependencies.scalaVer
 libraryDependencies ++= Dependencies.dependencies
diff --git a/pekko-connectors-sample-kafka-to-elasticsearch/project/Dependencies.scala b/pekko-connectors-sample-kafka-to-elasticsearch/project/Dependencies.scala
index 09d310f..e73343f 100644
--- a/pekko-connectors-sample-kafka-to-elasticsearch/project/Dependencies.scala
+++ b/pekko-connectors-sample-kafka-to-elasticsearch/project/Dependencies.scala
@@ -3,29 +3,28 @@ import sbt._
 object Dependencies {
   val scalaVer = "2.13.12"
   // #deps
-  val PekkoVersion = "2.6.19"
-  val PekkoConnectorsVersion = "4.0.0"
-  val AlpakkaKafkaVersion = "3.0.1"
+  val PekkoVersion = "1.0.2"
+  val PekkoConnectorsVersion = "1.0.1"
+  val PekkoConnectorsKafkaVersion = "1.0.0"
 
   // #deps
 
   val dependencies = List(
-  // #deps
-    "com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" % 
PekkoConnectorsVersion,
-    "com.typesafe.akka" %% "akka-stream-kafka" % AlpakkaKafkaVersion,
-    "com.typesafe.akka" %% "akka-stream" % PekkoVersion,
-    "com.typesafe.akka" %% "akka-actor-typed" % PekkoVersion,
-    "com.typesafe.akka" %% "akka-actor" % PekkoVersion,
+    // #deps
+    "org.apache.pekko" %% "pekko-connectors-elasticsearch" % 
PekkoConnectorsVersion,
+    "org.apache.pekko" %% "pekko-connectors-kafka" % 
PekkoConnectorsKafkaVersion,
+    "org.apache.pekko" %% "pekko-stream" % PekkoVersion,
+    "org.apache.pekko" %% "pekko-actor-typed" % PekkoVersion,
+    "org.apache.pekko" %% "pekko-actor" % PekkoVersion,
     // for JSON in Scala
     "io.spray" %% "spray-json" % "1.3.6",
     // for JSON in Java
-    "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.13.3",
-    "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.13.3",
+    "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.14.3",
+    "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.14.3",
     // Logging
-    "com.typesafe.akka" %% "akka-slf4j" % PekkoVersion,
+    "org.apache.pekko" %% "pekko-slf4j" % PekkoVersion,
     "ch.qos.logback" % "logback-classic" % "1.2.13",
-  // #deps
-    "org.testcontainers" % "elasticsearch" % "1.17.3",
-    "org.testcontainers" % "kafka" % "1.17.3"
-  )
+    // #deps
+    "org.testcontainers" % "elasticsearch" % "1.17.6",
+    "org.testcontainers" % "kafka" % "1.17.6")
 }
diff --git a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Helper.java b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Helper.java
index 16716f8..d444337 100644
--- a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Helper.java
+++ b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Helper.java
@@ -1,14 +1,15 @@
 package samples.javadsl;
 
-import akka.Done;
-import akka.actor.typed.ActorSystem;
-import static akka.actor.typed.javadsl.Adapter.*;
-import akka.kafka.ProducerSettings;
-import akka.kafka.javadsl.Producer;
-import akka.stream.alpakka.elasticsearch.*;
-import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchSource;
-import akka.stream.javadsl.Sink;
-import akka.stream.javadsl.Source;
+import static org.apache.pekko.actor.typed.javadsl.Adapter.*;
+
+import org.apache.pekko.Done;
+import org.apache.pekko.actor.typed.ActorSystem;
+import org.apache.pekko.kafka.ProducerSettings;
+import org.apache.pekko.kafka.javadsl.Producer;
+import org.apache.pekko.stream.connectors.elasticsearch.*;
+import org.apache.pekko.stream.connectors.elasticsearch.javadsl.ElasticsearchSource;
+import org.apache.pekko.stream.javadsl.Sink;
+import org.apache.pekko.stream.javadsl.Source;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -16,72 +17,73 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
 
 import java.util.List;
 import java.util.concurrent.CompletionStage;
 
 public class Helper {
 
-    private static final Logger log = LoggerFactory.getLogger(Helper.class);
-
-    private ElasticsearchContainer elasticsearchContainer;
-    public ElasticsearchConnectionSettings connectionSettings;
-
-    private KafkaContainer kafka;
-    public String kafkaBootstrapServers;
-
-    public Helper() {
-    }
-
-    public void startContainers() {
-        elasticsearchContainer =
-                new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2");
-        elasticsearchContainer.start();
-        connectionSettings = ElasticsearchConnectionSettings.create("http://" + elasticsearchContainer.getHttpHostAddress());
-
-        kafka = new KafkaContainer("5.4.1"); // contains Kafka 2.4.x
-        kafka.start();
-        kafkaBootstrapServers = kafka.getBootstrapServers();
-    }
-
-    public void stopContainers() {
-        kafka.stop();
-        elasticsearchContainer.stop();
-    }
-
-    CompletionStage<Done> writeToKafka(String topic, List<Movie> movies, ActorSystem<?> actorSystem) {
-        ProducerSettings<Integer, String> kafkaProducerSettings =
-                ProducerSettings.create(toClassic(actorSystem), new IntegerSerializer(), new StringSerializer())
-                        .withBootstrapServers(kafkaBootstrapServers);
-
-        CompletionStage<Done> producing =
-                Source.from(movies)
-                        .map(
-                                movie -> {
-                                    log.debug("producing {}", movie);
-                                    String json = JsonMappers.movieWriter.writeValueAsString(movie);
-                                    return new ProducerRecord<>(topic, movie.id, json);
-                                })
-                        .runWith(Producer.plainSink(kafkaProducerSettings), actorSystem);
-        producing.thenAccept(s -> log.info("Producing finished"));
-        return producing;
-    }
-
-    CompletionStage<List<Movie>> readFromElasticsearch(String indexName, ActorSystem<?> actorSystem) {
-        CompletionStage<List<Movie>> reading =
-                ElasticsearchSource.typed(
-                        ElasticsearchParams.V7(indexName),
-                        "{\"match_all\": {}}",
-                        ElasticsearchSourceSettings.create(connectionSettings),
-                        Movie.class)
-                        .map(readResult -> readResult.source())
-                        .runWith(Sink.seq(), actorSystem);
-        reading.thenAccept(
-                non -> {
-                    log.info("Reading finished");
-                });
-        return reading;
-    }
+  private static final Logger log = LoggerFactory.getLogger(Helper.class);
+
+  private ElasticsearchContainer elasticsearchContainer;
+  public ElasticsearchConnectionSettings connectionSettings;
+
+  private KafkaContainer kafka;
+  public String kafkaBootstrapServers;
+
+  public Helper() {
+  }
+
+  public void startContainers() {
+    elasticsearchContainer =
+        new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2");
+    elasticsearchContainer.start();
+    connectionSettings = ElasticsearchConnectionSettings.create("http://" + elasticsearchContainer.getHttpHostAddress());
+
+    kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.1")); // contains Kafka 2.4.x
+    kafka.start();
+    kafkaBootstrapServers = kafka.getBootstrapServers();
+  }
+
+  public void stopContainers() {
+    kafka.stop();
+    elasticsearchContainer.stop();
+  }
+
+  CompletionStage<Done> writeToKafka(String topic, List<Movie> movies, ActorSystem<?> actorSystem) {
+    ProducerSettings<Integer, String> kafkaProducerSettings =
+        ProducerSettings.create(toClassic(actorSystem), new IntegerSerializer(), new StringSerializer())
+            .withBootstrapServers(kafkaBootstrapServers);
+
+    CompletionStage<Done> producing =
+        Source.from(movies)
+            .map(
+                movie -> {
+                  log.debug("producing {}", movie);
+                  String json = JsonMappers.movieWriter.writeValueAsString(movie);
+                  return new ProducerRecord<>(topic, movie.id, json);
+                })
+            .runWith(Producer.plainSink(kafkaProducerSettings), actorSystem);
+    producing.thenAccept(s -> log.info("Producing finished"));
+    return producing;
+  }
+
+  CompletionStage<List<Movie>> readFromElasticsearch(String indexName, ActorSystem<?> actorSystem) {
+    CompletionStage<List<Movie>> reading =
+        ElasticsearchSource.typed(
+                ElasticsearchParams.V7(indexName),
+                "{\"match_all\": {}}",
+                ElasticsearchSourceSettings.create(connectionSettings),
+                Movie.class)
+            .map(readResult -> readResult.source())
+            .runWith(Sink.seq(), actorSystem);
+    reading.thenAccept(
+        non -> {
+          log.info("Reading finished");
+        });
+    return reading;
+  }
 
 
 }
diff --git a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/JsonMappers.java b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/JsonMappers.java
index a3d3767..9ef310b 100644
--- a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/JsonMappers.java
+++ b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/JsonMappers.java
@@ -6,8 +6,9 @@ import com.fasterxml.jackson.databind.ObjectWriter;
 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 
 class JsonMappers {
-    // Jackson conversion setup (3)
-    public final static ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());
-    public final static ObjectWriter movieWriter = mapper.writerFor(Movie.class);
-    public final static ObjectReader movieReader = mapper.readerFor(Movie.class);
+
+  // Jackson's conversion setup (3)
+  public final static ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());
+  public final static ObjectWriter movieWriter = mapper.writerFor(Movie.class);
+  public final static ObjectReader movieReader = mapper.readerFor(Movie.class);
 }
diff --git a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Main.java b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Main.java
index 71d43f4..d397885 100644
--- a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Main.java
+++ b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Main.java
@@ -6,19 +6,20 @@ package samples.javadsl;
 
 // #imports
 
-import akka.Done;
-import akka.NotUsed;
-import akka.actor.typed.ActorSystem;
-import static akka.actor.typed.javadsl.Adapter.*;
-import akka.actor.typed.javadsl.Behaviors;
-import akka.kafka.CommitterSettings;
-import akka.kafka.ConsumerSettings;
-import akka.kafka.Subscriptions;
-import akka.kafka.javadsl.Committer;
-import akka.kafka.javadsl.Consumer;
-import akka.stream.alpakka.elasticsearch.*;
-import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchFlow;
-import akka.stream.javadsl.Keep;
+import static org.apache.pekko.actor.typed.javadsl.Adapter.*;
+
+import org.apache.pekko.Done;
+import org.apache.pekko.NotUsed;
+import org.apache.pekko.actor.typed.ActorSystem;
+import org.apache.pekko.actor.typed.javadsl.Behaviors;
+import org.apache.pekko.kafka.CommitterSettings;
+import org.apache.pekko.kafka.ConsumerSettings;
+import org.apache.pekko.kafka.Subscriptions;
+import org.apache.pekko.kafka.javadsl.Committer;
+import org.apache.pekko.kafka.javadsl.Consumer;
+import org.apache.pekko.stream.connectors.elasticsearch.*;
+import org.apache.pekko.stream.connectors.elasticsearch.javadsl.ElasticsearchFlow;
+import org.apache.pekko.stream.javadsl.Keep;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -34,99 +35,99 @@ import java.util.concurrent.TimeUnit;
 
 public class Main {
 
-    private static final Logger log = LoggerFactory.getLogger(Main.class);
-
-    private final Helper helper;
-    private final ElasticsearchConnectionSettings elasticsearchAddress;
-    private final String kafkaBootstrapServers;
-
-    private final String topic = "movies-to-elasticsearch";
-    private final String groupId = "docs-group";
-
-    // #es-setup
-    private final String indexName = "movies";
-
-    // #es-setup
-
-    public Main(Helper helper) {
-        this.elasticsearchAddress = helper.connectionSettings;
-        this.kafkaBootstrapServers = helper.kafkaBootstrapServers;
-        this.helper = helper;
-    }
-
-    private ActorSystem<Void> actorSystem;
-
-    private Consumer.DrainingControl<Done> readFromKafkaToEleasticsearch() {
-        // #kafka-setup
-        // configure Kafka consumer (1)
-        ConsumerSettings<Integer, String> kafkaConsumerSettings =
-                ConsumerSettings.create(actorSystem, new IntegerDeserializer(), new StringDeserializer())
-                        .withBootstrapServers(kafkaBootstrapServers)
-                        .withGroupId(groupId)
-                        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-                        .withStopTimeout(Duration.ofSeconds(5));
-        // #kafka-setup
-
-        // #flow
-        Consumer.DrainingControl<Done> control =
-                Consumer.sourceWithOffsetContext(kafkaConsumerSettings, Subscriptions.topics(topic)) // (5)
-                        .map(
-                                consumerRecord -> { // (6)
-                                    Movie movie = JsonMappers.movieReader.readValue(consumerRecord.value());
-                                    return WriteMessage.createUpsertMessage(String.valueOf(movie.id), movie);
-                                })
-                        .via(
-                                ElasticsearchFlow.createWithContext(
-                                        ElasticsearchParams.V7(indexName),
-                                        ElasticsearchWriteSettings.create(elasticsearchAddress),
-                                        JsonMappers.mapper)) // (7)
-                        .map(
-                                writeResult -> { // (8)
-                                    writeResult
-                                            .getError()
-                                            .ifPresent(
-                                                    errorJson -> {
-                                                        throw new RuntimeException(
-                                                                "Elasticsearch update failed "
-                                                                        + writeResult.getErrorReason().orElse(errorJson));
-                                                    });
-                                    return NotUsed.notUsed();
-                                })
-                        .toMat(Committer.sinkWithOffsetContext(CommitterSettings.create(toClassic(actorSystem))), Keep.both()) // (9)
-                        .mapMaterializedValue(Consumer::createDrainingControl) // (10)
-                        .run(actorSystem);
-        // #flow
-        return control;
-    }
-
-    private CompletionStage<Done> run() throws Exception {
-        actorSystem = ActorSystem.create(Behaviors.empty(), "KafkaToElasticSearch");
-
-        List<Movie> movies = Arrays.asList(new Movie(23, "Psycho"), new Movie(423, "Citizen Kane"));
-        CompletionStage<Done> writing = helper.writeToKafka(topic, movies, actorSystem);
-        writing.toCompletableFuture().get(10, TimeUnit.SECONDS);
-
-        Consumer.DrainingControl<Done> control = readFromKafkaToEleasticsearch();
-        TimeUnit.SECONDS.sleep(5);
-        CompletionStage<Done> copyingFinished = control.drainAndShutdown(actorSystem.executionContext());
-        copyingFinished.toCompletableFuture().get(10, TimeUnit.SECONDS);
-        CompletionStage<List<Movie>> reading = helper.readFromElasticsearch(indexName, actorSystem);
-
-        return reading.thenCompose(
-                ms -> {
-                    ms.forEach(m -> System.out.println("read " + m));
-                    actorSystem.terminate();
-                    return actorSystem.getWhenTerminated();
-                });
-    }
-
-    public static void main(String[] args) throws Exception {
-        Helper helper = new Helper();
-        helper.startContainers();
-        Main main = new Main(helper);
-        CompletionStage<Done> run = main.run();
-        run.thenAccept(res -> {
-            helper.stopContainers();
+  private static final Logger log = LoggerFactory.getLogger(Main.class);
+
+  private final Helper helper;
+  private final ElasticsearchConnectionSettings elasticsearchAddress;
+  private final String kafkaBootstrapServers;
+
+  private final String topic = "movies-to-elasticsearch";
+  private final String groupId = "docs-group";
+
+  // #es-setup
+  private final String indexName = "movies";
+
+  // #es-setup
+
+  public Main(Helper helper) {
+    this.elasticsearchAddress = helper.connectionSettings;
+    this.kafkaBootstrapServers = helper.kafkaBootstrapServers;
+    this.helper = helper;
+  }
+
+  private ActorSystem<Void> actorSystem;
+
+  private Consumer.DrainingControl<Done> readFromKafkaToEleasticsearch() {
+    // #kafka-setup
+    // configure Kafka consumer (1)
+    ConsumerSettings<Integer, String> kafkaConsumerSettings =
+        ConsumerSettings.create(actorSystem, new IntegerDeserializer(), new StringDeserializer())
+            .withBootstrapServers(kafkaBootstrapServers)
+            .withGroupId(groupId)
+            .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+            .withStopTimeout(Duration.ofSeconds(5));
+    // #kafka-setup
+
+    // #flow
+    Consumer.DrainingControl<Done> control =
+        Consumer.sourceWithOffsetContext(kafkaConsumerSettings, Subscriptions.topics(topic)) // (5)
+            .map(
+                consumerRecord -> { // (6)
+                  Movie movie = JsonMappers.movieReader.readValue(consumerRecord.value());
+                  return WriteMessage.createUpsertMessage(String.valueOf(movie.id), movie);
+                })
+            .via(
+                ElasticsearchFlow.createWithContext(
+                    ElasticsearchParams.V7(indexName),
+                    ElasticsearchWriteSettings.create(elasticsearchAddress),
+                    JsonMappers.mapper)) // (7)
+            .map(
+                writeResult -> { // (8)
+                  writeResult
+                      .getError()
+                      .ifPresent(
+                          errorJson -> {
+                            throw new RuntimeException(
+                                "Elasticsearch update failed "
+                                    + writeResult.getErrorReason().orElse(errorJson));
+                          });
+                  return NotUsed.notUsed();
+                })
+            .toMat(Committer.sinkWithOffsetContext(CommitterSettings.create(toClassic(actorSystem))), Keep.both()) // (9)
+            .mapMaterializedValue(Consumer::createDrainingControl) // (10)
+            .run(actorSystem);
+    // #flow
+    return control;
+  }
+
+  private CompletionStage<Done> run() throws Exception {
+    actorSystem = ActorSystem.create(Behaviors.empty(), "KafkaToElasticSearch");
+
+    List<Movie> movies = Arrays.asList(new Movie(23, "Psycho"), new Movie(423, "Citizen Kane"));
+    CompletionStage<Done> writing = helper.writeToKafka(topic, movies, actorSystem);
+    writing.toCompletableFuture().get(10, TimeUnit.SECONDS);
+
+    Consumer.DrainingControl<Done> control = readFromKafkaToEleasticsearch();
+    TimeUnit.SECONDS.sleep(5);
+    CompletionStage<Done> copyingFinished = control.drainAndShutdown(actorSystem.executionContext());
+    copyingFinished.toCompletableFuture().get(10, TimeUnit.SECONDS);
+    CompletionStage<List<Movie>> reading = helper.readFromElasticsearch(indexName, actorSystem);
+
+    return reading.thenCompose(
+        ms -> {
+          ms.forEach(m -> System.out.println("read " + m));
+          actorSystem.terminate();
+          return actorSystem.getWhenTerminated();
         });
-    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Helper helper = new Helper();
+    helper.startContainers();
+    Main main = new Main(helper);
+    CompletionStage<Done> run = main.run();
+    run.thenAccept(res -> {
+      helper.stopContainers();
+    });
+  }
 }
diff --git a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Movie.java b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Movie.java
index 5fb955c..09d47d8 100644
--- a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Movie.java
+++ b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Movie.java
@@ -2,25 +2,22 @@ package samples.javadsl;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectReader;
-import com.fasterxml.jackson.databind.ObjectWriter;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 
 // Type in Elasticsearch (2)
 public class Movie {
-    public final int id;
-    public final String title;
 
-    @JsonCreator
-    public Movie(@JsonProperty("id") int id, @JsonProperty("title") String title) {
-        this.id = id;
-        this.title = title;
-    }
+  public final int id;
+  public final String title;
 
-    @Override
-    public String toString() {
-        return "Movie(" + id + ", title=" + title + ")";
-    }
+  @JsonCreator
+  public Movie(@JsonProperty("id") int id, @JsonProperty("title") String title) {
+    this.id = id;
+    this.title = title;
+  }
+
+  @Override
+  public String toString() {
+    return "Movie(" + id + ", title=" + title + ")";
+  }
 }
 
diff --git a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/resources/application.conf b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/resources/application.conf
index 7bdde8d..77117b1 100644
--- a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/resources/application.conf
+++ b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/resources/application.conf
@@ -1,5 +1,5 @@
-akka {
-  loggers = ["akka.event.slf4j.Slf4jLogger"]
-  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+pekko {
+  loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"]
+  logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter"
   loglevel = "DEBUG"
 }
diff --git a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/resources/logback.xml b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/resources/logback.xml
index a57a169..9baedff 100644
--- a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/resources/logback.xml
+++ b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/resources/logback.xml
@@ -9,7 +9,7 @@
 
     <logger name="org.apache" level="WARN"/>
     <logger name="kafka" level="WARN"/>
-    <logger name="akka" level="WARN"/>
+    <logger name="pekko" level="WARN"/>
     <logger name="org.apache.kafka.common.utils.AppInfoParser" level="ERROR"/>
     <logger name="org.apache.kafka.clients.NetworkClient" level="ERROR"/>
     <logger name="org.I0Itec.zkclient" level="WARN"/>
diff --git a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/scala/samples/scaladsl/Helper.scala b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/scala/samples/scaladsl/Helper.scala
index 5d34720..325488e 100644
--- a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/scala/samples/scaladsl/Helper.scala
+++ b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/scala/samples/scaladsl/Helper.scala
@@ -4,43 +4,39 @@
 
 package samples.scaladsl
 
-import akka.actor.typed.ActorSystem
-import akka.kafka._
-import akka.kafka.scaladsl.Producer
-import akka.actor.typed.scaladsl.adapter._
-import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSource
-import akka.stream.scaladsl.{Sink, Source}
-import akka.Done
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.serialization._
+import org.apache.pekko.actor.typed.ActorSystem
+import org.apache.pekko.actor.typed.scaladsl.adapter._
+import org.apache.pekko.kafka._
+import org.apache.pekko.kafka.scaladsl.Producer
+import org.apache.pekko.stream.connectors.elasticsearch.scaladsl.ElasticsearchSource
+import org.apache.pekko.stream.scaladsl.{ Sink, Source }
+import org.apache.pekko.Done
+import org.apache.pekko.stream.connectors.elasticsearch._
 import org.slf4j.LoggerFactory
 import org.testcontainers.containers.KafkaContainer
 import org.testcontainers.elasticsearch.ElasticsearchContainer
-import spray.json.DefaultJsonProtocol._
+import org.testcontainers.utility.DockerImageName
+import samples.scaladsl.JsonFormats._
 import spray.json._
 
 import scala.collection.immutable
-import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContext, Future}
-import JsonFormats._
-import akka.stream.alpakka.elasticsearch.ElasticsearchConnectionSettings
-import akka.stream.alpakka.elasticsearch.ElasticsearchParams
-import akka.stream.alpakka.elasticsearch.ElasticsearchSourceSettings
-import samples.scaladsl.Main.{elasticsearchContainer, kafka}
+import scala.concurrent.Future
 
 trait Helper {
 
   final val log = LoggerFactory.getLogger(getClass)
 
-  // Testcontainers: start Elasticsearch in Docker
+  // TestContainers: start Elasticsearch in Docker
   val elasticsearchContainer = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2")
   elasticsearchContainer.start()
   val connectionSettings = ElasticsearchConnectionSettings("http://" + elasticsearchContainer.getHttpHostAddress)
 
-  // Testcontainers: start Kafka in Docker
+  // TestContainers: start Kafka in Docker
   // [[https://hub.docker.com/r/confluentinc/cp-kafka/tags Available Docker images]]
   // [[https://docs.confluent.io/current/installation/versions-interoperability.html Kafka versions in Confluent Platform]]
-  val kafka = new KafkaContainer("5.4.1") // contains Kafka 2.4.x
+  val kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.1")) // contains Kafka 2.4.x
   kafka.start()
   val kafkaBootstrapServers = kafka.getBootstrapServers
 
@@ -60,7 +56,8 @@ trait Helper {
 
   def readFromElasticsearch(indexName: String)(implicit actorSystem: ActorSystem[_]): Future[immutable.Seq[Movie]] = {
     val reading = ElasticsearchSource
-      .typed[Movie](ElasticsearchParams.V7(indexName), """{"match_all": {}}""", ElasticsearchSourceSettings(connectionSettings))
+      .typed[Movie](ElasticsearchParams.V7(indexName), """{"match_all": {}}""",
+        ElasticsearchSourceSettings(connectionSettings))
       .map(_.source)
       .runWith(Sink.seq)
     reading.foreach(_ => log.info("Reading finished"))(actorSystem.executionContext)
diff --git a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/scala/samples/scaladsl/Main.scala b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/scala/samples/scaladsl/Main.scala
index 9a10e95..f552ce1 100644
--- a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/scala/samples/scaladsl/Main.scala
+++ b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/scala/samples/scaladsl/Main.scala
@@ -5,23 +5,23 @@
 package samples.scaladsl
 
 // #imports
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.scaladsl.Behaviors
-import akka.actor.typed.scaladsl.adapter._
-import akka.kafka._
-import akka.kafka.scaladsl.{Committer, Consumer}
-import akka.stream.alpakka.elasticsearch.ElasticsearchParams
-import akka.stream.alpakka.elasticsearch.ElasticsearchWriteSettings
-import akka.stream.alpakka.elasticsearch.WriteMessage
-import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchFlow
-import akka.stream.scaladsl.Sink
-import akka.{Done, NotUsed}
+import org.apache.pekko.actor.typed.ActorSystem
+import org.apache.pekko.actor.typed.scaladsl.Behaviors
+import org.apache.pekko.actor.typed.scaladsl.adapter._
+import org.apache.pekko.kafka._
+import org.apache.pekko.kafka.scaladsl.{ Committer, Consumer }
+import org.apache.pekko.stream.connectors.elasticsearch.ElasticsearchParams
+import org.apache.pekko.stream.connectors.elasticsearch.ElasticsearchWriteSettings
+import org.apache.pekko.stream.connectors.elasticsearch.WriteMessage
+import org.apache.pekko.stream.connectors.elasticsearch.scaladsl.ElasticsearchFlow
+import org.apache.pekko.stream.scaladsl.Sink
+import org.apache.pekko.{ Done, NotUsed }
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.serialization._
 import spray.json._
 
 import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.{ Await, ExecutionContext, Future }
 // #imports
 
 object Main extends App with Helper {
diff --git a/pekko-connectors-sample-kafka-to-websocket-clients/README.md b/pekko-connectors-sample-kafka-to-websocket-clients/README.md
index a18b177..870a6e1 100644
--- a/pekko-connectors-sample-kafka-to-websocket-clients/README.md
+++ b/pekko-connectors-sample-kafka-to-websocket-clients/README.md
@@ -2,7 +2,7 @@
 
 ## Read from a Kafka topic and push the data to connected websocket clients
 
-Clients may connect via websockets and will receive data read from a Kafka topic. The websockets are implemented in @extref[Akka HTTP](akka-http:) and [Alpakka Kafka](alpakka-kafka:) subscribes to the Kafka topic.
+Clients may connect via websockets and will receive data read from a Kafka topic. The websockets are implemented in @extref[Akka HTTP](akka-http:) and [Pekko-Connectors Kafka](pekko-connectors-kafka:) subscribes to the Kafka topic.
 
 Browse the sources at @link:[Github](https://github.com/apache/incubator-pekko-connectors-samples/tree/main/pekko-connectors-sample-kafka-to-websocket-clients) { open=new }.
 
diff --git a/pekko-connectors-sample-kafka-to-websocket-clients/docs/src/main/paradox/example.md b/pekko-connectors-sample-kafka-to-websocket-clients/docs/src/main/paradox/example.md
index a47bfcc..ec4c4ba 100644
--- a/pekko-connectors-sample-kafka-to-websocket-clients/docs/src/main/paradox/example.md
+++ b/pekko-connectors-sample-kafka-to-websocket-clients/docs/src/main/paradox/example.md
@@ -4,7 +4,7 @@ This page highlights the most important sections of the example.
 
 ### Subscribe to the Kafka topic
 
-Use an @extref:[Alpakka Kafka](alpakka-kafka:) consumer to subscribe to a topic in Kafka. The received `String` values are sent to a @extref[BroadcastHub](akka:stream/stream-dynamic.html#using-the-broadcasthub) which creates a `Source` for the clients to connect to.
+Use an @extref:[Pekko-Connectors Kafka](pekko-connectors-kafka:) consumer to subscribe to a topic in Kafka. The received `String` values are sent to a @extref[BroadcastHub](akka:stream/stream-dynamic.html#using-the-broadcasthub) which creates a `Source` for the clients to connect to.
 
 Java
: @@snip [snip](/src/main/java/samples/javadsl/Main.java) { #kafka-to-broadcast }
diff --git a/pekko-connectors-sample-kafka-to-websocket-clients/docs/src/main/paradox/full-source.md b/pekko-connectors-sample-kafka-to-websocket-clients/docs/src/main/paradox/full-source.md
index 1eff395..ad6b292 100644
--- a/pekko-connectors-sample-kafka-to-websocket-clients/docs/src/main/paradox/full-source.md
+++ b/pekko-connectors-sample-kafka-to-websocket-clients/docs/src/main/paradox/full-source.md
@@ -8,7 +8,7 @@ Java
 ## Helper
 
 The helper uses [Testcontainers](https://www.testcontainers.org/modules/kafka/) to start a Kafka broker.
-The `writeToKafka()` method populates the Kafka topic using @extref[Alpakka Kafka](alpakka-kafka:producer.html)
+The `writeToKafka()` method populates the Kafka topic using @extref[Pekko-Connectors Kafka](pekko-connectors-kafka:producer.html)
 
 Java
: @@snip [snip](/src/main/java/samples/javadsl/Helper.java) { filterLabels=true }
diff --git a/pekko-connectors-sample-kafka-to-websocket-clients/project/Dependencies.scala b/pekko-connectors-sample-kafka-to-websocket-clients/project/Dependencies.scala
index 2734d73..3e56eb4 100644
--- a/pekko-connectors-sample-kafka-to-websocket-clients/project/Dependencies.scala
+++ b/pekko-connectors-sample-kafka-to-websocket-clients/project/Dependencies.scala
@@ -5,13 +5,13 @@ object Dependencies {
   // #deps
   val PekkoVersion = "2.6.19"
   val AkkaHttpVersion = "10.1.12"
-  val AlpakkaKafkaVersion = "3.0.1"
+  val PekkoConnectorsKafkaVersion = "3.0.1"
   // #deps
   val dependencies = List(
     // #deps
     "com.typesafe.akka" %% "akka-stream" % PekkoVersion,
     "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
-    "com.typesafe.akka" %% "akka-stream-kafka" % AlpakkaKafkaVersion,
+    "com.typesafe.akka" %% "akka-stream-kafka" % PekkoConnectorsKafkaVersion,
 
     // Logging
     "com.typesafe.akka" %% "akka-slf4j" % PekkoVersion,
diff --git a/pekko-connectors-sample-mqtt-to-kafka/README.md b/pekko-connectors-sample-mqtt-to-kafka/README.md
index 8cf3e3c..43f49a5 100644
--- a/pekko-connectors-sample-mqtt-to-kafka/README.md
+++ b/pekko-connectors-sample-mqtt-to-kafka/README.md
@@ -2,7 +2,7 @@
 
 ## Read from an MQTT topic, group messages and publish to Kafka
 
-Subscribe to an MQTT topic with @extref[Alpakka MQTT](alpakka:/mqtt.html), group a few values and publish the aggregate to a Kafka topic with @extref[Alpakka Kafka](alpakka-kafka:).
+Subscribe to an MQTT topic with @extref[Pekko-Connectors MQTT](alpakka:/mqtt.html), group a few values and publish the aggregate to a Kafka topic with @extref[Pekko-Connectors Kafka](pekko-connectors-kafka:).
 
 Browse the sources at @link:[Github](https://github.com/apache/incubator-pekko-connectors-samples/tree/main/pekko-connectors-sample-mqtt-to-kafka) { open=new }.
 
diff --git a/pekko-connectors-sample-mqtt-to-kafka/project/Dependencies.scala b/pekko-connectors-sample-mqtt-to-kafka/project/Dependencies.scala
index 746cd98..f25ba6c 100644
--- a/pekko-connectors-sample-mqtt-to-kafka/project/Dependencies.scala
+++ b/pekko-connectors-sample-mqtt-to-kafka/project/Dependencies.scala
@@ -5,7 +5,7 @@ object Dependencies {
   // #deps
   val PekkoVersion = "2.6.19"
   val PekkoConnectorsVersion = "4.0.0"
-  val AlpakkaKafkaVersion = "3.0.1"
+  val PekkoConnectorsKafkaVersion = "3.0.1"
   val JacksonDatabindVersion = "2.11.4"
 
   // #deps
@@ -13,7 +13,7 @@ object Dependencies {
   val dependencies = List(
   // #deps
     "com.lightbend.akka" %% "akka-stream-alpakka-mqtt" % 
PekkoConnectorsVersion,
-    "com.typesafe.akka" %% "akka-stream-kafka" % AlpakkaKafkaVersion,
+    "com.typesafe.akka" %% "akka-stream-kafka" % PekkoConnectorsKafkaVersion,
     "com.typesafe.akka" %% "akka-stream" % PekkoVersion,
     "com.typesafe.akka" %% "akka-actor-typed" % PekkoVersion,
     "com.typesafe.akka" %% "akka-actor" % PekkoVersion,
diff --git a/pekko-connectors-sample-rotate-logs-to-ftp/README.md b/pekko-connectors-sample-rotate-logs-to-ftp/README.md
index 573052d..5662c01 100644
--- a/pekko-connectors-sample-rotate-logs-to-ftp/README.md
+++ b/pekko-connectors-sample-rotate-logs-to-ftp/README.md
@@ -2,7 +2,7 @@
 
 ## Rotate data stream over to multiple compressed files on SFTP server
 
-This example reads a stream of data and uses @extref[Alpakka File](alpakka:file.html) `LogRotatorSink` to write multiple files which get rotated triggered by a rotation function, the files are zipped in-flow and written to an SFTP server with @extref[Alpakka FTP](alpakka:ftp.html).
+This example reads a stream of data and uses @extref[Pekko-Connectors File](alpakka:file.html) `LogRotatorSink` to write multiple files which get rotated triggered by a rotation function, the files are zipped in-flow and written to an SFTP server with @extref[Pekko-Connectors FTP](alpakka:ftp.html).
 
 Browse the sources at @link:[Github](https://github.com/apache/incubator-pekko-connectors-samples/tree/main/pekko-connectors-sample-rotate-logs-to-ftp) { open=new }.
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
