This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-pekko-connectors-samples.git
The following commit(s) were added to refs/heads/main by this push:
new 91f188b [kafka-to-websocket-clients] Migrate akka dependencies to
pekko (#19)
91f188b is described below
commit 91f188b36fd6a48cf1b839b6d80ac165c216d546
Author: Laglangyue <[email protected]>
AuthorDate: Mon Dec 18 15:58:01 2023 +0800
[kafka-to-websocket-clients] Migrate akka dependencies to pekko (#19)
* [kafka-to-websocket-clients] Migrate akka dependencies to pekko
* [kafka-to-websocket-clients] Migrate akka dependencies to pekko
* remove unused PekkoConnectorVersion
* wip
* shutdown when user presses return
* changes based on review feedback
* Update build.sbt
---------
Co-authored-by: laglang <[email protected]>
Co-authored-by: Jiafu <[email protected]>
Co-authored-by: PJ Fanning <[email protected]>
---
.../build.sbt | 2 +-
.../project/Dependencies.scala | 23 +--
.../src/main/java/samples/javadsl/Helper.java | 58 +++---
.../src/main/java/samples/javadsl/Main.java | 228 ++++++++++-----------
.../src/main/resources/logback.xml | 2 +-
.../src/test/java/samples/javadsl/MainTest.java | 98 ++++-----
6 files changed, 206 insertions(+), 205 deletions(-)
diff --git a/pekko-connectors-sample-kafka-to-websocket-clients/build.sbt
b/pekko-connectors-sample-kafka-to-websocket-clients/build.sbt
index be14757..b576fe8 100644
--- a/pekko-connectors-sample-kafka-to-websocket-clients/build.sbt
+++ b/pekko-connectors-sample-kafka-to-websocket-clients/build.sbt
@@ -1,7 +1,7 @@
import sbt.Keys._
name := "pekko-connectors-sample-kafka-to-websocket-clients"
-organization := "com.lightbend.akka"
+organization := "org.apache.pekko"
version := "1.0.0"
scalaVersion := Dependencies.scalaVer
libraryDependencies ++= Dependencies.dependencies
diff --git
a/pekko-connectors-sample-kafka-to-websocket-clients/project/Dependencies.scala
b/pekko-connectors-sample-kafka-to-websocket-clients/project/Dependencies.scala
index fe5a4b3..ff7a665 100644
---
a/pekko-connectors-sample-kafka-to-websocket-clients/project/Dependencies.scala
+++
b/pekko-connectors-sample-kafka-to-websocket-clients/project/Dependencies.scala
@@ -3,26 +3,25 @@ import sbt._
object Dependencies {
val scalaVer = "2.13.12"
// #deps
- val PekkoVersion = "2.6.19"
- val PekkoHttpVersion = "10.1.12"
- val PekkoConnectorsKafkaVersion = "3.0.1"
+ val PekkoVersion = "1.0.2"
+ val PekkoConnectorsKafkaVersion = "1.0.0"
+ val PekkoHttpVersion = "1.0.0"
+
// #deps
val dependencies = List(
// #deps
- "com.typesafe.akka" %% "akka-stream" % PekkoVersion,
- "com.typesafe.akka" %% "akka-http" % PekkoHttpVersion,
- "com.typesafe.akka" %% "akka-stream-kafka" % PekkoConnectorsKafkaVersion,
+ "org.apache.pekko" %% "pekko-stream" % PekkoVersion,
+ "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
+ "org.apache.pekko" %% "pekko-connectors-kafka" % PekkoConnectorsKafkaVersion,
// Logging
- "com.typesafe.akka" %% "akka-slf4j" % PekkoVersion,
+ "org.apache.pekko" %% "pekko-slf4j" % PekkoVersion,
"ch.qos.logback" % "logback-classic" % "1.2.13",
// #deps
- "org.testcontainers" % "kafka" % "1.14.3",
-
- "com.typesafe.akka" %% "akka-stream-testkit" % PekkoVersion % Test,
+ "org.testcontainers" % "kafka" % "1.17.6",
+ "org.apache.pekko" %% "pekko-stream-testkit" % PekkoVersion,
"com.google.guava" % "guava" % "28.2-jre" % Test,
- "junit" % "junit" % "4.13" % Test,
-
+ "junit" % "junit" % "4.13.2" % Test
)
}
diff --git
a/pekko-connectors-sample-kafka-to-websocket-clients/src/main/java/samples/javadsl/Helper.java
b/pekko-connectors-sample-kafka-to-websocket-clients/src/main/java/samples/javadsl/Helper.java
index ab1dfad..0473515 100644
---
a/pekko-connectors-sample-kafka-to-websocket-clients/src/main/java/samples/javadsl/Helper.java
+++
b/pekko-connectors-sample-kafka-to-websocket-clients/src/main/java/samples/javadsl/Helper.java
@@ -1,49 +1,51 @@
package samples.javadsl;
-import akka.Done;
-import akka.actor.ActorSystem;
-import akka.kafka.ProducerSettings;
-import akka.kafka.javadsl.Producer;
-import akka.stream.javadsl.Source;
+import org.apache.pekko.Done;
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.kafka.ProducerSettings;
+import org.apache.pekko.kafka.javadsl.Producer;
+import org.apache.pekko.stream.javadsl.Source;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
import java.util.concurrent.CompletionStage;
public class Helper {
- private static final Logger log = LoggerFactory.getLogger(Helper.class);
+ private static final Logger log = LoggerFactory.getLogger(Helper.class);
- private KafkaContainer kafka;
- public String kafkaBootstrapServers;
+ private KafkaContainer kafka;
+ public String kafkaBootstrapServers;
- public Helper() {
- }
+ public Helper() {
+ }
- public void startContainers() {
- kafka = new KafkaContainer("5.1.2"); // contains Kafka 2.1.x
- kafka.start();
- kafkaBootstrapServers = kafka.getBootstrapServers();
- }
+ public void startContainers() {
- public void stopContainers() {
- kafka.stop();
- }
+ kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.1.2")); // contains Kafka 2.1.x
+ kafka.start();
+ kafkaBootstrapServers = kafka.getBootstrapServers();
+ }
- CompletionStage<Done> writeToKafka(String topic, String item, ActorSystem actorSystem) {
- ProducerSettings<Integer, String> kafkaProducerSettings =
- ProducerSettings.create(actorSystem, new IntegerSerializer(),
new StringSerializer())
- .withBootstrapServers(kafkaBootstrapServers);
+ public void stopContainers() {
+ kafka.stop();
+ }
- CompletionStage<Done> producing =
- Source.single(new ProducerRecord<Integer, String>(topic, item))
- .runWith(Producer.plainSink(kafkaProducerSettings),
actorSystem);
- producing.thenAccept(s -> log.info("Producing finished"));
- return producing;
- }
+ CompletionStage<Done> writeToKafka(String topic, String item, ActorSystem actorSystem) {
+ ProducerSettings<Integer, String> kafkaProducerSettings =
+ ProducerSettings.create(actorSystem, new IntegerSerializer(), new
StringSerializer())
+ .withBootstrapServers(kafkaBootstrapServers);
+
+ CompletionStage<Done> producing =
+ Source.single(new ProducerRecord<Integer, String>(topic, item))
+ .runWith(Producer.plainSink(kafkaProducerSettings), actorSystem);
+ producing.thenAccept(s -> log.info("Producing finished"));
+ return producing;
+ }
}
diff --git
a/pekko-connectors-sample-kafka-to-websocket-clients/src/main/java/samples/javadsl/Main.java
b/pekko-connectors-sample-kafka-to-websocket-clients/src/main/java/samples/javadsl/Main.java
index 7ee974d..c11ada2 100644
---
a/pekko-connectors-sample-kafka-to-websocket-clients/src/main/java/samples/javadsl/Main.java
+++
b/pekko-connectors-sample-kafka-to-websocket-clients/src/main/java/samples/javadsl/Main.java
@@ -5,33 +5,34 @@
package samples.javadsl;
// #imports
-import akka.Done;
-import akka.NotUsed;
-import akka.actor.ActorSystem;
-import akka.http.javadsl.ConnectHttp;
-import akka.http.javadsl.Http;
-import akka.http.javadsl.ServerBinding;
-import akka.http.javadsl.model.HttpRequest;
-import akka.http.javadsl.model.HttpResponse;
-import akka.http.javadsl.model.ws.Message;
-import akka.http.javadsl.model.ws.TextMessage;
-import akka.http.javadsl.server.AllDirectives;
-import akka.http.javadsl.server.Route;
-import akka.japi.Pair;
-import akka.kafka.ConsumerSettings;
-import akka.kafka.Subscriptions;
-import akka.kafka.javadsl.Consumer;
-import akka.stream.ActorMaterializer;
-import akka.stream.Materializer;
-import akka.stream.OverflowStrategy;
-import akka.stream.SystemMaterializer;
-import akka.stream.javadsl.BroadcastHub;
-import akka.stream.javadsl.Flow;
-import akka.stream.javadsl.Sink;
-import akka.stream.javadsl.Source;
+
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pekko.Done;
+import org.apache.pekko.NotUsed;
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.http.javadsl.ConnectHttp;
+import org.apache.pekko.http.javadsl.Http;
+import org.apache.pekko.http.javadsl.ServerBinding;
+import org.apache.pekko.http.javadsl.model.HttpRequest;
+import org.apache.pekko.http.javadsl.model.HttpResponse;
+import org.apache.pekko.http.javadsl.model.ws.Message;
+import org.apache.pekko.http.javadsl.model.ws.TextMessage;
+import org.apache.pekko.http.javadsl.server.AllDirectives;
+import org.apache.pekko.http.javadsl.server.Route;
+import org.apache.pekko.japi.Pair;
+import org.apache.pekko.kafka.ConsumerSettings;
+import org.apache.pekko.kafka.Subscriptions;
+import org.apache.pekko.kafka.javadsl.Consumer;
+import org.apache.pekko.stream.Materializer;
+import org.apache.pekko.stream.OverflowStrategy;
+import org.apache.pekko.stream.SystemMaterializer;
+import org.apache.pekko.stream.javadsl.BroadcastHub;
+import org.apache.pekko.stream.javadsl.Flow;
+import org.apache.pekko.stream.javadsl.Sink;
+import org.apache.pekko.stream.javadsl.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,95 +43,94 @@ import java.util.concurrent.TimeUnit;
public class Main extends AllDirectives {
- private static final Logger log = LoggerFactory.getLogger(Main.class);
-
- private final Helper helper;
- private final String kafkaBootstrapServers;
-
- private final String topic = "message-topic";
- private final String groupId = "docs-group";
-
- private ActorSystem actorSystem;
- private Materializer materializer;
-
- public Main(Helper helper) {
- this.kafkaBootstrapServers = helper.kafkaBootstrapServers;
- this.helper = helper;
- }
-
- public static void main(String[] args) throws Exception {
- Helper helper = new Helper();
- helper.startContainers();
- Main main = new Main(helper);
- main.run();
- helper.stopContainers();
- }
-
- private void run() throws Exception {
- actorSystem = ActorSystem.create("KafkaToWebSocket");
- materializer = SystemMaterializer.get(actorSystem).materializer();
- Http http = Http.get(actorSystem);
-
- // #websocket-handler
- Flow<Message, Message, ?> webSocketHandler =
- Flow.fromSinkAndSource(
- Sink.ignore(),
- topicSource()
- // decouple clients from each other: if a client is too slow and more than 1000 elements to be sent to
- // to the client queue up here, we fail this client
- .buffer(1000, OverflowStrategy.fail())
- .via(addIndexFlow())
- .map(TextMessage::create));
- // #websocket-handler
-
- final Flow<HttpRequest, HttpResponse, ?> routeFlow =
createRoute(webSocketHandler).flow(actorSystem, materializer);
- final CompletionStage<ServerBinding> binding =
http.bindAndHandle(routeFlow,
- ConnectHttp.toHost("localhost", 8081), materializer);
-
- binding.toCompletableFuture().get(10, TimeUnit.SECONDS);
-
- System.out.println("Server online at http://localhost:8081/\nPress RETURN to stop...");
- System.in.read(); // let it run until user presses return
- }
-
- public Flow<String, String, NotUsed> addIndexFlow() {
- final Pair<Integer, String> seed = Pair.create(0, "start");
- return Flow.of(String.class)
- .scan(seed, (acc, message) -> {
- Integer index = acc.first();
- return Pair.create(index + 1, String.format("index: %s,
message: %s", index, message));
- })
- .filterNot(p -> p == seed)
- .map(Pair::second);
- }
-
- // #routes
- private Route createRoute(Flow<Message, Message, ?> webSocketHandler) {
- return concat(
- path("events", () ->
handleWebSocketMessages(webSocketHandler)),
- path("push", () -> parameter("value", v -> {
- CompletionStage<Done> written = helper.writeToKafka(topic,
v, actorSystem);
- return onSuccess(written, done -> complete("Ok"));
- }))
- );
- }
- // #routes
-
- // #kafka-to-broadcast
- private Source<String, ?> topicSource() {
- ConsumerSettings<Integer, String> kafkaConsumerSettings =
+ private static final Logger log = LoggerFactory.getLogger(Main.class);
+
+ private final Helper helper;
+ private final String kafkaBootstrapServers;
+
+ private final String topic = "message-topic";
+ private final String groupId = "docs-group";
+
+ private ActorSystem actorSystem;
+ private Materializer materializer;
+
+ public Main(Helper helper) {
+ this.kafkaBootstrapServers = helper.kafkaBootstrapServers;
+ this.helper = helper;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Helper helper = new Helper();
+ helper.startContainers();
+ Main main = new Main(helper);
+ main.run();
+ helper.stopContainers();
+ }
+
+ private void run() throws Exception {
+ actorSystem = ActorSystem.create("KafkaToWebSocket");
+ materializer = SystemMaterializer.get(actorSystem).materializer();
+ Http http = Http.get(actorSystem);
+
+ // #websocket-handler
+ Flow<Message, Message, ?> webSocketHandler =
+ Flow.fromSinkAndSource(
+ Sink.ignore(),
+ topicSource()
+ // decouple clients from each other: if a client is too slow and more than 1000 elements to be sent to
+ // the client queue up here, we fail this client
+ .buffer(1000, OverflowStrategy.fail())
+ .via(addIndexFlow())
+ .map(TextMessage::create));
+ // #websocket-handler
+ final Flow<HttpRequest, HttpResponse, ?> routeFlow =
createRoute(webSocketHandler).flow(actorSystem, materializer);
+ final CompletionStage<ServerBinding> binding =
http.bindAndHandle(routeFlow,
+ ConnectHttp.toHost("localhost", 8081), materializer);
+
+ binding.toCompletableFuture().get(10, TimeUnit.SECONDS);
+ System.out.println("Server online at http://localhost:8081/\nPress RETURN to stop...");
+ System.in.read(); // let it run until user presses return
+ actorSystem.terminate();
+ }
+
+ public Flow<String, String, NotUsed> addIndexFlow() {
+ final Pair<Integer, String> seed = Pair.create(0, "start");
+ return Flow.of(String.class)
+ .scan(seed, (acc, message) -> {
+ Integer index = acc.first();
+ return Pair.create(index + 1, String.format("index: %s, message:
%s", index, message));
+ })
+ .filterNot(p -> p == seed)
+ .map(Pair::second);
+ }
+
+ // #routes
+ private Route createRoute(Flow<Message, Message, ?> webSocketHandler) {
+ return concat(
+ path("events", () -> handleWebSocketMessages(webSocketHandler)),
+ path("push", () -> parameter("value", v -> {
+ CompletionStage<Done> written = helper.writeToKafka(topic, v,
actorSystem);
+ return onSuccess(written, done -> complete("Ok"));
+ }))
+ );
+ }
+ // #routes
+
+ // #kafka-to-broadcast
+ private Source<String, ?> topicSource() {
+ ConsumerSettings<Integer, String> kafkaConsumerSettings =
ConsumerSettings.create(actorSystem, new IntegerDeserializer(), new
StringDeserializer())
- .withBootstrapServers(kafkaBootstrapServers)
- .withGroupId(groupId)
- .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest")
- .withStopTimeout(Duration.ofSeconds(5));
-
- return
- Consumer.plainSource(kafkaConsumerSettings,
Subscriptions.topics(topic))
- .map(consumerRecord -> consumerRecord.value())
- // using a broadcast hub here, ensures that all websocket
clients will use the same
- // consumer
- .runWith(BroadcastHub.of(String.class), materializer);
- }
- // #kafka-to-broadcast
+ .withBootstrapServers(kafkaBootstrapServers)
+ .withGroupId(groupId)
+ .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ .withStopTimeout(Duration.ofSeconds(5));
+
+ return
+ Consumer.plainSource(kafkaConsumerSettings,
Subscriptions.topics(topic))
+ .map(ConsumerRecord::value)
+ // using a broadcast hub here, ensures that all websocket clients
will use the same
+ // consumer
+ .runWith(BroadcastHub.of(String.class), materializer);
+ }
+ // #kafka-to-broadcast
}
diff --git
a/pekko-connectors-sample-kafka-to-websocket-clients/src/main/resources/logback.xml
b/pekko-connectors-sample-kafka-to-websocket-clients/src/main/resources/logback.xml
index 2923e11..3c22058 100644
---
a/pekko-connectors-sample-kafka-to-websocket-clients/src/main/resources/logback.xml
+++
b/pekko-connectors-sample-kafka-to-websocket-clients/src/main/resources/logback.xml
@@ -7,7 +7,7 @@
</encoder>
</appender>
- <logger name="akka" level="WARN"/>
+ <logger name="org.apache.pekko" level="WARN"/>
<logger name="com.github.dockerjava" level="INFO"/>
<logger name="org.testcontainers" level="INFO"/>
diff --git
a/pekko-connectors-sample-kafka-to-websocket-clients/src/test/java/samples/javadsl/MainTest.java
b/pekko-connectors-sample-kafka-to-websocket-clients/src/test/java/samples/javadsl/MainTest.java
index 18847cc..0d7c49c 100644
---
a/pekko-connectors-sample-kafka-to-websocket-clients/src/test/java/samples/javadsl/MainTest.java
+++
b/pekko-connectors-sample-kafka-to-websocket-clients/src/test/java/samples/javadsl/MainTest.java
@@ -1,14 +1,15 @@
package samples.javadsl;
-import akka.NotUsed;
-import akka.actor.ActorSystem;
-import akka.japi.Pair;
-import akka.stream.Materializer;
-import akka.stream.javadsl.Flow;
-import akka.stream.javadsl.Sink;
-import akka.stream.javadsl.Source;
-import akka.testkit.javadsl.TestKit;
+import static org.junit.Assert.assertTrue;
+
import com.google.common.collect.Streams;
+import org.apache.pekko.NotUsed;
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.japi.Pair;
+import org.apache.pekko.stream.javadsl.Flow;
+import org.apache.pekko.stream.javadsl.Sink;
+import org.apache.pekko.stream.javadsl.Source;
+import org.apache.pekko.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -20,54 +21,53 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import static org.junit.Assert.assertTrue;
-
public class MainTest {
- static ActorSystem system;
- @BeforeClass
- public static void setup() {
- system = ActorSystem.create("WebsocketExampleMainTest");
- }
+ static ActorSystem system;
+
+ @BeforeClass
+ public static void setup() {
+ system = ActorSystem.create("WebsocketExampleMainTest");
+ }
- @AfterClass
- public static void tearDown() {
- TestKit.shutdownActorSystem(system);
- system = null;
- }
+ @AfterClass
+ public static void tearDown() {
+ TestKit.shutdownActorSystem(system);
+ system = null;
+ }
- @Test
- public void addIndexFlow_Test() throws Exception {
- final Main example = new Main(new Helper());
+ @Test
+ public void addIndexFlow_Test() throws Exception {
+ final Main example = new Main(new Helper());
- List<String> messages = Arrays.asList(
- "I say high, you say low",
- "You say why and I say I don't know",
- "Oh, no",
- "You say goodbye and I say hello"
- );
+ List<String> messages = Arrays.asList(
+ "I say high, you say low",
+ "You say why and I say I don't know",
+ "Oh, no",
+ "You say goodbye and I say hello"
+ );
- final Flow<String, String, NotUsed> addIndexFlow =
example.addIndexFlow();
+ final Flow<String, String, NotUsed> addIndexFlow = example.addIndexFlow();
- final CompletionStage<List<String>> future = Source.from(messages)
- .via(addIndexFlow)
- .runWith(Sink.seq(), system);
- final List<String> result = future.toCompletableFuture().get(3,
TimeUnit.SECONDS);
+ final CompletionStage<List<String>> future = Source.from(messages)
+ .via(addIndexFlow)
+ .runWith(Sink.seq(), system);
+ final List<String> result = future.toCompletableFuture().get(3,
TimeUnit.SECONDS);
- assert(result.size() == messages.size());
+ assert (result.size() == messages.size());
- final Pattern pattern = Pattern.compile("index: \\d+, message: (.*)");
- Streams.zip(
- messages.stream(),
- result.stream(),
- Pair::create)
- .forEachOrdered(pair -> {
- String message = pair.first();
- String resultMessage = pair.second();
- Matcher matcher = pattern.matcher(resultMessage);
- assertTrue(matcher.find());
- assert(matcher.groupCount() == 1);
- assert(matcher.group(1).equals(message));
- });
- }
+ final Pattern pattern = Pattern.compile("index: \\d+, message: (.*)");
+ Streams.zip(
+ messages.stream(),
+ result.stream(),
+ Pair::create)
+ .forEachOrdered(pair -> {
+ String message = pair.first();
+ String resultMessage = pair.second();
+ Matcher matcher = pattern.matcher(resultMessage);
+ assertTrue(matcher.find());
+ assert (matcher.groupCount() == 1);
+ assert (matcher.group(1).equals(message));
+ });
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]