This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/incubator-pekko-connectors-samples.git


The following commit(s) were added to refs/heads/main by this push:
     new 91f188b  [kafka-to-websocket-clients] Migrate akka dependencies to 
pekko (#19)
91f188b is described below

commit 91f188b36fd6a48cf1b839b6d80ac165c216d546
Author: Laglangyue <[email protected]>
AuthorDate: Mon Dec 18 15:58:01 2023 +0800

    [kafka-to-websocket-clients] Migrate akka dependencies to pekko (#19)
    
    * [kafka-to-websocket-clients] Migrate akka dependencies to pekko
    
    * [kafka-to-websocket-clients] Migrate akka dependencies to pekko
    
    * remove unused PekkoConnectorVersion
    
    * wip
    
    * shutdown when user presses return
    
    * changed by reviewed
    
    * Update build.sbt
    
    ---------
    
    Co-authored-by: laglang <[email protected]>
    Co-authored-by: Jiafu <[email protected]>
    Co-authored-by: PJ Fanning <[email protected]>
---
 .../build.sbt                                      |   2 +-
 .../project/Dependencies.scala                     |  23 +--
 .../src/main/java/samples/javadsl/Helper.java      |  58 +++---
 .../src/main/java/samples/javadsl/Main.java        | 228 ++++++++++-----------
 .../src/main/resources/logback.xml                 |   2 +-
 .../src/test/java/samples/javadsl/MainTest.java    |  98 ++++-----
 6 files changed, 206 insertions(+), 205 deletions(-)

diff --git a/pekko-connectors-sample-kafka-to-websocket-clients/build.sbt 
b/pekko-connectors-sample-kafka-to-websocket-clients/build.sbt
index be14757..b576fe8 100644
--- a/pekko-connectors-sample-kafka-to-websocket-clients/build.sbt
+++ b/pekko-connectors-sample-kafka-to-websocket-clients/build.sbt
@@ -1,7 +1,7 @@
 import sbt.Keys._
 
 name := "pekko-connectors-sample-kafka-to-websocket-clients"
-organization := "com.lightbend.akka"
+organization := "org.apache.pekko"
 version := "1.0.0"
 scalaVersion := Dependencies.scalaVer
 libraryDependencies ++= Dependencies.dependencies
diff --git 
a/pekko-connectors-sample-kafka-to-websocket-clients/project/Dependencies.scala 
b/pekko-connectors-sample-kafka-to-websocket-clients/project/Dependencies.scala
index fe5a4b3..ff7a665 100644
--- 
a/pekko-connectors-sample-kafka-to-websocket-clients/project/Dependencies.scala
+++ 
b/pekko-connectors-sample-kafka-to-websocket-clients/project/Dependencies.scala
@@ -3,26 +3,25 @@ import sbt._
 object Dependencies {
   val scalaVer = "2.13.12"
   // #deps
-  val PekkoVersion = "2.6.19"
-  val PekkoHttpVersion = "10.1.12"
-  val PekkoConnectorsKafkaVersion = "3.0.1"
+  val PekkoVersion = "1.0.2"
+  val PekkoConnectorsKafkaVersion = "1.0.0"
+  val PekkoHttpVersion = "1.0.0"
+
   // #deps
   val dependencies = List(
     // #deps
-    "com.typesafe.akka" %% "akka-stream" % PekkoVersion,
-    "com.typesafe.akka" %% "akka-http" % PekkoHttpVersion,
-    "com.typesafe.akka" %% "akka-stream-kafka" % PekkoConnectorsKafkaVersion,
+    "org.apache.pekko" %% "pekko-stream" % PekkoVersion,
+    "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
+    "org.apache.pekko" %% "pekko-connectors-kafka" % 
PekkoConnectorsKafkaVersion,
 
     // Logging
-    "com.typesafe.akka" %% "akka-slf4j" % PekkoVersion,
+    "org.apache.pekko" %% "pekko-slf4j" % PekkoVersion,
     "ch.qos.logback" % "logback-classic" % "1.2.13",
     // #deps
 
-    "org.testcontainers" % "kafka" % "1.14.3",
-
-    "com.typesafe.akka" %% "akka-stream-testkit" % PekkoVersion % Test,
+    "org.testcontainers" % "kafka" % "1.17.6",
+    "org.apache.pekko" %% "pekko-stream-testkit" % PekkoVersion,
     "com.google.guava" % "guava" % "28.2-jre" % Test,
-    "junit" % "junit" % "4.13" % Test,
-
+    "junit" % "junit" % "4.13.2" % Test
   )
 }
diff --git 
a/pekko-connectors-sample-kafka-to-websocket-clients/src/main/java/samples/javadsl/Helper.java
 
b/pekko-connectors-sample-kafka-to-websocket-clients/src/main/java/samples/javadsl/Helper.java
index ab1dfad..0473515 100644
--- 
a/pekko-connectors-sample-kafka-to-websocket-clients/src/main/java/samples/javadsl/Helper.java
+++ 
b/pekko-connectors-sample-kafka-to-websocket-clients/src/main/java/samples/javadsl/Helper.java
@@ -1,49 +1,51 @@
 package samples.javadsl;
 
-import akka.Done;
-import akka.actor.ActorSystem;
-import akka.kafka.ProducerSettings;
-import akka.kafka.javadsl.Producer;
-import akka.stream.javadsl.Source;
+import org.apache.pekko.Done;
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.kafka.ProducerSettings;
+import org.apache.pekko.kafka.javadsl.Producer;
+import org.apache.pekko.stream.javadsl.Source;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
 
 import java.util.concurrent.CompletionStage;
 
 public class Helper {
 
-    private static final Logger log = LoggerFactory.getLogger(Helper.class);
+  private static final Logger log = LoggerFactory.getLogger(Helper.class);
 
-    private KafkaContainer kafka;
-    public String kafkaBootstrapServers;
+  private KafkaContainer kafka;
+  public String kafkaBootstrapServers;
 
-    public Helper() {
-    }
+  public Helper() {
+  }
 
-    public void startContainers() {
-        kafka = new KafkaContainer("5.1.2"); // contains Kafka 2.1.x
-        kafka.start();
-        kafkaBootstrapServers = kafka.getBootstrapServers();
-    }
+  public void startContainers() {
 
-    public void stopContainers() {
-        kafka.stop();
-    }
+    kafka = new 
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.1.2")); // 
contains Kafka 2.1.x
+    kafka.start();
+    kafkaBootstrapServers = kafka.getBootstrapServers();
+  }
 
-    CompletionStage<Done> writeToKafka(String topic, String item, ActorSystem 
actorSystem) {
-        ProducerSettings<Integer, String> kafkaProducerSettings =
-                ProducerSettings.create(actorSystem, new IntegerSerializer(), 
new StringSerializer())
-                        .withBootstrapServers(kafkaBootstrapServers);
+  public void stopContainers() {
+    kafka.stop();
+  }
 
-        CompletionStage<Done> producing =
-                Source.single(new ProducerRecord<Integer, String>(topic, item))
-                        .runWith(Producer.plainSink(kafkaProducerSettings), 
actorSystem);
-        producing.thenAccept(s -> log.info("Producing finished"));
-        return producing;
-    }
+  CompletionStage<Done> writeToKafka(String topic, String item, ActorSystem 
actorSystem) {
+    ProducerSettings<Integer, String> kafkaProducerSettings =
+        ProducerSettings.create(actorSystem, new IntegerSerializer(), new 
StringSerializer())
+            .withBootstrapServers(kafkaBootstrapServers);
+
+    CompletionStage<Done> producing =
+        Source.single(new ProducerRecord<Integer, String>(topic, item))
+            .runWith(Producer.plainSink(kafkaProducerSettings), actorSystem);
+    producing.thenAccept(s -> log.info("Producing finished"));
+    return producing;
+  }
 
 }
diff --git 
a/pekko-connectors-sample-kafka-to-websocket-clients/src/main/java/samples/javadsl/Main.java
 
b/pekko-connectors-sample-kafka-to-websocket-clients/src/main/java/samples/javadsl/Main.java
index 7ee974d..c11ada2 100644
--- 
a/pekko-connectors-sample-kafka-to-websocket-clients/src/main/java/samples/javadsl/Main.java
+++ 
b/pekko-connectors-sample-kafka-to-websocket-clients/src/main/java/samples/javadsl/Main.java
@@ -5,33 +5,34 @@
 package samples.javadsl;
 
 // #imports
-import akka.Done;
-import akka.NotUsed;
-import akka.actor.ActorSystem;
-import akka.http.javadsl.ConnectHttp;
-import akka.http.javadsl.Http;
-import akka.http.javadsl.ServerBinding;
-import akka.http.javadsl.model.HttpRequest;
-import akka.http.javadsl.model.HttpResponse;
-import akka.http.javadsl.model.ws.Message;
-import akka.http.javadsl.model.ws.TextMessage;
-import akka.http.javadsl.server.AllDirectives;
-import akka.http.javadsl.server.Route;
-import akka.japi.Pair;
-import akka.kafka.ConsumerSettings;
-import akka.kafka.Subscriptions;
-import akka.kafka.javadsl.Consumer;
-import akka.stream.ActorMaterializer;
-import akka.stream.Materializer;
-import akka.stream.OverflowStrategy;
-import akka.stream.SystemMaterializer;
-import akka.stream.javadsl.BroadcastHub;
-import akka.stream.javadsl.Flow;
-import akka.stream.javadsl.Sink;
-import akka.stream.javadsl.Source;
+
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pekko.Done;
+import org.apache.pekko.NotUsed;
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.http.javadsl.ConnectHttp;
+import org.apache.pekko.http.javadsl.Http;
+import org.apache.pekko.http.javadsl.ServerBinding;
+import org.apache.pekko.http.javadsl.model.HttpRequest;
+import org.apache.pekko.http.javadsl.model.HttpResponse;
+import org.apache.pekko.http.javadsl.model.ws.Message;
+import org.apache.pekko.http.javadsl.model.ws.TextMessage;
+import org.apache.pekko.http.javadsl.server.AllDirectives;
+import org.apache.pekko.http.javadsl.server.Route;
+import org.apache.pekko.japi.Pair;
+import org.apache.pekko.kafka.ConsumerSettings;
+import org.apache.pekko.kafka.Subscriptions;
+import org.apache.pekko.kafka.javadsl.Consumer;
+import org.apache.pekko.stream.Materializer;
+import org.apache.pekko.stream.OverflowStrategy;
+import org.apache.pekko.stream.SystemMaterializer;
+import org.apache.pekko.stream.javadsl.BroadcastHub;
+import org.apache.pekko.stream.javadsl.Flow;
+import org.apache.pekko.stream.javadsl.Sink;
+import org.apache.pekko.stream.javadsl.Source;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,95 +43,94 @@ import java.util.concurrent.TimeUnit;
 
 public class Main extends AllDirectives {
 
-    private static final Logger log = LoggerFactory.getLogger(Main.class);
-
-    private final Helper helper;
-    private final String kafkaBootstrapServers;
-
-    private final String topic = "message-topic";
-    private final String groupId = "docs-group";
-
-    private ActorSystem actorSystem;
-    private Materializer materializer;
-
-    public Main(Helper helper) {
-        this.kafkaBootstrapServers = helper.kafkaBootstrapServers;
-        this.helper = helper;
-    }
-
-    public static void main(String[] args) throws Exception {
-        Helper helper = new Helper();
-        helper.startContainers();
-        Main main = new Main(helper);
-        main.run();
-        helper.stopContainers();
-    }
-
-    private void run() throws Exception {
-        actorSystem = ActorSystem.create("KafkaToWebSocket");
-        materializer = SystemMaterializer.get(actorSystem).materializer();
-        Http http = Http.get(actorSystem);
-
-        // #websocket-handler
-        Flow<Message, Message, ?> webSocketHandler =
-            Flow.fromSinkAndSource(
-                Sink.ignore(),
-                topicSource()
-                    // decouple clients from each other: if a client is too 
slow and more than 1000 elements to be sent to
-                    // to the client queue up here, we fail this client
-                    .buffer(1000, OverflowStrategy.fail())
-                    .via(addIndexFlow())
-                    .map(TextMessage::create));
-        // #websocket-handler
-
-        final Flow<HttpRequest, HttpResponse, ?> routeFlow = 
createRoute(webSocketHandler).flow(actorSystem, materializer);
-        final CompletionStage<ServerBinding> binding = 
http.bindAndHandle(routeFlow,
-                ConnectHttp.toHost("localhost", 8081), materializer);
-
-        binding.toCompletableFuture().get(10, TimeUnit.SECONDS);
-
-        System.out.println("Server online at http://localhost:8081/\nPress 
RETURN to stop...");
-        System.in.read(); // let it run until user presses return
-    }
-
-    public Flow<String, String, NotUsed> addIndexFlow() {
-        final Pair<Integer, String> seed = Pair.create(0, "start");
-        return Flow.of(String.class)
-                   .scan(seed, (acc, message) -> {
-                       Integer index = acc.first();
-                       return Pair.create(index + 1, String.format("index: %s, 
message: %s", index, message));
-                   })
-                .filterNot(p -> p == seed)
-                .map(Pair::second);
-    }
-
-    // #routes
-    private Route createRoute(Flow<Message, Message, ?> webSocketHandler) {
-        return concat(
-                path("events", () -> 
handleWebSocketMessages(webSocketHandler)),
-                path("push", () -> parameter("value", v -> {
-                    CompletionStage<Done> written = helper.writeToKafka(topic, 
v, actorSystem);
-                    return onSuccess(written, done -> complete("Ok"));
-                }))
-        );
-    }
-    // #routes
-
-    // #kafka-to-broadcast
-    private Source<String, ?> topicSource() {
-        ConsumerSettings<Integer, String> kafkaConsumerSettings =
+  private static final Logger log = LoggerFactory.getLogger(Main.class);
+
+  private final Helper helper;
+  private final String kafkaBootstrapServers;
+
+  private final String topic = "message-topic";
+  private final String groupId = "docs-group";
+
+  private ActorSystem actorSystem;
+  private Materializer materializer;
+
+  public Main(Helper helper) {
+    this.kafkaBootstrapServers = helper.kafkaBootstrapServers;
+    this.helper = helper;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Helper helper = new Helper();
+    helper.startContainers();
+    Main main = new Main(helper);
+    main.run();
+    helper.stopContainers();
+  }
+
+  private void run() throws Exception {
+    actorSystem = ActorSystem.create("KafkaToWebSocket");
+    materializer = SystemMaterializer.get(actorSystem).materializer();
+    Http http = Http.get(actorSystem);
+
+    // #websocket-handler
+    Flow<Message, Message, ?> webSocketHandler =
+        Flow.fromSinkAndSource(
+            Sink.ignore(),
+            topicSource()
+                // decouple clients from each other: if a client is too slow 
and more than 1000 elements to be sent to
+                // the client queue up here, we fail this client
+                .buffer(1000, OverflowStrategy.fail())
+                .via(addIndexFlow())
+                .map(TextMessage::create));
+    // #websocket-handler
+    final Flow<HttpRequest, HttpResponse, ?> routeFlow = 
createRoute(webSocketHandler).flow(actorSystem, materializer);
+    final CompletionStage<ServerBinding> binding = 
http.bindAndHandle(routeFlow,
+        ConnectHttp.toHost("localhost", 8081), materializer);
+
+    binding.toCompletableFuture().get(10, TimeUnit.SECONDS);
+    System.out.println("Server online at http://localhost:8081/\nPress RETURN 
to stop...");
+    System.in.read(); // let it run until user presses return
+    actorSystem.terminate();
+  }
+
+  public Flow<String, String, NotUsed> addIndexFlow() {
+    final Pair<Integer, String> seed = Pair.create(0, "start");
+    return Flow.of(String.class)
+        .scan(seed, (acc, message) -> {
+          Integer index = acc.first();
+          return Pair.create(index + 1, String.format("index: %s, message: 
%s", index, message));
+        })
+        .filterNot(p -> p == seed)
+        .map(Pair::second);
+  }
+
+  // #routes
+  private Route createRoute(Flow<Message, Message, ?> webSocketHandler) {
+    return concat(
+        path("events", () -> handleWebSocketMessages(webSocketHandler)),
+        path("push", () -> parameter("value", v -> {
+          CompletionStage<Done> written = helper.writeToKafka(topic, v, 
actorSystem);
+          return onSuccess(written, done -> complete("Ok"));
+        }))
+    );
+  }
+  // #routes
+
+  // #kafka-to-broadcast
+  private Source<String, ?> topicSource() {
+    ConsumerSettings<Integer, String> kafkaConsumerSettings =
         ConsumerSettings.create(actorSystem, new IntegerDeserializer(), new 
StringDeserializer())
-                .withBootstrapServers(kafkaBootstrapServers)
-                .withGroupId(groupId)
-                .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest")
-                .withStopTimeout(Duration.ofSeconds(5));
-
-        return
-            Consumer.plainSource(kafkaConsumerSettings, 
Subscriptions.topics(topic))
-                    .map(consumerRecord -> consumerRecord.value())
-                    // using a broadcast hub here, ensures that all websocket 
clients will use the same
-                    // consumer
-                    .runWith(BroadcastHub.of(String.class), materializer);
-    }
-    // #kafka-to-broadcast
+            .withBootstrapServers(kafkaBootstrapServers)
+            .withGroupId(groupId)
+            .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+            .withStopTimeout(Duration.ofSeconds(5));
+
+    return
+        Consumer.plainSource(kafkaConsumerSettings, 
Subscriptions.topics(topic))
+            .map(ConsumerRecord::value)
+            // using a broadcast hub here, ensures that all websocket clients 
will use the same
+            // consumer
+            .runWith(BroadcastHub.of(String.class), materializer);
+  }
+  // #kafka-to-broadcast
 }
diff --git 
a/pekko-connectors-sample-kafka-to-websocket-clients/src/main/resources/logback.xml
 
b/pekko-connectors-sample-kafka-to-websocket-clients/src/main/resources/logback.xml
index 2923e11..3c22058 100644
--- 
a/pekko-connectors-sample-kafka-to-websocket-clients/src/main/resources/logback.xml
+++ 
b/pekko-connectors-sample-kafka-to-websocket-clients/src/main/resources/logback.xml
@@ -7,7 +7,7 @@
         </encoder>
     </appender>
 
-    <logger name="akka" level="WARN"/>
+    <logger name="org.apache.pekko" level="WARN"/>
     <logger name="com.github.dockerjava" level="INFO"/>
     <logger name="org.testcontainers" level="INFO"/>
 
diff --git 
a/pekko-connectors-sample-kafka-to-websocket-clients/src/test/java/samples/javadsl/MainTest.java
 
b/pekko-connectors-sample-kafka-to-websocket-clients/src/test/java/samples/javadsl/MainTest.java
index 18847cc..0d7c49c 100644
--- 
a/pekko-connectors-sample-kafka-to-websocket-clients/src/test/java/samples/javadsl/MainTest.java
+++ 
b/pekko-connectors-sample-kafka-to-websocket-clients/src/test/java/samples/javadsl/MainTest.java
@@ -1,14 +1,15 @@
 package samples.javadsl;
 
-import akka.NotUsed;
-import akka.actor.ActorSystem;
-import akka.japi.Pair;
-import akka.stream.Materializer;
-import akka.stream.javadsl.Flow;
-import akka.stream.javadsl.Sink;
-import akka.stream.javadsl.Source;
-import akka.testkit.javadsl.TestKit;
+import static org.junit.Assert.assertTrue;
+
 import com.google.common.collect.Streams;
+import org.apache.pekko.NotUsed;
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.japi.Pair;
+import org.apache.pekko.stream.javadsl.Flow;
+import org.apache.pekko.stream.javadsl.Sink;
+import org.apache.pekko.stream.javadsl.Source;
+import org.apache.pekko.testkit.javadsl.TestKit;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -20,54 +21,53 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static org.junit.Assert.assertTrue;
-
 public class MainTest {
-    static ActorSystem system;
 
-    @BeforeClass
-    public static void setup() {
-        system = ActorSystem.create("WebsocketExampleMainTest");
-    }
+  static ActorSystem system;
+
+  @BeforeClass
+  public static void setup() {
+    system = ActorSystem.create("WebsocketExampleMainTest");
+  }
 
-    @AfterClass
-    public static void tearDown() {
-        TestKit.shutdownActorSystem(system);
-        system = null;
-    }
+  @AfterClass
+  public static void tearDown() {
+    TestKit.shutdownActorSystem(system);
+    system = null;
+  }
 
-    @Test
-    public void addIndexFlow_Test() throws Exception {
-        final Main example = new Main(new Helper());
+  @Test
+  public void addIndexFlow_Test() throws Exception {
+    final Main example = new Main(new Helper());
 
-        List<String> messages = Arrays.asList(
-                "I say high, you say low",
-                "You say why and I say I don't know",
-                "Oh, no",
-                "You say goodbye and I say hello"
-        );
+    List<String> messages = Arrays.asList(
+        "I say high, you say low",
+        "You say why and I say I don't know",
+        "Oh, no",
+        "You say goodbye and I say hello"
+    );
 
-        final Flow<String, String, NotUsed> addIndexFlow = 
example.addIndexFlow();
+    final Flow<String, String, NotUsed> addIndexFlow = example.addIndexFlow();
 
-        final CompletionStage<List<String>> future = Source.from(messages)
-                .via(addIndexFlow)
-                .runWith(Sink.seq(), system);
-        final List<String> result = future.toCompletableFuture().get(3, 
TimeUnit.SECONDS);
+    final CompletionStage<List<String>> future = Source.from(messages)
+        .via(addIndexFlow)
+        .runWith(Sink.seq(), system);
+    final List<String> result = future.toCompletableFuture().get(3, 
TimeUnit.SECONDS);
 
-        assert(result.size() == messages.size());
+    assert (result.size() == messages.size());
 
-        final Pattern pattern = Pattern.compile("index: \\d+, message: (.*)");
-        Streams.zip(
-                messages.stream(),
-                result.stream(),
-                Pair::create)
-                .forEachOrdered(pair -> {
-                    String message = pair.first();
-                    String resultMessage = pair.second();
-                    Matcher matcher = pattern.matcher(resultMessage);
-                    assertTrue(matcher.find());
-                    assert(matcher.groupCount() == 1);
-                    assert(matcher.group(1).equals(message));
-                });
-    }
+    final Pattern pattern = Pattern.compile("index: \\d+, message: (.*)");
+    Streams.zip(
+            messages.stream(),
+            result.stream(),
+            Pair::create)
+        .forEachOrdered(pair -> {
+          String message = pair.first();
+          String resultMessage = pair.second();
+          Matcher matcher = pattern.matcher(resultMessage);
+          assertTrue(matcher.find());
+          assert (matcher.groupCount() == 1);
+          assert (matcher.group(1).equals(message));
+        });
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to