This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-pekko-connectors-samples.git
The following commit(s) were added to refs/heads/main by this push:
new 61e4dd1 [jdbc-to-elasticsearch] Migrate akka dependencies to pekko
(#13)
61e4dd1 is described below
commit 61e4dd1484ca51b5ae5cba4198de7173d799beac
Author: Laglangyue <[email protected]>
AuthorDate: Sat Dec 16 18:48:04 2023 +0800
[jdbc-to-elasticsearch] Migrate akka dependencies to pekko (#13)
* migrate akka to pekko
* add license
* remove
* remove
* replace slf4j version
* revert license
* test local
* remove license and revert some changes
* remove apache license and revert some changes
* some updates
* try to fix doc build
* Update Dependencies.scala
---------
Co-authored-by: laglang <[email protected]>
Co-authored-by: PJ Fanning <[email protected]>
---
.github/workflows/compile.yml | 4 +-
docs/build.sbt | 17 ++--
docs/project/Dependencies.scala | 4 +-
jdbc-to-elasticsearch.conf | 6 +-
.../.courseName | 2 +-
.../README.md | 8 +-
.../common/src/main/resources/application.conf | 6 +-
.../common/src/main/resources/logback.xml | 2 +-
.../docs/src/main/paradox/index.md | 4 +-
.../project/CommonSettings.scala | 4 +-
.../project/Dependencies.scala | 27 +++---
.../project/build.properties | 2 +-
.../step_001_complete/README.md | 2 +-
.../src/main/java/samples/javadsl/Main.java | 99 +++++++++++-----------
.../src/main/resources/logback.xml | 2 +-
.../src/main/scala/samples/scaladsl/Helper.scala | 36 +++-----
.../src/main/scala/samples/scaladsl/Main.scala | 29 +++----
17 files changed, 121 insertions(+), 133 deletions(-)
diff --git a/.github/workflows/compile.yml b/.github/workflows/compile.yml
index 14c0c03..20e218c 100644
--- a/.github/workflows/compile.yml
+++ b/.github/workflows/compile.yml
@@ -15,7 +15,7 @@ jobs:
steps:
- name: Checkout
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
- name: Setup Java 11
uses: actions/setup-java@v3
@@ -56,7 +56,7 @@ jobs:
steps:
- name: Checkout
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
- name: Setup Java 11
uses: actions/setup-java@v3
diff --git a/docs/build.sbt b/docs/build.sbt
index eda51be..abea44f 100644
--- a/docs/build.sbt
+++ b/docs/build.sbt
@@ -1,5 +1,5 @@
-ThisBuild / scalaVersion := "2.13.2"
+ThisBuild / scalaVersion := "2.13.12"
enablePlugins(AkkaParadoxPlugin, ParadoxSitePlugin, PublishRsyncPlugin)
@@ -77,14 +77,13 @@ JdbcToElasticsearch / paradoxProperties ++= Map(
"canonical.base_url" -> s"${homepage.value.get}/${JdbcToElasticsearch.name}",
"snip.build.base_dir" ->
s"${baseDirectory.value}/../pekko-connectors-sample-${JdbcToElasticsearch.name}",
"github.root.base_dir" -> s"${baseDirectory.value}/..",
- // Alpakka
- "scaladoc.akka.stream.alpakka.base_url" ->
s"https://doc.akka.io/api/alpakka/${Dependencies.JdbcToElasticsearch.AlpakkaVersion}",
- "javadoc.akka.base_url" -> "",
- "extref.alpakka.base_url" ->
s"https://doc.akka.io/docs/alpakka/${Dependencies.JdbcToElasticsearch.AlpakkaVersion}/%s",
+ "scaladoc.akka.stream.alpakka.base_url" ->
s"https://pekko.apache.org/api/pekko-connectors/${Dependencies.JdbcToElasticsearch.PekkoConnectorsVersion}",
+ "javadoc.pekko.base_url" -> "",
+ "extref.pekko-connectors.base_url" ->
s"https://pekko.apache.org/docs/pekko-connectors/${Dependencies.JdbcToElasticsearch.PekkoConnectorsVersion}/%s",
// Akka
- "scaladoc.akka.base_url" ->
s"https://doc.akka.io/api/akka/${Dependencies.JdbcToElasticsearch.AkkaVersion}",
- "javadoc.akka.base_url" ->
s"https://doc.akka.io/japi/akka/${Dependencies.JdbcToElasticsearch.AkkaVersion}",
- "extref.akka.base_url" ->
s"https://doc.akka.io/docs/akka/${Dependencies.JdbcToElasticsearch.AkkaVersion}/%s",
+ "scaladoc.pekko.base_url" ->
s"https://pekko.apache.org/api/pekko/${Dependencies.JdbcToElasticsearch.PekkoVersion}",
+ "javadoc.pekko.base_url" ->
s"https://pekko.apache.org/japi/pekko/${Dependencies.JdbcToElasticsearch.PekkoVersion}",
+ "extref.pekko.base_url" ->
s"https://pekko.apache.org/docs/pekko/${Dependencies.JdbcToElasticsearch.PekkoVersion}/%s",
)
JdbcToElasticsearch / paradoxGroups := Map("Language" -> Seq("Java", "Scala"))
@@ -261,7 +260,5 @@ paradoxProperties ++= Map(
"extref.pekko-connectors.base_url" ->
"https://pekko.apache.org/docs/pekko-connectors/current/",
)
-resolvers += Resolver.jcenterRepo
-
publishRsyncArtifacts += makeSite.value -> "akka.io/alpakka-samples/"
publishRsyncHost := "[email protected]"
diff --git a/docs/project/Dependencies.scala b/docs/project/Dependencies.scala
index e7c3293..546c783 100644
--- a/docs/project/Dependencies.scala
+++ b/docs/project/Dependencies.scala
@@ -44,8 +44,8 @@ object Dependencies {
}
val ScalaVersion = versions("scalaVer")
- val AkkaVersion = versions("AkkaVersion")
- val AlpakkaVersion = versions("AlpakkaVersion")
+ val PekkoVersion = versions("PekkoVersion")
+ val PekkoConnectorsVersion = versions("PekkoConnectorsVersion")
}
object Jms {
diff --git a/jdbc-to-elasticsearch.conf b/jdbc-to-elasticsearch.conf
index f4c1128..0fe9cf4 100644
--- a/jdbc-to-elasticsearch.conf
+++ b/jdbc-to-elasticsearch.conf
@@ -12,11 +12,11 @@ studentify {
exercise-project-prefix = "step"
- master-base-project-name = "alpakka-sample-jdbc-to-elasticsearch_master"
+ master-base-project-name =
"pekko-connectors-sample-jdbc-to-elasticsearch_master"
- studentified-project-name = "alpakka-sample-jdbc-to-elasticsearch_sample"
+ studentified-project-name =
"pekko-connectors-sample-jdbc-to-elasticsearch_sample"
- relative-source-folder = "alpakka-sample-jdbc-to-elasticsearch"
+ relative-source-folder = "pekko-connectors-sample-jdbc-to-elasticsearch"
readme-in-test-resources = false
diff --git a/pekko-connectors-sample-jdbc-to-elasticsearch/.courseName
b/pekko-connectors-sample-jdbc-to-elasticsearch/.courseName
index 3a379a6..7640ad5 100644
--- a/pekko-connectors-sample-jdbc-to-elasticsearch/.courseName
+++ b/pekko-connectors-sample-jdbc-to-elasticsearch/.courseName
@@ -1 +1 @@
-Alpakka: JDBC (Slick) to Elasticsearch
+Pekko Connectors: JDBC (Slick) to Elasticsearch
diff --git a/pekko-connectors-sample-jdbc-to-elasticsearch/README.md
b/pekko-connectors-sample-jdbc-to-elasticsearch/README.md
index 0761470..71fab43 100644
--- a/pekko-connectors-sample-jdbc-to-elasticsearch/README.md
+++ b/pekko-connectors-sample-jdbc-to-elasticsearch/README.md
@@ -1,9 +1,9 @@
-# Alpakka sample
+# Apache Pekko Connectors sample
## Read from a database and publish to Elasticsearch
-This example uses @extref[Alpakka Slick](alpakka:slick.html) to read from a
database, and stores the data in Elasticsearch.
+This example uses @extref[Pekko-Connectors Slick](pekko-connectors:slick.html)
to read from a database, and stores the data in Elasticsearch.
-Browse the sources at
@link:[Github](https://github.com/akka/alpakka-samples/tree/master/alpakka-sample-jdbc-to-elasticsearch)
{ open=new }.
+Browse the sources at
@link:[Github](https://github.com/apache/incubator-pekko-connectors-samples/tree/main/pekko-connectors-sample-jdbc-to-elasticsearch)
{ open=new }.
-To try out this project clone @link:[the Alpakka Samples
repository](https://github.com/akka/alpakka-samples) { open=new } and find it
in the `alpakka-sample-jdbc-to-elasticsearch` directory.
+To try out this project clone @link:[the Pekko-Connectors Samples
repository](https://github.com/apache/incubator-pekko-connectors-samples) {
open=new } and find it in the `pekko-connectors-sample-jdbc-to-elasticsearch`
directory.
diff --git
a/pekko-connectors-sample-jdbc-to-elasticsearch/common/src/main/resources/application.conf
b/pekko-connectors-sample-jdbc-to-elasticsearch/common/src/main/resources/application.conf
index 93d01bf..fa8d231 100644
---
a/pekko-connectors-sample-jdbc-to-elasticsearch/common/src/main/resources/application.conf
+++
b/pekko-connectors-sample-jdbc-to-elasticsearch/common/src/main/resources/application.conf
@@ -1,6 +1,6 @@
-akka {
- loggers = ["akka.event.slf4j.Slf4jLogger"]
- logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+pekko {
+ loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"]
+ logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter"
loglevel = "DEBUG"
}
diff --git
a/pekko-connectors-sample-jdbc-to-elasticsearch/common/src/main/resources/logback.xml
b/pekko-connectors-sample-jdbc-to-elasticsearch/common/src/main/resources/logback.xml
index d16b740..e9de74a 100644
---
a/pekko-connectors-sample-jdbc-to-elasticsearch/common/src/main/resources/logback.xml
+++
b/pekko-connectors-sample-jdbc-to-elasticsearch/common/src/main/resources/logback.xml
@@ -8,7 +8,7 @@
</appender>
<logger name="slick" level="INFO"/>
- <logger name="akka.stream.alpakka.elasticsearch" level="INFO"/>
+ <logger name="org.apache.pekko.stream.connectors.elasticsearch"
level="INFO"/>
<logger name="com.github.dockerjava" level="INFO"/>
<logger name="org.testcontainers" level="INFO"/>
diff --git
a/pekko-connectors-sample-jdbc-to-elasticsearch/docs/src/main/paradox/index.md
b/pekko-connectors-sample-jdbc-to-elasticsearch/docs/src/main/paradox/index.md
index 4e24210..5d4d189 100644
---
a/pekko-connectors-sample-jdbc-to-elasticsearch/docs/src/main/paradox/index.md
+++
b/pekko-connectors-sample-jdbc-to-elasticsearch/docs/src/main/paradox/index.md
@@ -6,9 +6,9 @@ Dependencies (sbt notation)
: @@snip [snip](/project/Dependencies.scala) { #deps }
-### All Alpakka samples
+### All Pekko-Connectors samples
-Show [Alpakka samples listing](../index.html).
+Show [Pekko Connectors samples listing](../index.html).
@@toc
diff --git
a/pekko-connectors-sample-jdbc-to-elasticsearch/project/CommonSettings.scala
b/pekko-connectors-sample-jdbc-to-elasticsearch/project/CommonSettings.scala
index de63468..fba2144 100644
--- a/pekko-connectors-sample-jdbc-to-elasticsearch/project/CommonSettings.scala
+++ b/pekko-connectors-sample-jdbc-to-elasticsearch/project/CommonSettings.scala
@@ -3,8 +3,8 @@ import sbt._
object CommonSettings {
lazy val commonSettings = Seq(
- organization := "com.lightbend.akka",
- version := "1.3.0",
+ organization := "org.apache.pekko",
+ version := "1.0.1",
scalaVersion := Dependencies.scalaVer,
scalacOptions ++= CompileOptions.compileOptions,
Compile / unmanagedSourceDirectories := List((Compile /
scalaSource).value, (Compile / javaSource).value),
diff --git
a/pekko-connectors-sample-jdbc-to-elasticsearch/project/Dependencies.scala
b/pekko-connectors-sample-jdbc-to-elasticsearch/project/Dependencies.scala
index 1719ea0..9d9e503 100644
--- a/pekko-connectors-sample-jdbc-to-elasticsearch/project/Dependencies.scala
+++ b/pekko-connectors-sample-jdbc-to-elasticsearch/project/Dependencies.scala
@@ -1,27 +1,28 @@
import sbt._
object Dependencies {
- val scalaVer = "2.13.8"
+ val scalaVer = "2.13.12"
// #deps
- val AkkaVersion = "2.6.19"
- val AlpakkaVersion = "4.0.0"
+ val PekkoVersion = "1.0.2"
+
+ val PekkoConnectorsVersion = "1.0.1"
// #deps
val dependencies = List(
- // #deps
- "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
- "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion,
- "com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" %
AlpakkaVersion,
- "com.lightbend.akka" %% "akka-stream-alpakka-slick" % AlpakkaVersion,
+ // #deps
+ "org.apache.pekko" %% "pekko-stream" % PekkoVersion,
+ "org.apache.pekko" %% "pekko-actor-typed" % PekkoVersion,
+ "org.apache.pekko" %% "pekko-connectors-elasticsearch" %
PekkoConnectorsVersion,
+ "org.apache.pekko" %% "pekko-connectors-slick" % PekkoConnectorsVersion,
// for JSON in Scala
"io.spray" %% "spray-json" % "1.3.6",
// Logging
- "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
- "ch.qos.logback" % "logback-classic" % "1.2.11",
- // #deps
+ "org.apache.pekko" %% "pekko-slf4j" % PekkoVersion,
+ "ch.qos.logback" % "logback-classic" % "1.2.13",
+ // #deps
"com.h2database" % "h2" % "2.1.214",
- "org.testcontainers" % "elasticsearch" % "1.17.3",
- "org.testcontainers" % "postgresql" % "1.17.3"
+ "org.testcontainers" % "elasticsearch" % "1.17.6",
+ "org.testcontainers" % "postgresql" % "1.17.6"
)
}
diff --git
a/pekko-connectors-sample-jdbc-to-elasticsearch/project/build.properties
b/pekko-connectors-sample-jdbc-to-elasticsearch/project/build.properties
index 22af262..e8a1e24 100644
--- a/pekko-connectors-sample-jdbc-to-elasticsearch/project/build.properties
+++ b/pekko-connectors-sample-jdbc-to-elasticsearch/project/build.properties
@@ -1 +1 @@
-sbt.version=1.7.1
+sbt.version=1.9.7
diff --git
a/pekko-connectors-sample-jdbc-to-elasticsearch/step_001_complete/README.md
b/pekko-connectors-sample-jdbc-to-elasticsearch/step_001_complete/README.md
index 2ba872e..fa1acfc 100644
--- a/pekko-connectors-sample-jdbc-to-elasticsearch/step_001_complete/README.md
+++ b/pekko-connectors-sample-jdbc-to-elasticsearch/step_001_complete/README.md
@@ -2,4 +2,4 @@
### Description
-This example reads the `MOVIES` table in a relational database using Alpakka
Slick and writes it to an Elasticsearch index.
+This example reads the `MOVIES` table in a relational database using
Pekko-Connectors Slick and writes it to an Elasticsearch index.
diff --git
a/pekko-connectors-sample-jdbc-to-elasticsearch/step_001_complete/src/main/java/samples/javadsl/Main.java
b/pekko-connectors-sample-jdbc-to-elasticsearch/step_001_complete/src/main/java/samples/javadsl/Main.java
index 43a7828..0a2d32f 100644
---
a/pekko-connectors-sample-jdbc-to-elasticsearch/step_001_complete/src/main/java/samples/javadsl/Main.java
+++
b/pekko-connectors-sample-jdbc-to-elasticsearch/step_001_complete/src/main/java/samples/javadsl/Main.java
@@ -5,27 +5,31 @@
package samples.javadsl;
// #imports
-import akka.Done;
-import akka.actor.typed.ActorSystem;
-import akka.actor.typed.javadsl.Behaviors;
-import akka.stream.alpakka.elasticsearch.ElasticsearchConnectionSettings;
-import akka.stream.alpakka.elasticsearch.ElasticsearchParams;
-import akka.stream.alpakka.elasticsearch.ElasticsearchWriteSettings;
-import akka.stream.alpakka.elasticsearch.WriteMessage;
-import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchSink;
-import akka.stream.alpakka.slick.javadsl.Slick;
-import akka.stream.alpakka.slick.javadsl.SlickRow;
-import akka.stream.alpakka.slick.javadsl.SlickSession;
+
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.concurrent.CompletionStage;
+
+import org.apache.pekko.Done;
+import org.apache.pekko.actor.typed.ActorSystem;
+import org.apache.pekko.actor.typed.javadsl.Behaviors;
+import
org.apache.pekko.stream.connectors.elasticsearch.ElasticsearchConnectionSettings;
+import org.apache.pekko.stream.connectors.elasticsearch.ElasticsearchParams;
+import
org.apache.pekko.stream.connectors.elasticsearch.ElasticsearchWriteSettings;
+import org.apache.pekko.stream.connectors.elasticsearch.WriteMessage;
+import
org.apache.pekko.stream.connectors.elasticsearch.javadsl.ElasticsearchSink;
+import org.apache.pekko.stream.connectors.slick.javadsl.Slick;
+import org.apache.pekko.stream.connectors.slick.javadsl.SlickRow;
+import org.apache.pekko.stream.connectors.slick.javadsl.SlickSession;
+import org.apache.pekko.stream.connectors.slick.javadsl.SlickSession$;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import samples.scaladsl.Helper;
+
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
-import java.util.concurrent.CompletionStage;
-
// #imports
public class Main {
@@ -37,6 +41,7 @@ public class Main {
// #data-class
public static class Movie {
+
public final int id;
public final String title;
public final String genre;
@@ -44,10 +49,10 @@ public class Main {
@JsonCreator
public Movie(
- @JsonProperty("id") int id,
- @JsonProperty("title") String title,
- @JsonProperty("genre") String genre,
- @JsonProperty("gross") double gross) {
+ @JsonProperty("id") int id,
+ @JsonProperty("title") String title,
+ @JsonProperty("genre") String genre,
+ @JsonProperty("gross") double gross) {
this.id = id;
this.title = title;
this.genre = genre;
@@ -58,17 +63,16 @@ public class Main {
// #data-class
void run() {
- // Testcontainers: start Elasticsearch in Docker
+ // TestContainers: start Elasticsearch in Docker
ElasticsearchContainer elasticsearchContainer = new
ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2");
elasticsearchContainer.start();
String elasticsearchAddress = "http://" +
elasticsearchContainer.getHttpHostAddress();
// #sample
- ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(),
"alpakka-sample");
-
+ ActorSystem<Object> system = ActorSystem.create(Behaviors.empty(),
"pekko-es-jdbc-sample");
// #sample
// #slick-setup
- SlickSession session = SlickSession.forConfig("slick-h2-mem");
+ SlickSession session = SlickSession$.MODULE$.forConfig("slick-h2-mem");
system.getWhenTerminated().thenAccept(done -> session.close());
// #slick-setup
@@ -84,34 +88,33 @@ public class Main {
// #sample
final CompletionStage<Done> done =
- Slick.source(
- session,
- "SELECT * FROM MOVIE",
- (SlickRow row) ->
- new Movie(row.nextInt(), row.nextString(),
row.nextString(), row.nextDouble()))
- .map(movie ->
WriteMessage.createIndexMessage(String.valueOf(movie.id), movie))
- .runWith(
- ElasticsearchSink.create(
- ElasticsearchParams.V7("movie"),
-
ElasticsearchWriteSettings.create(connectionSettings),
- objectToJsonMapper),
- system);
+ Slick.source(
+ session,
+ "SELECT * FROM MOVIE",
+ (SlickRow row) ->
+ new Movie(row.nextInt(), row.nextString(),
row.nextString(), row.nextDouble()))
+ .map(movie ->
WriteMessage.createIndexMessage(String.valueOf(movie.id), movie))
+ .runWith(
+ ElasticsearchSink.create(
+ ElasticsearchParams.V7("movie"),
+ ElasticsearchWriteSettings.create(connectionSettings),
+ objectToJsonMapper),
+ system);
// #sample
-
+ system.getWhenTerminated().thenAccept(consumer ->
elasticsearchContainer.stop());
done.thenRunAsync(
- () -> {
- elasticsearchContainer.stop();
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- // ignored
- }
- system.terminate();
- try {
- Await.result(system.whenTerminated(),
Duration.create("10s"));
- } catch (Exception e) {
- // ignored
- }
- });
+ () -> {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // ignored
+ }
+ system.terminate();
+ try {
+ Await.result(system.whenTerminated(),
Duration.create("10s"));
+ } catch (Exception e) {
+ // ignored
+ }
+ });
}
}
diff --git
a/pekko-connectors-sample-jdbc-to-elasticsearch/step_001_complete/src/main/resources/logback.xml
b/pekko-connectors-sample-jdbc-to-elasticsearch/step_001_complete/src/main/resources/logback.xml
index 51548c6..def0738 100644
---
a/pekko-connectors-sample-jdbc-to-elasticsearch/step_001_complete/src/main/resources/logback.xml
+++
b/pekko-connectors-sample-jdbc-to-elasticsearch/step_001_complete/src/main/resources/logback.xml
@@ -7,7 +7,7 @@
</encoder>
</appender>
- <logger name="akka" level="WARN"/>
+ <logger name="pekko" level="WARN"/>
<logger name="com.github.dockerjava" level="INFO"/>
<logger name="org.testcontainers" level="INFO"/>
<logger name="slick" level="INFO"/>
diff --git
a/pekko-connectors-sample-jdbc-to-elasticsearch/step_001_complete/src/main/scala/samples/scaladsl/Helper.scala
b/pekko-connectors-sample-jdbc-to-elasticsearch/step_001_complete/src/main/scala/samples/scaladsl/Helper.scala
index 63ad722..c1099a1 100644
---
a/pekko-connectors-sample-jdbc-to-elasticsearch/step_001_complete/src/main/scala/samples/scaladsl/Helper.scala
+++
b/pekko-connectors-sample-jdbc-to-elasticsearch/step_001_complete/src/main/scala/samples/scaladsl/Helper.scala
@@ -4,31 +4,24 @@
package samples.scaladsl
-import akka.actor.typed.ActorSystem
-import akka.stream.alpakka.elasticsearch.ElasticsearchConnectionSettings
-import akka.stream.alpakka.elasticsearch.ElasticsearchParams
-import akka.stream.alpakka.elasticsearch.ElasticsearchSourceSettings
-import akka.stream.alpakka.elasticsearch.WriteMessage
-import akka.stream.alpakka.elasticsearch.scaladsl.{ElasticsearchFlow,
ElasticsearchSource}
-import akka.stream.alpakka.slick.javadsl.SlickSession
-import akka.stream.alpakka.slick.scaladsl.Slick
-import akka.stream.scaladsl.{Keep, Sink, Source}
-import akka.{Done, NotUsed}
+import org.apache.pekko.actor.typed.ActorSystem
+import org.apache.pekko.stream.connectors.elasticsearch._
+import
org.apache.pekko.stream.connectors.elasticsearch.scaladsl.ElasticsearchSource
+import org.apache.pekko.stream.connectors.slick.scaladsl.{ Slick, SlickSession
}
+import org.apache.pekko.stream.scaladsl.{ Sink, Source }
import org.slf4j.LoggerFactory
import org.testcontainers.elasticsearch.ElasticsearchContainer
import samples.scaladsl.Main.Movie
-import spray.json.DefaultJsonProtocol._
-import spray.json._
import scala.collection.immutable
import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.{ Await, Future }
trait Helper {
final val log = LoggerFactory.getLogger(getClass)
- // Testcontainers: start Elasticsearch in Docker
+ // TestContainers: start Elasticsearch in Docker
val elasticsearchContainer = new
ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2")
elasticsearchContainer.start()
val elasticsearchAddress: String = "http://" +
elasticsearchContainer.getHttpHostAddress
@@ -53,18 +46,18 @@ object Helper {
import session.profile.api._
- //Drop table if already exists
+ // Drop table if already exists
val dropTableFut =
sqlu"""drop table if exists MOVIE"""
- //Create movie table
+ // Create movie table
val createTableFut =
sqlu"""create table MOVIE (ID INT PRIMARY KEY, TITLE varchar, GENRE
varchar, GROSS numeric(10,2))"""
Await.result(session.db.run(dropTableFut), 10.seconds)
Await.result(session.db.run(createTableFut), 10.seconds)
- //A class just for organizing the data before using it in the insert
clause. Could have been insertFut with a Tuple too
+ // A class just for organizing the data before using it in the insert
clause. Could have been insertFut with a Tuple too
case class MovieInsert(id: Int, title: String, genre: String, gross:
Double)
val movies = List(
@@ -73,15 +66,12 @@ object Helper {
MovieInsert(3, "Wonder Woman", "Action", 2.744),
MovieInsert(4, "Guardians of the Galaxy", "Action", 2.568),
MovieInsert(5, "Moana", "Musical", 2.493),
- MovieInsert(6, "Spider-Man", "Action", 1.784)
- )
+ MovieInsert(6, "Spider-Man", "Action", 1.784))
Source(movies)
.via(
- Slick.flow(
- movie => sqlu"INSERT INTO MOVIE VALUES (${movie.id}, ${movie.title},
${movie.genre}, ${movie.gross})"
- )
- )
+ Slick.flow(movie =>
+ sqlu"INSERT INTO MOVIE VALUES (${movie.id}, ${movie.title},
${movie.genre}, ${movie.gross})"))
.runWith(Sink.ignore)
}
diff --git
a/pekko-connectors-sample-jdbc-to-elasticsearch/step_001_complete/src/main/scala/samples/scaladsl/Main.scala
b/pekko-connectors-sample-jdbc-to-elasticsearch/step_001_complete/src/main/scala/samples/scaladsl/Main.scala
index f9f6787..fd8c9ee 100644
---
a/pekko-connectors-sample-jdbc-to-elasticsearch/step_001_complete/src/main/scala/samples/scaladsl/Main.scala
+++
b/pekko-connectors-sample-jdbc-to-elasticsearch/step_001_complete/src/main/scala/samples/scaladsl/Main.scala
@@ -5,38 +5,35 @@
package samples.scaladsl
// #imports
-import akka.Done
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.scaladsl.Behaviors
-import akka.stream.alpakka.elasticsearch.ElasticsearchConnectionSettings
-import akka.stream.alpakka.elasticsearch.ElasticsearchParams
-import akka.stream.alpakka.elasticsearch.ElasticsearchWriteSettings
-import akka.stream.alpakka.elasticsearch.WriteMessage
-import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSink
-import akka.stream.alpakka.slick.javadsl.SlickSession
-import akka.stream.alpakka.slick.scaladsl.Slick
-import spray.json.DefaultJsonProtocol.{jsonFormat4, _}
+
+import org.apache.pekko.Done
+import org.apache.pekko.actor.typed.ActorSystem
+import org.apache.pekko.actor.typed.scaladsl.Behaviors
+import org.apache.pekko.stream.connectors.elasticsearch._
+import
org.apache.pekko.stream.connectors.elasticsearch.scaladsl.ElasticsearchSink
+import org.apache.pekko.stream.connectors.slick.scaladsl.{ Slick, SlickSession
}
+import spray.json.DefaultJsonProtocol.{ jsonFormat4, _ }
import spray.json.JsonFormat
import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.{ Await, ExecutionContext, Future }
// #imports
object Main extends App with Helper {
- implicit val actorSystem: ActorSystem[Nothing] =
ActorSystem(Behaviors.empty, "alpakka-sample")
+ implicit val actorSystem: ActorSystem[Nothing] =
ActorSystem(Behaviors.empty, "pekko-es-jdbc-sample")
implicit val executionContext: ExecutionContext =
actorSystem.executionContext
- def wait(duration: FiniteDuration): Unit = Thread.sleep(duration.toMillis)
+ private def wait(duration: FiniteDuration): Unit =
Thread.sleep(duration.toMillis)
- def terminateActorSystem(): Unit = {
+ private def terminateActorSystem(): Unit = {
actorSystem.terminate()
Await.result(actorSystem.whenTerminated, 1.seconds)
}
// #slick-setup
- implicit val session = SlickSession.forConfig("slick-h2-mem")
// (1)
+ implicit val session: SlickSession = SlickSession.forConfig("slick-h2-mem")
// (1)
actorSystem.whenTerminated.map(_ => session.close())
import session.profile.api._
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]