This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-pekko-connectors-samples.git
The following commit(s) were added to refs/heads/main by this push:
new adec5c6 [kafka-to-elasticsearch] Migrate akka dependencies to pekko
(#18)
adec5c6 is described below
commit adec5c6e7cae82dccd513378ea4c3fde30573a79
Author: Laglangyue <[email protected]>
AuthorDate: Sun Dec 17 17:01:19 2023 +0800
[kafka-to-elasticsearch] Migrate akka dependencies to pekko (#18)
* [kafka-to-elasticsearch] Migrate akka dependencies to pekko
* [kafka-to-elasticsearch] Migrate akka dependencies to pekko
* use confluentinc/cp-kafka:5.4.1
* try to fix doc build
---------
Co-authored-by: laglang <[email protected]>
Co-authored-by: PJ Fanning <[email protected]>
---
docs/build.sbt | 40 ++--
docs/project/Dependencies.scala | 8 +-
docs/src/main/paradox/index.md | 2 +-
.../project/Dependencies.scala | 4 +-
.../step_003_parse_csv/README.md | 2 +-
.../step_007_produce_to_kafka/README.md | 2 +-
.../README.md | 2 +-
.../build.sbt | 2 +-
.../project/Dependencies.scala | 31 ++-
.../src/main/java/samples/javadsl/Helper.java | 140 +++++++-------
.../src/main/java/samples/javadsl/JsonMappers.java | 9 +-
.../src/main/java/samples/javadsl/Main.java | 215 +++++++++++----------
.../src/main/java/samples/javadsl/Movie.java | 27 ++-
.../src/main/resources/application.conf | 6 +-
.../src/main/resources/logback.xml | 2 +-
.../src/main/scala/samples/scaladsl/Helper.scala | 35 ++--
.../src/main/scala/samples/scaladsl/Main.scala | 24 +--
.../README.md | 2 +-
.../docs/src/main/paradox/example.md | 2 +-
.../docs/src/main/paradox/full-source.md | 2 +-
.../project/Dependencies.scala | 4 +-
pekko-connectors-sample-mqtt-to-kafka/README.md | 2 +-
.../project/Dependencies.scala | 4 +-
.../README.md | 2 +-
24 files changed, 283 insertions(+), 286 deletions(-)
diff --git a/docs/build.sbt b/docs/build.sbt
index 9d9ad40..55111b7 100644
--- a/docs/build.sbt
+++ b/docs/build.sbt
@@ -28,11 +28,11 @@ FtpToFile / paradoxProperties ++= Map(
// Pekko Connectors
"scaladoc.org.apache.pekko.stream.connectors.base_url" ->
s"https://pekko.apache.org/api/pekko-connectors/${Dependencies.FtpToFile.PekkoConnectorsVersion}",
"javadoc.org.apache.pekko.stream.connectors.base_url" -> "",
- "extref.org.apache.pekko.stream.connectors.base_url" ->
s"https://pekko.apache.org/docs/pekko-connectors/${Dependencies.FtpToFile.PekkoConnectorsVersion}/%s",
+ "extref.pekko-connectors.base_url" ->
s"https://pekko.apache.org/docs/pekko-connectors/${Dependencies.FtpToFile.PekkoConnectorsVersion}/%s",
// Pekko
"scaladoc.org.apache.pekko.base_url" ->
s"https://pekko.apache.org/api/pekko/${Dependencies.FtpToFile.PekkoVersion}",
"javadoc.org.apache.pekko.base_url" ->
s"https://pekko.apache.org/japi/pekko/${Dependencies.FtpToFile.PekkoVersion}",
- "extref.org.apache.pekko.base_url" ->
s"https://pekko.apache.org/docs/pekko/${Dependencies.FtpToFile.PekkoVersion}/%s",
+ "extref.pekko.base_url" ->
s"https://pekko.apache.org/docs/pekko/${Dependencies.FtpToFile.PekkoVersion}/%s",
)
FtpToFile / paradoxGroups := Map("Language" -> Seq("Java", "Scala"))
@@ -52,9 +52,9 @@ HttpCsvToKafka / paradoxProperties ++= Map(
"javadoc.akka.base_url" -> "",
"extref.alpakka.base_url" ->
s"https://doc.akka.io/docs/alpakka/${Dependencies.HttpCsvToKafka.PekkoConnectorsVersion}/%s",
// Pekko Connectors Kafka
- "scaladoc.akka.kafka.base_url" ->
s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.HttpCsvToKafka.AlpakkaKafkaVersion}",
+ "scaladoc.akka.kafka.base_url" ->
s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.HttpCsvToKafka.PekkoConnectorsKafkaVersion}",
"javadoc.akka.kafka.base_url" -> "",
- "extref.alpakka-kafka.base_url" ->
s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.HttpCsvToKafka.AlpakkaKafkaVersion}/%s",
+ "extref.pekko-connectors-kafka.base_url" ->
s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.HttpCsvToKafka.PekkoConnectorsKafkaVersion}/%s",
// Pekko
"scaladoc.akka.base_url" ->
s"https://doc.akka.io/api/akka/${Dependencies.HttpCsvToKafka.PekkoVersion}",
"javadoc.akka.base_url" ->
s"https://doc.akka.io/japi/akka/${Dependencies.HttpCsvToKafka.PekkoVersion}",
@@ -126,17 +126,17 @@ KafkaToElasticsearch / paradoxProperties ++= Map(
"snip.build.base_dir" ->
s"${baseDirectory.value}/../pekko-connectors-sample-${KafkaToElasticsearch.name}",
"github.root.base_dir" -> s"${baseDirectory.value}/..",
// Pekko Connectors
- "scaladoc.akka.stream.alpakka.base_url" ->
s"https://doc.akka.io/api/alpakka/${Dependencies.KafkaToElasticsearch.PekkoConnectorsVersion}",
- "javadoc.akka.base_url" -> "",
- "extref.alpakka.base_url" ->
s"https://doc.akka.io/docs/alpakka/${Dependencies.KafkaToElasticsearch.PekkoConnectorsVersion}/%s",
+ "scaladoc.org.apache.pekko.stream.connectors.base_url" ->
s"https://pekko.apache.org/api/pekko-connectors/${Dependencies.KafkaToElasticsearch.PekkoConnectorsVersion}",
+ "javadoc.org.apache.pekko.stream.connectors.base_url" -> "",
+ "extref.pekko-connectors.base_url" ->
s"https://pekko.apache.org/docs/pekko-connectors/${Dependencies.KafkaToElasticsearch.PekkoConnectorsVersion}/%s",
// Pekko Connectors Kafka
- "scaladoc.akka.kafka.base_url" ->
s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.KafkaToElasticsearch.AlpakkaKafkaVersion}",
- "javadoc.akka.kafka.base_url" -> "",
- "extref.alpakka-kafka.base_url" ->
s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.KafkaToElasticsearch.AlpakkaKafkaVersion}/%s",
+ "scaladoc.org.apache.pekko.kafka.base_url" ->
s"https://pekko.apache.org/api/alpakka-kafka/${Dependencies.KafkaToElasticsearch.PekkoConnectorsKafkaVersion}",
+ "javadoc.org.apache.pekko.kafka.base_url" -> "",
+ "extref.pekko-connectors-kafka.base_url" ->
s"https://pekko.apache.org/docs/alpakka-kafka/${Dependencies.KafkaToElasticsearch.PekkoConnectorsKafkaVersion}/%s",
// Pekko
- "scaladoc.akka.base_url" ->
s"https://doc.akka.io/api/akka/${Dependencies.KafkaToElasticsearch.PekkoVersion}",
- "javadoc.akka.base_url" ->
s"https://doc.akka.io/japi/akka/${Dependencies.KafkaToElasticsearch.PekkoVersion}",
- "extref.akka.base_url" ->
s"https://doc.akka.io/docs/akka/${Dependencies.KafkaToElasticsearch.PekkoVersion}/%s",
+ "scaladoc.org.apache.pekko.base_url" ->
s"https://pekko.apache.org/api/pekko/${Dependencies.KafkaToElasticsearch.PekkoVersion}",
+ "javadoc.org.apache.pekko.base_url" ->
s"https://pekko.apache.org/japi/pekko/${Dependencies.KafkaToElasticsearch.PekkoVersion}",
+ "extref.pekko.base_url" ->
s"https://pekko.apache.org/docs/pekko/${Dependencies.KafkaToElasticsearch.PekkoVersion}/%s",
)
KafkaToElasticsearch / paradoxGroups := Map("Language" -> Seq("Java", "Scala"))
@@ -156,9 +156,9 @@ KafkaToWebsocketClients / paradoxProperties ++= Map(
// "javadoc.akka.base_url" -> "",
// "extref.alpakka.base_url" ->
s"https://doc.akka.io/docs/alpakka/${Dependencies.KafkaToWebsocketClients.PekkoConnectorsVersion}/%s",
// Pekko Connectors Kafka
- "scaladoc.akka.kafka.base_url" ->
s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.KafkaToWebsocketClients.AlpakkaKafkaVersion}",
+ "scaladoc.akka.kafka.base_url" ->
s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.KafkaToWebsocketClients.PekkoConnectorsKafkaVersion}",
"javadoc.akka.kafka.base_url" -> "",
- "extref.alpakka-kafka.base_url" ->
s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.KafkaToWebsocketClients.AlpakkaKafkaVersion}/%s",
+ "extref.pekko-connectors-kafka.base_url" ->
s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.KafkaToWebsocketClients.PekkoConnectorsKafkaVersion}/%s",
// Pekko
"scaladoc.akka.base_url" ->
s"https://doc.akka.io/api/akka/${Dependencies.KafkaToWebsocketClients.PekkoVersion}",
"javadoc.akka.base_url" ->
s"https://doc.akka.io/japi/akka/${Dependencies.KafkaToWebsocketClients.PekkoVersion}",
@@ -186,9 +186,9 @@ MqttToKafka / paradoxProperties ++= Map(
"javadoc.akka.base_url" -> "",
"extref.alpakka.base_url" ->
s"https://doc.akka.io/docs/alpakka/${Dependencies.MqttToKafka.PekkoConnectorsVersion}/%s",
// Pekko Connectors Kafka
- "scaladoc.akka.kafka.base_url" ->
s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.MqttToKafka.AlpakkaKafkaVersion}",
+ "scaladoc.akka.kafka.base_url" ->
s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.MqttToKafka.PekkoConnectorsKafkaVersion}",
"javadoc.akka.kafka.base_url" -> "",
- "extref.alpakka-kafka.base_url" ->
s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.MqttToKafka.AlpakkaKafkaVersion}/%s",
+ "extref.pekko-connectors-kafka.base_url" ->
s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.MqttToKafka.PekkoConnectorsKafkaVersion}/%s",
// Pekko
"scaladoc.akka.base_url" ->
s"https://doc.akka.io/api/akka/${Dependencies.MqttToKafka.PekkoVersion}",
"javadoc.akka.base_url" ->
s"https://doc.akka.io/japi/akka/${Dependencies.MqttToKafka.PekkoVersion}",
@@ -211,11 +211,11 @@ FileToElasticsearch / paradoxProperties ++= Map(
// Pekko Connectors
"scaladoc.org.apache.pekko.stream.connectors.base_url" ->
s"https://pekko.apache.org/api/pekko-connectors/${Dependencies.FileToElasticsearch.PekkoConnectorsVersion}",
"javadoc.org.apache.pekko.stream.connectors.base_url" -> "",
- "extref.org.apache.pekko.stream.connectors.base_url" ->
s"https://pekko.apache.org/docs/pekko-connectors/${Dependencies.FileToElasticsearch.PekkoConnectorsVersion}/%s",
+ "extref.pekko-connectors.base_url" ->
s"https://pekko.apache.org/docs/pekko-connectors/${Dependencies.FileToElasticsearch.PekkoConnectorsVersion}/%s",
// Pekko
"scaladoc.org.apache.pekko.base_url" ->
s"https://pekko.apache.org/api/pekko/${Dependencies.FileToElasticsearch.PekkoVersion}",
"javadoc.org.apache.pekko.base_url" ->
s"https://pekko.apache.org/japi/pekko/${Dependencies.FileToElasticsearch.PekkoVersion}",
- "extref.org.apache.pekko.base_url" ->
s"https://pekko.apache.org/docs/pekko/${Dependencies.FileToElasticsearch.PekkoVersion}/%s",
+ "extref.pekko.base_url" ->
s"https://pekko.apache.org/docs/pekko/${Dependencies.FileToElasticsearch.PekkoVersion}/%s",
)
FileToElasticsearch / paradoxGroups := Map("Language" -> Seq("Java", "Scala"))
@@ -246,7 +246,7 @@ Paradox / siteSubdirName := ""
paradoxProperties ++= Map(
"extref.akka.base_url" -> "https://doc.akka.io/docs/akka/current/",
"extref.alpakka.base_url" -> "https://doc.akka.io/docs/alpakka/current/",
- "extref.alpakka-kafka.base_url" ->
"https://doc.akka.io/docs/alpakka-kafka/current/",
+ "extref.pekko-connectors-kafka.base_url" ->
"https://doc.akka.io/docs/alpakka-kafka/current/",
"extref.ftp-to-file.base_url" -> s"${(FtpToFile / siteSubdirName).value}/",
"extref.http-csv-to-kafka.base_url" -> s"${(HttpCsvToKafka /
siteSubdirName).value}/",
"extref.jdbc-to-elasticsearch.base_url" -> s"${(JdbcToElasticsearch /
siteSubdirName).value}/",
diff --git a/docs/project/Dependencies.scala b/docs/project/Dependencies.scala
index 2fcd09b..af5360b 100644
--- a/docs/project/Dependencies.scala
+++ b/docs/project/Dependencies.scala
@@ -31,7 +31,7 @@ object Dependencies {
val PekkoVersion = versions("PekkoVersion")
val AkkaHttpVersion = versions("AkkaHttpVersion")
val PekkoConnectorsVersion = versions("PekkoConnectorsVersion")
- val AlpakkaKafkaVersion = versions("AlpakkaKafkaVersion")
+ val PekkoConnectorsKafkaVersion = versions("PekkoConnectorsKafkaVersion")
}
object JdbcToElasticsearch {
@@ -75,7 +75,7 @@ object Dependencies {
val ScalaVersion = versions("scalaVer")
val PekkoVersion = versions("PekkoVersion")
val PekkoConnectorsVersion = versions("PekkoConnectorsVersion")
- val AlpakkaKafkaVersion = versions("AlpakkaKafkaVersion")
+ val PekkoConnectorsKafkaVersion = versions("PekkoConnectorsKafkaVersion")
}
object KafkaToWebsocketClients {
@@ -90,7 +90,7 @@ object Dependencies {
val ScalaVersion = versions("scalaVer")
val PekkoVersion = versions("PekkoVersion")
val AkkaHttpVersion = versions("AkkaHttpVersion")
- val AlpakkaKafkaVersion = versions("AlpakkaKafkaVersion")
+ val PekkoConnectorsKafkaVersion = versions("PekkoConnectorsKafkaVersion")
}
object MqttToKafka {
@@ -105,7 +105,7 @@ object Dependencies {
val ScalaVersion = versions("scalaVer")
val PekkoVersion = versions("PekkoVersion")
val PekkoConnectorsVersion = versions("PekkoConnectorsVersion")
- val AlpakkaKafkaVersion = versions("AlpakkaKafkaVersion")
+ val PekkoConnectorsKafkaVersion = versions("PekkoConnectorsKafkaVersion")
}
object FileToElasticsearch {
diff --git a/docs/src/main/paradox/index.md b/docs/src/main/paradox/index.md
index c410318..e4a3d4c 100644
--- a/docs/src/main/paradox/index.md
+++ b/docs/src/main/paradox/index.md
@@ -25,7 +25,7 @@ This example uses Pekko Connectors File to watch for new
files created in a dire
Read a stream of data and store it zipped in rotating files on an SFTP server.
### @extref:[Subscribe to MQTT and produce to Kafka](mqtt-to-kafka:index.html)
-Subscribe to an MQTT topic with @extref[Pekko Connectors
MQTT](pekko-connectors:/mqtt.html), group a few values and publish the
aggregate to a Kafka topic with @extref[Pekko Connectors Kafka](alpakka-kafka:).
+Subscribe to an MQTT topic with @extref[Pekko Connectors
MQTT](pekko-connectors:/mqtt.html), group a few values and publish the
aggregate to a Kafka topic with @extref[Pekko Connectors
Kafka](pekko-connectors-kafka:).
### @link:[Amazon
SQS](https://github.com/apache/incubator-pekko-connectors-samples/tree/main/pekko-connectors-sample-sqs-java)
{ open=new }
Listen to an Amazon SQS topic, enrich the message via calling an actor,
publish a new message to SQS and acknowledge/delete the original message. (Java
only)
diff --git
a/pekko-connectors-sample-http-csv-to-kafka/project/Dependencies.scala
b/pekko-connectors-sample-http-csv-to-kafka/project/Dependencies.scala
index cc5be29..5277491 100644
--- a/pekko-connectors-sample-http-csv-to-kafka/project/Dependencies.scala
+++ b/pekko-connectors-sample-http-csv-to-kafka/project/Dependencies.scala
@@ -7,11 +7,11 @@ object Dependencies {
val PekkoVersion = "2.6.19"
val AkkaHttpVersion = "10.1.12"
val PekkoConnectorsVersion = "4.0.0"
- val AlpakkaKafkaVersion = "3.0.1"
+ val PekkoConnectorsKafkaVersion = "3.0.1"
val dependencies = List(
"com.lightbend.akka" %% "akka-stream-alpakka-csv" % PekkoConnectorsVersion,
- "com.typesafe.akka" %% "akka-stream-kafka" % AlpakkaKafkaVersion,
+ "com.typesafe.akka" %% "akka-stream-kafka" % PekkoConnectorsKafkaVersion,
"com.typesafe.akka" %% "akka-actor-typed" % PekkoVersion,
"com.typesafe.akka" %% "akka-stream" % PekkoVersion,
"com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
diff --git
a/pekko-connectors-sample-http-csv-to-kafka/step_003_parse_csv/README.md
b/pekko-connectors-sample-http-csv-to-kafka/step_003_parse_csv/README.md
index ca8e0b6..8f85b07 100644
--- a/pekko-connectors-sample-http-csv-to-kafka/step_003_parse_csv/README.md
+++ b/pekko-connectors-sample-http-csv-to-kafka/step_003_parse_csv/README.md
@@ -2,4 +2,4 @@
### Description
-The binary data in @scaladoc:[ByteString](akka.util.ByteString)s is passed
into @extref:[Alpakka CSV](alpakka:data-transformations/csv.html) to be parsed
and converted per line into a Map. The stream elements becomes a
@scala[`Map[String, ByteString]`]@java[`Map<String, ByteString>`], one entry
per column using the column headers as keys.
+The binary data in @scaladoc:[ByteString](akka.util.ByteString)s is passed
into @extref:[Pekko-Connectors CSV](alpakka:data-transformations/csv.html) to
be parsed and converted per line into a Map. The stream elements becomes a
@scala[`Map[String, ByteString]`]@java[`Map<String, ByteString>`], one entry
per column using the column headers as keys.
diff --git
a/pekko-connectors-sample-http-csv-to-kafka/step_007_produce_to_kafka/README.md
b/pekko-connectors-sample-http-csv-to-kafka/step_007_produce_to_kafka/README.md
index e6b6510..d220156 100644
---
a/pekko-connectors-sample-http-csv-to-kafka/step_007_produce_to_kafka/README.md
+++
b/pekko-connectors-sample-http-csv-to-kafka/step_007_produce_to_kafka/README.md
@@ -4,6 +4,6 @@
[Testcontainers](https://www.testcontainers.org/) starts a Kafka broker in
Docker.
-@extref:[Alpakka Kafka](alpakka-kafka:producer.html) producer settings specify
the broker address and the data types for Kafka's key and value.
+@extref:[Pekko-Connectors Kafka](pekko-connectors-kafka:producer.html)
producer settings specify the broker address and the data types for Kafka's key
and value.
@scaladoc:[Producer.plainSink](akka.kafka.scaladsl.Producer$) sends the
`ProducerRecord`s stream elements to the specified Kafka topic.
diff --git a/pekko-connectors-sample-kafka-to-elasticsearch/README.md
b/pekko-connectors-sample-kafka-to-elasticsearch/README.md
index 865e3ff..bdab891 100644
--- a/pekko-connectors-sample-kafka-to-elasticsearch/README.md
+++ b/pekko-connectors-sample-kafka-to-elasticsearch/README.md
@@ -2,7 +2,7 @@
## Read from a Kafka topic and publish to Elasticsearch
-This example uses @extref[Alpakka Kafka](alpakka-kafka:) to subscribe to a
Kafka topic, parses JSON into a data class and stores the object in
Elasticsearch. After storing the Kafka offset is committed back to Kafka. This
gives at-least-once semantics.
+This example uses @extref[Pekko-Connectors Kafka](pekko-connectors-kafka:) to
subscribe to a Kafka topic, parses JSON into a data class and stores the object
in Elasticsearch. After storing the Kafka offset is committed back to Kafka.
This gives at-least-once semantics.
Browse the sources at
@link:[Github](https://github.com/apache/incubator-pekko-connectors-samples/tree/main/pekko-connectors-sample-kafka-to-elasticsearch)
{ open=new }.
diff --git a/pekko-connectors-sample-kafka-to-elasticsearch/build.sbt
b/pekko-connectors-sample-kafka-to-elasticsearch/build.sbt
index d5eb377..49f241e 100644
--- a/pekko-connectors-sample-kafka-to-elasticsearch/build.sbt
+++ b/pekko-connectors-sample-kafka-to-elasticsearch/build.sbt
@@ -1,4 +1,4 @@
-organization := "com.lightbend.akka"
+organization := "org.apache.pekko"
version := "1.0.0"
scalaVersion := Dependencies.scalaVer
libraryDependencies ++= Dependencies.dependencies
diff --git
a/pekko-connectors-sample-kafka-to-elasticsearch/project/Dependencies.scala
b/pekko-connectors-sample-kafka-to-elasticsearch/project/Dependencies.scala
index 09d310f..e73343f 100644
--- a/pekko-connectors-sample-kafka-to-elasticsearch/project/Dependencies.scala
+++ b/pekko-connectors-sample-kafka-to-elasticsearch/project/Dependencies.scala
@@ -3,29 +3,28 @@ import sbt._
object Dependencies {
val scalaVer = "2.13.12"
// #deps
- val PekkoVersion = "2.6.19"
- val PekkoConnectorsVersion = "4.0.0"
- val AlpakkaKafkaVersion = "3.0.1"
+ val PekkoVersion = "1.0.2"
+ val PekkoConnectorsVersion = "1.0.1"
+ val PekkoConnectorsKafkaVersion = "1.0.0"
// #deps
val dependencies = List(
- // #deps
- "com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" %
PekkoConnectorsVersion,
- "com.typesafe.akka" %% "akka-stream-kafka" % AlpakkaKafkaVersion,
- "com.typesafe.akka" %% "akka-stream" % PekkoVersion,
- "com.typesafe.akka" %% "akka-actor-typed" % PekkoVersion,
- "com.typesafe.akka" %% "akka-actor" % PekkoVersion,
+ // #deps
+ "org.apache.pekko" %% "pekko-connectors-elasticsearch" %
PekkoConnectorsVersion,
+ "org.apache.pekko" %% "pekko-connectors-kafka" %
PekkoConnectorsKafkaVersion,
+ "org.apache.pekko" %% "pekko-stream" % PekkoVersion,
+ "org.apache.pekko" %% "pekko-actor-typed" % PekkoVersion,
+ "org.apache.pekko" %% "pekko-actor" % PekkoVersion,
// for JSON in Scala
"io.spray" %% "spray-json" % "1.3.6",
// for JSON in Java
- "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.13.3",
- "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.13.3",
+ "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.14.3",
+ "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.14.3",
// Logging
- "com.typesafe.akka" %% "akka-slf4j" % PekkoVersion,
+ "org.apache.pekko" %% "pekko-slf4j" % PekkoVersion,
"ch.qos.logback" % "logback-classic" % "1.2.13",
- // #deps
- "org.testcontainers" % "elasticsearch" % "1.17.3",
- "org.testcontainers" % "kafka" % "1.17.3"
- )
+ // #deps
+ "org.testcontainers" % "elasticsearch" % "1.17.6",
+ "org.testcontainers" % "kafka" % "1.17.6")
}
diff --git
a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Helper.java
b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Helper.java
index 16716f8..d444337 100644
---
a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Helper.java
+++
b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Helper.java
@@ -1,14 +1,15 @@
package samples.javadsl;
-import akka.Done;
-import akka.actor.typed.ActorSystem;
-import static akka.actor.typed.javadsl.Adapter.*;
-import akka.kafka.ProducerSettings;
-import akka.kafka.javadsl.Producer;
-import akka.stream.alpakka.elasticsearch.*;
-import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchSource;
-import akka.stream.javadsl.Sink;
-import akka.stream.javadsl.Source;
+import static org.apache.pekko.actor.typed.javadsl.Adapter.*;
+
+import org.apache.pekko.Done;
+import org.apache.pekko.actor.typed.ActorSystem;
+import org.apache.pekko.kafka.ProducerSettings;
+import org.apache.pekko.kafka.javadsl.Producer;
+import org.apache.pekko.stream.connectors.elasticsearch.*;
+import
org.apache.pekko.stream.connectors.elasticsearch.javadsl.ElasticsearchSource;
+import org.apache.pekko.stream.javadsl.Sink;
+import org.apache.pekko.stream.javadsl.Source;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -16,72 +17,73 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
import java.util.List;
import java.util.concurrent.CompletionStage;
public class Helper {
- private static final Logger log = LoggerFactory.getLogger(Helper.class);
-
- private ElasticsearchContainer elasticsearchContainer;
- public ElasticsearchConnectionSettings connectionSettings;
-
- private KafkaContainer kafka;
- public String kafkaBootstrapServers;
-
- public Helper() {
- }
-
- public void startContainers() {
- elasticsearchContainer =
- new
ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2");
- elasticsearchContainer.start();
- connectionSettings = ElasticsearchConnectionSettings.create("http://"
+ elasticsearchContainer.getHttpHostAddress());
-
- kafka = new KafkaContainer("5.4.1"); // contains Kafka 2.4.x
- kafka.start();
- kafkaBootstrapServers = kafka.getBootstrapServers();
- }
-
- public void stopContainers() {
- kafka.stop();
- elasticsearchContainer.stop();
- }
-
- CompletionStage<Done> writeToKafka(String topic, List<Movie> movies,
ActorSystem<?> actorSystem) {
- ProducerSettings<Integer, String> kafkaProducerSettings =
- ProducerSettings.create(toClassic(actorSystem), new
IntegerSerializer(), new StringSerializer())
- .withBootstrapServers(kafkaBootstrapServers);
-
- CompletionStage<Done> producing =
- Source.from(movies)
- .map(
- movie -> {
- log.debug("producing {}", movie);
- String json =
JsonMappers.movieWriter.writeValueAsString(movie);
- return new ProducerRecord<>(topic,
movie.id, json);
- })
- .runWith(Producer.plainSink(kafkaProducerSettings),
actorSystem);
- producing.thenAccept(s -> log.info("Producing finished"));
- return producing;
- }
-
- CompletionStage<List<Movie>> readFromElasticsearch(String indexName,
ActorSystem<?> actorSystem) {
- CompletionStage<List<Movie>> reading =
- ElasticsearchSource.typed(
- ElasticsearchParams.V7(indexName),
- "{\"match_all\": {}}",
- ElasticsearchSourceSettings.create(connectionSettings),
- Movie.class)
- .map(readResult -> readResult.source())
- .runWith(Sink.seq(), actorSystem);
- reading.thenAccept(
- non -> {
- log.info("Reading finished");
- });
- return reading;
- }
+ private static final Logger log = LoggerFactory.getLogger(Helper.class);
+
+ private ElasticsearchContainer elasticsearchContainer;
+ public ElasticsearchConnectionSettings connectionSettings;
+
+ private KafkaContainer kafka;
+ public String kafkaBootstrapServers;
+
+ public Helper() {
+ }
+
+ public void startContainers() {
+ elasticsearchContainer =
+ new
ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2");
+ elasticsearchContainer.start();
+ connectionSettings = ElasticsearchConnectionSettings.create("http://" +
elasticsearchContainer.getHttpHostAddress());
+
+ kafka = new
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.1")); //
contains Kafka 2.4.x
+ kafka.start();
+ kafkaBootstrapServers = kafka.getBootstrapServers();
+ }
+
+ public void stopContainers() {
+ kafka.stop();
+ elasticsearchContainer.stop();
+ }
+
+ CompletionStage<Done> writeToKafka(String topic, List<Movie> movies,
ActorSystem<?> actorSystem) {
+ ProducerSettings<Integer, String> kafkaProducerSettings =
+ ProducerSettings.create(toClassic(actorSystem), new
IntegerSerializer(), new StringSerializer())
+ .withBootstrapServers(kafkaBootstrapServers);
+
+ CompletionStage<Done> producing =
+ Source.from(movies)
+ .map(
+ movie -> {
+ log.debug("producing {}", movie);
+ String json =
JsonMappers.movieWriter.writeValueAsString(movie);
+ return new ProducerRecord<>(topic, movie.id, json);
+ })
+ .runWith(Producer.plainSink(kafkaProducerSettings), actorSystem);
+ producing.thenAccept(s -> log.info("Producing finished"));
+ return producing;
+ }
+
+ CompletionStage<List<Movie>> readFromElasticsearch(String indexName,
ActorSystem<?> actorSystem) {
+ CompletionStage<List<Movie>> reading =
+ ElasticsearchSource.typed(
+ ElasticsearchParams.V7(indexName),
+ "{\"match_all\": {}}",
+ ElasticsearchSourceSettings.create(connectionSettings),
+ Movie.class)
+ .map(readResult -> readResult.source())
+ .runWith(Sink.seq(), actorSystem);
+ reading.thenAccept(
+ non -> {
+ log.info("Reading finished");
+ });
+ return reading;
+ }
}
diff --git
a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/JsonMappers.java
b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/JsonMappers.java
index a3d3767..9ef310b 100644
---
a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/JsonMappers.java
+++
b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/JsonMappers.java
@@ -6,8 +6,9 @@ import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
class JsonMappers {
- // Jackson conversion setup (3)
- public final static ObjectMapper mapper = new
ObjectMapper().registerModule(new JavaTimeModule());
- public final static ObjectWriter movieWriter =
mapper.writerFor(Movie.class);
- public final static ObjectReader movieReader =
mapper.readerFor(Movie.class);
+
+ // Jackson's conversion setup (3)
+ public final static ObjectMapper mapper = new
ObjectMapper().registerModule(new JavaTimeModule());
+ public final static ObjectWriter movieWriter = mapper.writerFor(Movie.class);
+ public final static ObjectReader movieReader = mapper.readerFor(Movie.class);
}
diff --git
a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Main.java
b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Main.java
index 71d43f4..d397885 100644
---
a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Main.java
+++
b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Main.java
@@ -6,19 +6,20 @@ package samples.javadsl;
// #imports
-import akka.Done;
-import akka.NotUsed;
-import akka.actor.typed.ActorSystem;
-import static akka.actor.typed.javadsl.Adapter.*;
-import akka.actor.typed.javadsl.Behaviors;
-import akka.kafka.CommitterSettings;
-import akka.kafka.ConsumerSettings;
-import akka.kafka.Subscriptions;
-import akka.kafka.javadsl.Committer;
-import akka.kafka.javadsl.Consumer;
-import akka.stream.alpakka.elasticsearch.*;
-import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchFlow;
-import akka.stream.javadsl.Keep;
+import static org.apache.pekko.actor.typed.javadsl.Adapter.*;
+
+import org.apache.pekko.Done;
+import org.apache.pekko.NotUsed;
+import org.apache.pekko.actor.typed.ActorSystem;
+import org.apache.pekko.actor.typed.javadsl.Behaviors;
+import org.apache.pekko.kafka.CommitterSettings;
+import org.apache.pekko.kafka.ConsumerSettings;
+import org.apache.pekko.kafka.Subscriptions;
+import org.apache.pekko.kafka.javadsl.Committer;
+import org.apache.pekko.kafka.javadsl.Consumer;
+import org.apache.pekko.stream.connectors.elasticsearch.*;
+import
org.apache.pekko.stream.connectors.elasticsearch.javadsl.ElasticsearchFlow;
+import org.apache.pekko.stream.javadsl.Keep;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -34,99 +35,99 @@ import java.util.concurrent.TimeUnit;
public class Main {
- private static final Logger log = LoggerFactory.getLogger(Main.class);
-
- private final Helper helper;
- private final ElasticsearchConnectionSettings elasticsearchAddress;
- private final String kafkaBootstrapServers;
-
- private final String topic = "movies-to-elasticsearch";
- private final String groupId = "docs-group";
-
- // #es-setup
- private final String indexName = "movies";
-
- // #es-setup
-
- public Main(Helper helper) {
- this.elasticsearchAddress = helper.connectionSettings;
- this.kafkaBootstrapServers = helper.kafkaBootstrapServers;
- this.helper = helper;
- }
-
- private ActorSystem<Void> actorSystem;
-
- private Consumer.DrainingControl<Done> readFromKafkaToEleasticsearch() {
- // #kafka-setup
- // configure Kafka consumer (1)
- ConsumerSettings<Integer, String> kafkaConsumerSettings =
- ConsumerSettings.create(actorSystem, new
IntegerDeserializer(), new StringDeserializer())
- .withBootstrapServers(kafkaBootstrapServers)
- .withGroupId(groupId)
- .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest")
- .withStopTimeout(Duration.ofSeconds(5));
- // #kafka-setup
-
- // #flow
- Consumer.DrainingControl<Done> control =
- Consumer.sourceWithOffsetContext(kafkaConsumerSettings,
Subscriptions.topics(topic)) // (5)
- .map(
- consumerRecord -> { // (6)
- Movie movie =
JsonMappers.movieReader.readValue(consumerRecord.value());
- return
WriteMessage.createUpsertMessage(String.valueOf(movie.id), movie);
- })
- .via(
- ElasticsearchFlow.createWithContext(
- ElasticsearchParams.V7(indexName),
-
ElasticsearchWriteSettings.create(elasticsearchAddress),
- JsonMappers.mapper)) // (7)
- .map(
- writeResult -> { // (8)
- writeResult
- .getError()
- .ifPresent(
- errorJson -> {
- throw new
RuntimeException(
- "Elasticsearch
update failed "
- +
writeResult.getErrorReason().orElse(errorJson));
- });
- return NotUsed.notUsed();
- })
-
.toMat(Committer.sinkWithOffsetContext(CommitterSettings.create(toClassic(actorSystem))),
Keep.both()) // (9)
- .mapMaterializedValue(Consumer::createDrainingControl)
// (10)
- .run(actorSystem);
- // #flow
- return control;
- }
-
- private CompletionStage<Done> run() throws Exception {
- actorSystem = ActorSystem.create(Behaviors.empty(),
"KafkaToElasticSearch");
-
- List<Movie> movies = Arrays.asList(new Movie(23, "Psycho"), new
Movie(423, "Citizen Kane"));
- CompletionStage<Done> writing = helper.writeToKafka(topic, movies,
actorSystem);
- writing.toCompletableFuture().get(10, TimeUnit.SECONDS);
-
- Consumer.DrainingControl<Done> control =
readFromKafkaToEleasticsearch();
- TimeUnit.SECONDS.sleep(5);
- CompletionStage<Done> copyingFinished =
control.drainAndShutdown(actorSystem.executionContext());
- copyingFinished.toCompletableFuture().get(10, TimeUnit.SECONDS);
- CompletionStage<List<Movie>> reading =
helper.readFromElasticsearch(indexName, actorSystem);
-
- return reading.thenCompose(
- ms -> {
- ms.forEach(m -> System.out.println("read " + m));
- actorSystem.terminate();
- return actorSystem.getWhenTerminated();
- });
- }
-
- public static void main(String[] args) throws Exception {
- Helper helper = new Helper();
- helper.startContainers();
- Main main = new Main(helper);
- CompletionStage<Done> run = main.run();
- run.thenAccept(res -> {
- helper.stopContainers();
+ private static final Logger log = LoggerFactory.getLogger(Main.class);
+
+ private final Helper helper;
+ private final ElasticsearchConnectionSettings elasticsearchAddress;
+ private final String kafkaBootstrapServers;
+
+ private final String topic = "movies-to-elasticsearch";
+ private final String groupId = "docs-group";
+
+ // #es-setup
+ private final String indexName = "movies";
+
+ // #es-setup
+
+ public Main(Helper helper) {
+ this.elasticsearchAddress = helper.connectionSettings;
+ this.kafkaBootstrapServers = helper.kafkaBootstrapServers;
+ this.helper = helper;
+ }
+
+ private ActorSystem<Void> actorSystem;
+
+ private Consumer.DrainingControl<Done> readFromKafkaToEleasticsearch() {
+ // #kafka-setup
+ // configure Kafka consumer (1)
+ ConsumerSettings<Integer, String> kafkaConsumerSettings =
+ ConsumerSettings.create(actorSystem, new IntegerDeserializer(), new
StringDeserializer())
+ .withBootstrapServers(kafkaBootstrapServers)
+ .withGroupId(groupId)
+ .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ .withStopTimeout(Duration.ofSeconds(5));
+ // #kafka-setup
+
+ // #flow
+ Consumer.DrainingControl<Done> control =
+ Consumer.sourceWithOffsetContext(kafkaConsumerSettings,
Subscriptions.topics(topic)) // (5)
+ .map(
+ consumerRecord -> { // (6)
+ Movie movie =
JsonMappers.movieReader.readValue(consumerRecord.value());
+ return
WriteMessage.createUpsertMessage(String.valueOf(movie.id), movie);
+ })
+ .via(
+ ElasticsearchFlow.createWithContext(
+ ElasticsearchParams.V7(indexName),
+ ElasticsearchWriteSettings.create(elasticsearchAddress),
+ JsonMappers.mapper)) // (7)
+ .map(
+ writeResult -> { // (8)
+ writeResult
+ .getError()
+ .ifPresent(
+ errorJson -> {
+ throw new RuntimeException(
+ "Elasticsearch update failed "
+ +
writeResult.getErrorReason().orElse(errorJson));
+ });
+ return NotUsed.notUsed();
+ })
+
.toMat(Committer.sinkWithOffsetContext(CommitterSettings.create(toClassic(actorSystem))),
Keep.both()) // (9)
+ .mapMaterializedValue(Consumer::createDrainingControl) // (10)
+ .run(actorSystem);
+ // #flow
+ return control;
+ }
+
+ private CompletionStage<Done> run() throws Exception {
+ actorSystem = ActorSystem.create(Behaviors.empty(),
"KafkaToElasticSearch");
+
+ List<Movie> movies = Arrays.asList(new Movie(23, "Psycho"), new Movie(423,
"Citizen Kane"));
+ CompletionStage<Done> writing = helper.writeToKafka(topic, movies,
actorSystem);
+ writing.toCompletableFuture().get(10, TimeUnit.SECONDS);
+
+ Consumer.DrainingControl<Done> control = readFromKafkaToEleasticsearch();
+ TimeUnit.SECONDS.sleep(5);
+ CompletionStage<Done> copyingFinished =
control.drainAndShutdown(actorSystem.executionContext());
+ copyingFinished.toCompletableFuture().get(10, TimeUnit.SECONDS);
+ CompletionStage<List<Movie>> reading =
helper.readFromElasticsearch(indexName, actorSystem);
+
+ return reading.thenCompose(
+ ms -> {
+ ms.forEach(m -> System.out.println("read " + m));
+ actorSystem.terminate();
+ return actorSystem.getWhenTerminated();
});
- }
+ }
+
+ public static void main(String[] args) throws Exception {
+ Helper helper = new Helper();
+ helper.startContainers();
+ Main main = new Main(helper);
+ CompletionStage<Done> run = main.run();
+ run.thenAccept(res -> {
+ helper.stopContainers();
+ });
+ }
}
diff --git
a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Movie.java
b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Movie.java
index 5fb955c..09d47d8 100644
---
a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Movie.java
+++
b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/java/samples/javadsl/Movie.java
@@ -2,25 +2,22 @@ package samples.javadsl;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectReader;
-import com.fasterxml.jackson.databind.ObjectWriter;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
// Type in Elasticsearch (2)
public class Movie {
- public final int id;
- public final String title;
- @JsonCreator
- public Movie(@JsonProperty("id") int id, @JsonProperty("title") String
title) {
- this.id = id;
- this.title = title;
- }
+ public final int id;
+ public final String title;
- @Override
- public String toString() {
- return "Movie(" + id + ", title=" + title + ")";
- }
+ @JsonCreator
+ public Movie(@JsonProperty("id") int id, @JsonProperty("title") String
title) {
+ this.id = id;
+ this.title = title;
+ }
+
+ @Override
+ public String toString() {
+ return "Movie(" + id + ", title=" + title + ")";
+ }
}
diff --git
a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/resources/application.conf
b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/resources/application.conf
index 7bdde8d..77117b1 100644
---
a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/resources/application.conf
+++
b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/resources/application.conf
@@ -1,5 +1,5 @@
-akka {
- loggers = ["akka.event.slf4j.Slf4jLogger"]
- logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+pekko {
+ loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"]
+ logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter"
loglevel = "DEBUG"
}
diff --git
a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/resources/logback.xml
b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/resources/logback.xml
index a57a169..9baedff 100644
---
a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/resources/logback.xml
+++
b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/resources/logback.xml
@@ -9,7 +9,7 @@
<logger name="org.apache" level="WARN"/>
<logger name="kafka" level="WARN"/>
- <logger name="akka" level="WARN"/>
+ <logger name="pekko" level="WARN"/>
<logger name="org.apache.kafka.common.utils.AppInfoParser" level="ERROR"/>
<logger name="org.apache.kafka.clients.NetworkClient" level="ERROR"/>
<logger name="org.I0Itec.zkclient" level="WARN"/>
diff --git
a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/scala/samples/scaladsl/Helper.scala
b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/scala/samples/scaladsl/Helper.scala
index 5d34720..325488e 100644
---
a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/scala/samples/scaladsl/Helper.scala
+++
b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/scala/samples/scaladsl/Helper.scala
@@ -4,43 +4,39 @@
package samples.scaladsl
-import akka.actor.typed.ActorSystem
-import akka.kafka._
-import akka.kafka.scaladsl.Producer
-import akka.actor.typed.scaladsl.adapter._
-import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSource
-import akka.stream.scaladsl.{Sink, Source}
-import akka.Done
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization._
+import org.apache.pekko.actor.typed.ActorSystem
+import org.apache.pekko.actor.typed.scaladsl.adapter._
+import org.apache.pekko.kafka._
+import org.apache.pekko.kafka.scaladsl.Producer
+import
org.apache.pekko.stream.connectors.elasticsearch.scaladsl.ElasticsearchSource
+import org.apache.pekko.stream.scaladsl.{ Sink, Source }
+import org.apache.pekko.Done
+import org.apache.pekko.stream.connectors.elasticsearch._
import org.slf4j.LoggerFactory
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.elasticsearch.ElasticsearchContainer
-import spray.json.DefaultJsonProtocol._
+import org.testcontainers.utility.DockerImageName
+import samples.scaladsl.JsonFormats._
import spray.json._
import scala.collection.immutable
-import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContext, Future}
-import JsonFormats._
-import akka.stream.alpakka.elasticsearch.ElasticsearchConnectionSettings
-import akka.stream.alpakka.elasticsearch.ElasticsearchParams
-import akka.stream.alpakka.elasticsearch.ElasticsearchSourceSettings
-import samples.scaladsl.Main.{elasticsearchContainer, kafka}
+import scala.concurrent.Future
trait Helper {
final val log = LoggerFactory.getLogger(getClass)
- // Testcontainers: start Elasticsearch in Docker
+ // TestContainers: start Elasticsearch in Docker
val elasticsearchContainer = new
ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2")
elasticsearchContainer.start()
val connectionSettings = ElasticsearchConnectionSettings("http://" +
elasticsearchContainer.getHttpHostAddress)
- // Testcontainers: start Kafka in Docker
+ // TestContainers: start Kafka in Docker
// [[https://hub.docker.com/r/confluentinc/cp-kafka/tags Available Docker
images]]
//
[[https://docs.confluent.io/current/installation/versions-interoperability.html
Kafka versions in Confluent Platform]]
- val kafka = new KafkaContainer("5.4.1") // contains Kafka 2.4.x
+ val kafka = new
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.1")) //
contains Kafka 2.4.x
kafka.start()
val kafkaBootstrapServers = kafka.getBootstrapServers
@@ -60,7 +56,8 @@ trait Helper {
def readFromElasticsearch(indexName: String)(implicit actorSystem:
ActorSystem[_]): Future[immutable.Seq[Movie]] = {
val reading = ElasticsearchSource
- .typed[Movie](ElasticsearchParams.V7(indexName), """{"match_all":
{}}""", ElasticsearchSourceSettings(connectionSettings))
+ .typed[Movie](ElasticsearchParams.V7(indexName), """{"match_all": {}}""",
+ ElasticsearchSourceSettings(connectionSettings))
.map(_.source)
.runWith(Sink.seq)
reading.foreach(_ => log.info("Reading
finished"))(actorSystem.executionContext)
diff --git
a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/scala/samples/scaladsl/Main.scala
b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/scala/samples/scaladsl/Main.scala
index 9a10e95..f552ce1 100644
---
a/pekko-connectors-sample-kafka-to-elasticsearch/src/main/scala/samples/scaladsl/Main.scala
+++
b/pekko-connectors-sample-kafka-to-elasticsearch/src/main/scala/samples/scaladsl/Main.scala
@@ -5,23 +5,23 @@
package samples.scaladsl
// #imports
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.scaladsl.Behaviors
-import akka.actor.typed.scaladsl.adapter._
-import akka.kafka._
-import akka.kafka.scaladsl.{Committer, Consumer}
-import akka.stream.alpakka.elasticsearch.ElasticsearchParams
-import akka.stream.alpakka.elasticsearch.ElasticsearchWriteSettings
-import akka.stream.alpakka.elasticsearch.WriteMessage
-import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchFlow
-import akka.stream.scaladsl.Sink
-import akka.{Done, NotUsed}
+import org.apache.pekko.actor.typed.ActorSystem
+import org.apache.pekko.actor.typed.scaladsl.Behaviors
+import org.apache.pekko.actor.typed.scaladsl.adapter._
+import org.apache.pekko.kafka._
+import org.apache.pekko.kafka.scaladsl.{ Committer, Consumer }
+import org.apache.pekko.stream.connectors.elasticsearch.ElasticsearchParams
+import
org.apache.pekko.stream.connectors.elasticsearch.ElasticsearchWriteSettings
+import org.apache.pekko.stream.connectors.elasticsearch.WriteMessage
+import
org.apache.pekko.stream.connectors.elasticsearch.scaladsl.ElasticsearchFlow
+import org.apache.pekko.stream.scaladsl.Sink
+import org.apache.pekko.{ Done, NotUsed }
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization._
import spray.json._
import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.{ Await, ExecutionContext, Future }
// #imports
object Main extends App with Helper {
diff --git a/pekko-connectors-sample-kafka-to-websocket-clients/README.md
b/pekko-connectors-sample-kafka-to-websocket-clients/README.md
index a18b177..870a6e1 100644
--- a/pekko-connectors-sample-kafka-to-websocket-clients/README.md
+++ b/pekko-connectors-sample-kafka-to-websocket-clients/README.md
@@ -2,7 +2,7 @@
## Read from a Kafka topic and push the data to connected websocket clients
-Clients may connect via websockets and will receive data read from a Kafka
topic. The websockets are implemented in @extref[Akka HTTP](akka-http:) and
[Alpakka Kafka](alpakka-kafka:) subscribes to the Kafka topic.
+Clients may connect via websockets and will receive data read from a Kafka
topic. The websockets are implemented in @extref[Akka HTTP](akka-http:) and
[Pekko-Connectors Kafka](pekko-connectors-kafka:) subscribes to the Kafka topic.
Browse the sources at
@link:[Github](https://github.com/apache/incubator-pekko-connectors-samples/tree/main/pekko-connectors-sample-kafka-to-websocket-clients)
{ open=new }.
diff --git
a/pekko-connectors-sample-kafka-to-websocket-clients/docs/src/main/paradox/example.md
b/pekko-connectors-sample-kafka-to-websocket-clients/docs/src/main/paradox/example.md
index a47bfcc..ec4c4ba 100644
---
a/pekko-connectors-sample-kafka-to-websocket-clients/docs/src/main/paradox/example.md
+++
b/pekko-connectors-sample-kafka-to-websocket-clients/docs/src/main/paradox/example.md
@@ -4,7 +4,7 @@ This page highlights the most important sections of the example.
### Subscribe to the Kafka topic
-Use an @extref:[Alpakka Kafka](alpakka-kafka:) consumer to subscribe to a
topic in Kafka. The received `String` values are sent to a
@extref[BroadcastHub](akka:stream/stream-dynamic.html#using-the-broadcasthub)
which creates a `Source` for the clients to connect to.
+Use an @extref:[Pekko-Connectors Kafka](pekko-connectors-kafka:) consumer to
subscribe to a topic in Kafka. The received `String` values are sent to a
@extref[BroadcastHub](akka:stream/stream-dynamic.html#using-the-broadcasthub)
which creates a `Source` for the clients to connect to.
Java
: @@snip [snip](/src/main/java/samples/javadsl/Main.java) {
#kafka-to-broadcast }
diff --git
a/pekko-connectors-sample-kafka-to-websocket-clients/docs/src/main/paradox/full-source.md
b/pekko-connectors-sample-kafka-to-websocket-clients/docs/src/main/paradox/full-source.md
index 1eff395..ad6b292 100644
---
a/pekko-connectors-sample-kafka-to-websocket-clients/docs/src/main/paradox/full-source.md
+++
b/pekko-connectors-sample-kafka-to-websocket-clients/docs/src/main/paradox/full-source.md
@@ -8,7 +8,7 @@ Java
## Helper
The helper uses
[Testcontainers](https://www.testcontainers.org/modules/kafka/) to start a
Kafka broker.
-The `writeToKafka()` method populates the Kafka topic using @extref[Alpakka
Kafka](alpakka-kafka:producer.html)
+The `writeToKafka()` method populates the Kafka topic using
@extref[Pekko-Connectors Kafka](pekko-connectors-kafka:producer.html)
Java
: @@snip [snip](/src/main/java/samples/javadsl/Helper.java) {
filterLabels=true }
diff --git
a/pekko-connectors-sample-kafka-to-websocket-clients/project/Dependencies.scala
b/pekko-connectors-sample-kafka-to-websocket-clients/project/Dependencies.scala
index 2734d73..3e56eb4 100644
---
a/pekko-connectors-sample-kafka-to-websocket-clients/project/Dependencies.scala
+++
b/pekko-connectors-sample-kafka-to-websocket-clients/project/Dependencies.scala
@@ -5,13 +5,13 @@ object Dependencies {
// #deps
val PekkoVersion = "2.6.19"
val AkkaHttpVersion = "10.1.12"
- val AlpakkaKafkaVersion = "3.0.1"
+ val PekkoConnectorsKafkaVersion = "3.0.1"
// #deps
val dependencies = List(
// #deps
"com.typesafe.akka" %% "akka-stream" % PekkoVersion,
"com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
- "com.typesafe.akka" %% "akka-stream-kafka" % AlpakkaKafkaVersion,
+ "com.typesafe.akka" %% "akka-stream-kafka" % PekkoConnectorsKafkaVersion,
// Logging
"com.typesafe.akka" %% "akka-slf4j" % PekkoVersion,
diff --git a/pekko-connectors-sample-mqtt-to-kafka/README.md
b/pekko-connectors-sample-mqtt-to-kafka/README.md
index 8cf3e3c..43f49a5 100644
--- a/pekko-connectors-sample-mqtt-to-kafka/README.md
+++ b/pekko-connectors-sample-mqtt-to-kafka/README.md
@@ -2,7 +2,7 @@
## Read from an MQTT topic, group messages and publish to Kafka
-Subscribe to an MQTT topic with @extref[Alpakka MQTT](alpakka:/mqtt.html),
group a few values and publish the aggregate to a Kafka topic with
@extref[Alpakka Kafka](alpakka-kafka:).
+Subscribe to an MQTT topic with @extref[Pekko-Connectors
MQTT](alpakka:/mqtt.html), group a few values and publish the aggregate to a
Kafka topic with @extref[Pekko-Connectors Kafka](pekko-connectors-kafka:).
Browse the sources at
@link:[Github](https://github.com/apache/incubator-pekko-connectors-samples/tree/main/pekko-connectors-sample-mqtt-to-kafka)
{ open=new }.
diff --git a/pekko-connectors-sample-mqtt-to-kafka/project/Dependencies.scala
b/pekko-connectors-sample-mqtt-to-kafka/project/Dependencies.scala
index 746cd98..f25ba6c 100644
--- a/pekko-connectors-sample-mqtt-to-kafka/project/Dependencies.scala
+++ b/pekko-connectors-sample-mqtt-to-kafka/project/Dependencies.scala
@@ -5,7 +5,7 @@ object Dependencies {
// #deps
val PekkoVersion = "2.6.19"
val PekkoConnectorsVersion = "4.0.0"
- val AlpakkaKafkaVersion = "3.0.1"
+ val PekkoConnectorsKafkaVersion = "3.0.1"
val JacksonDatabindVersion = "2.11.4"
// #deps
@@ -13,7 +13,7 @@ object Dependencies {
val dependencies = List(
// #deps
"com.lightbend.akka" %% "akka-stream-alpakka-mqtt" %
PekkoConnectorsVersion,
- "com.typesafe.akka" %% "akka-stream-kafka" % AlpakkaKafkaVersion,
+ "com.typesafe.akka" %% "akka-stream-kafka" % PekkoConnectorsKafkaVersion,
"com.typesafe.akka" %% "akka-stream" % PekkoVersion,
"com.typesafe.akka" %% "akka-actor-typed" % PekkoVersion,
"com.typesafe.akka" %% "akka-actor" % PekkoVersion,
diff --git a/pekko-connectors-sample-rotate-logs-to-ftp/README.md
b/pekko-connectors-sample-rotate-logs-to-ftp/README.md
index 573052d..5662c01 100644
--- a/pekko-connectors-sample-rotate-logs-to-ftp/README.md
+++ b/pekko-connectors-sample-rotate-logs-to-ftp/README.md
@@ -2,7 +2,7 @@
## Rotate data stream over to multiple compressed files on SFTP server
-This example reads a stream of data and uses @extref[Alpakka
File](alpakka:file.html) `LogRotatorSink` to write multiple files which get
rotated triggered by a rotation function, the files are zipped in-flow and
written to an SFTP server with @extref[Alpakka FTP](alpakka:ftp.html).
+This example reads a stream of data and uses @extref[Pekko-Connectors
File](alpakka:file.html) `LogRotatorSink` to write multiple files which get
rotated triggered by a rotation function, the files are zipped in-flow and
written to an SFTP server with @extref[Pekko-Connectors FTP](alpakka:ftp.html).
Browse the sources at
@link:[Github](https://github.com/apache/incubator-pekko-connectors-samples/tree/main/pekko-connectors-sample-rotate-logs-to-ftp)
{ open=new }.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]