This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 7deed41917 Documentation for typed EventStream (#1195)
7deed41917 is described below
commit 7deed4191786a217adcd03221d3c0e81b5594175
Author: Syed Ali <[email protected]>
AuthorDate: Thu Apr 25 06:07:17 2024 -0400
Documentation for typed EventStream (#1195)
* Fixes #1162: Documentation for typed event stream
* Fixes #1162: Added standard apache headers
* Fixes #1162: fix compilation errors
* Fixes #1162: formatting java source
* Fixes #1162: missing subscription and no need to subscribe to actor
* Fixes #1162: fix compilation for JDK9
* Fixes #1162: fix compilation for JDK9
* Fixes #1162: fix compilation for Scala3
---
.../typed/eventstream/EventStreamDocTest.java | 96 +++++++++++++++
.../eventstream/EventStreamSuperClassDocTest.java | 135 +++++++++++++++++++++
.../typed/eventstream/EventStreamDocSpec.scala | 115 ++++++++++++++++++
docs/src/main/paradox/index-utilities.md | 1 +
docs/src/main/paradox/typed/event-stream.md | 58 +++++++++
5 files changed, 405 insertions(+)
diff --git
a/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/eventstream/EventStreamDocTest.java
b/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/eventstream/EventStreamDocTest.java
new file mode 100644
index 0000000000..24326de0aa
--- /dev/null
+++
b/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/eventstream/EventStreamDocTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.actor.typed.eventstream;
+
+import org.apache.pekko.actor.testkit.typed.javadsl.ActorTestKit;
+import org.apache.pekko.actor.typed.Behavior;
+import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
+import org.apache.pekko.actor.typed.javadsl.ActorContext;
+import org.apache.pekko.actor.typed.javadsl.Behaviors;
+import org.apache.pekko.actor.typed.javadsl.Receive;
+import org.junit.Test;
+import org.scalatestplus.junit.JUnitSuite;
+// #dead-letter-imports
+import org.apache.pekko.actor.DeadLetter;
+import org.apache.pekko.actor.typed.ActorRef;
+import org.apache.pekko.actor.typed.ActorSystem;
+// #dead-letter-imports
+
+public class EventStreamDocTest extends JUnitSuite {
+
+ @Test
+ public void listenToDeadLetters() {
+ // #subscribe-to-dead-letters
+ ActorSystem<Command> system =
+ ActorSystem.create(DeadLetterListenerBehavior.create(),
"DeadLetterListener");
+ // #subscribe-to-dead-letters
+ ActorTestKit.shutdown(system);
+ }
+
+ // #listen-to-dead-letters
+ interface Command {}
+
+ static final class DeadLetterWrapper implements Command {
+ private final DeadLetter deadLetter;
+
+ public DeadLetterWrapper(DeadLetter deadLetter) {
+ this.deadLetter = deadLetter;
+ }
+
+ public DeadLetter getDeadLetter() {
+ return deadLetter;
+ }
+ }
+
+ static class DeadLetterListenerBehavior extends AbstractBehavior<Command> {
+
+ public static Behavior<Command> create() {
+ return Behaviors.setup(DeadLetterListenerBehavior::new);
+ }
+
+ public DeadLetterListenerBehavior(ActorContext<Command> context) {
+ super(context);
+ ActorRef<DeadLetter> messageAdapter =
+ context.messageAdapter(DeadLetter.class, DeadLetterWrapper::new);
+ context
+ .getSystem()
+ .eventStream()
+ .tell(new EventStream.Subscribe<>(DeadLetter.class, messageAdapter));
+ }
+
+ @Override
+ public Receive<Command> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(
+ DeadLetterWrapper.class,
+ msg -> {
+ final DeadLetter deadLetter = msg.getDeadLetter();
+ getContext()
+ .getLog()
+ .info(
+ "Dead letter received from sender ({}) to recipient
({}) with message: {}",
+ deadLetter.sender().path().name(),
+ deadLetter.recipient().path().name(),
+ deadLetter.message().toString());
+ return Behaviors.same();
+ })
+ .build();
+ }
+ }
+ // #listen-to-dead-letters
+}
diff --git
a/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/eventstream/EventStreamSuperClassDocTest.java
b/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/eventstream/EventStreamSuperClassDocTest.java
new file mode 100644
index 0000000000..92bdc4e8a8
--- /dev/null
+++
b/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/eventstream/EventStreamSuperClassDocTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.actor.typed.eventstream;
+
+import org.apache.pekko.actor.testkit.typed.javadsl.ActorTestKit;
+import org.apache.pekko.actor.typed.ActorRef;
+import org.apache.pekko.actor.typed.ActorSystem;
+import org.apache.pekko.actor.typed.Behavior;
+import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
+import org.apache.pekko.actor.typed.javadsl.ActorContext;
+import org.apache.pekko.actor.typed.javadsl.Behaviors;
+import org.apache.pekko.actor.typed.javadsl.Receive;
+import org.junit.Test;
+import org.scalatestplus.junit.JUnitSuite;
+
+// #listen-to-super-class-imports
+import org.apache.pekko.actor.DeadLetter;
+import org.apache.pekko.actor.AllDeadLetters;
+import org.apache.pekko.actor.Dropped;
+import org.apache.pekko.actor.SuppressedDeadLetter;
+import org.apache.pekko.actor.UnhandledMessage;
+// #listen-to-super-class-imports
+
+public class EventStreamSuperClassDocTest extends JUnitSuite {
+
+ @Test
+ public void listenToDeadLetters() {
+ // #subscribe-to-super-class
+ ActorSystem<Command> system =
+ ActorSystem.create(AllDeadLettersListenerBehavior.create(),
"AllDeadLettersListener");
+ // #subscribe-to-super-class
+ ActorTestKit.shutdown(system);
+ }
+
+ // #listen-to-super-class
+ interface Command {}
+
+ static final class AllDeadLettersWrapper implements Command {
+ private final AllDeadLetters allDeadLetters;
+
+ public AllDeadLettersWrapper(AllDeadLetters deadLetter) {
+ this.allDeadLetters = deadLetter;
+ }
+
+ public AllDeadLetters getAllDeadLetters() {
+ return allDeadLetters;
+ }
+ }
+
+ static class AllDeadLettersListenerBehavior extends
AbstractBehavior<Command> {
+
+ public static Behavior<Command> create() {
+ return Behaviors.setup(AllDeadLettersListenerBehavior::new);
+ }
+
+ public AllDeadLettersListenerBehavior(ActorContext<Command> context) {
+ super(context);
+ ActorRef<AllDeadLetters> messageAdapter =
+ context.messageAdapter(
+ AllDeadLetters.class,
EventStreamSuperClassDocTest.AllDeadLettersWrapper::new);
+ context
+ .getSystem()
+ .eventStream()
+ .tell(new EventStream.Subscribe<>(AllDeadLetters.class,
messageAdapter));
+ }
+
+ @Override
+ public Receive<Command> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(
+ AllDeadLettersWrapper.class,
+ msg -> {
+ final AllDeadLetters allDeadLetters = msg.getAllDeadLetters();
+ final Class<? extends AllDeadLetters> klass =
msg.allDeadLetters.getClass();
+ if (klass.isAssignableFrom(DeadLetter.class)) {
+ final DeadLetter deadLetter = (DeadLetter) allDeadLetters;
+ getContext()
+ .getLog()
+ .info(
+ "DeadLetter: sender ({}) to recipient ({}) with
message: {}",
+ deadLetter.sender().path().name(),
+ deadLetter.recipient().path().name(),
+ deadLetter.message());
+ } else if (klass.isAssignableFrom(Dropped.class)) {
+ final Dropped dropped = (Dropped) allDeadLetters;
+ getContext()
+ .getLog()
+ .info(
+ "Dropped: sender ({}) to recipient ({}) with
message: {}, reason: {}",
+ dropped.sender().path().name(),
+ dropped.recipient().path().name(),
+ dropped.message(),
+ dropped.reason());
+ } else if (klass.isAssignableFrom(SuppressedDeadLetter.class))
{
+ final SuppressedDeadLetter suppressedDeadLetter =
+ (SuppressedDeadLetter) allDeadLetters;
+ getContext()
+ .getLog()
+ .trace(
+ "SuppressedDeadLetter: sender ({}) to recipient ({})
with message: {}",
+ suppressedDeadLetter.sender().path().name(),
+ suppressedDeadLetter.recipient().path().name(),
+ suppressedDeadLetter.message());
+ } else if (klass.isAssignableFrom(UnhandledMessage.class)) {
+ final UnhandledMessage unhandledMessage = (UnhandledMessage)
allDeadLetters;
+ getContext()
+ .getLog()
+ .info(
+ "UnhandledMessage: sender ({}) to recipient ({})
with message: {}",
+ unhandledMessage.sender().path().name(),
+ unhandledMessage.recipient().path().name(),
+ unhandledMessage.message());
+ }
+ return Behaviors.same();
+ })
+ .build();
+ }
+ }
+ // #listen-to-super-class
+}
diff --git
a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/eventstream/EventStreamDocSpec.scala
b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/eventstream/EventStreamDocSpec.scala
new file mode 100644
index 0000000000..b47c81769f
--- /dev/null
+++
b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/eventstream/EventStreamDocSpec.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.actor.typed.eventstream
+
+import org.apache.pekko.actor.{ AllDeadLetters, DeadLetter, Dropped,
SuppressedDeadLetter, UnhandledMessage }
+import org.apache.pekko.actor.testkit.typed.scaladsl.{ LogCapturing,
ScalaTestWithActorTestKit }
+import org.apache.pekko.actor.typed.ActorSystem
+import org.apache.pekko.actor.typed.scaladsl.Behaviors
+import org.scalatest.wordspec.AnyWordSpecLike
+
+object EventStreamDocSpec {
+
+ // #listen-to-dead-letters
+ import org.apache.pekko.actor.typed.Behavior
+ import org.apache.pekko.actor.typed.eventstream.EventStream.Subscribe
+ import org.apache.pekko.actor.typed.scaladsl.Behaviors
+
+ object DeadLetterListener {
+ sealed trait Command
+ final case class DeadLetterWrapper(deadLetter: DeadLetter) extends Command
+
+ def apply(): Behavior[Command] = {
+ Behaviors.setup[Command] {
+ context =>
+ val adapter =
context.messageAdapter[DeadLetter](DeadLetterWrapper.apply)
+ context.system.eventStream ! Subscribe(adapter)
+
+ Behaviors.receiveMessage {
+ case DeadLetterWrapper(DeadLetter(message, sender, recipient)) =>
+ context.log.info("Dead letter received from sender ({}) to
recipient ({}) with message: {}",
+ sender.path.name, recipient.path.name, message.toString)
+ Behaviors.same
+ }
+ }
+ }
+ }
+ // #listen-to-dead-letters
+
+ // #listen-to-super-class
+ object AllDeadLetterListener {
+ sealed trait Command
+ final case class AllDeadLettersWrapper(allDeadLetters: AllDeadLetters)
extends Command
+
+ def apply(): Behavior[Command] = {
+ Behaviors.setup[Command] {
+ context =>
+ val adapter =
context.messageAdapter[AllDeadLetters](AllDeadLettersWrapper.apply)
+ context.system.eventStream ! Subscribe(adapter)
+
+ Behaviors.receiveMessage {
+ case AllDeadLettersWrapper(allDeadLetters) =>
+ allDeadLetters match {
+ case DeadLetter(message, sender, recipient) =>
+ context.log.info("DeadLetter received from sender ({}) to
recipient ({}) with message: {}",
+ sender.path.name, recipient.path.name, message.toString)
+
+ case Dropped(message, reason, sender, recipient) =>
+ context.log.info("Dropped: sender ({}) to recipient ({})
with message: {}, reason: {}",
+ sender.path.name, recipient.path.name, message.toString,
reason)
+
+ case SuppressedDeadLetter(message, sender, recipient) =>
+ // use trace otherwise logs will be flooded
+ context.log.trace("SuppressedDeadLetter received from sender
({}) to recipient ({}) with message: {}",
+ sender.path.name, recipient.path.name, message)
+
+ case UnhandledMessage(message, sender, recipient) =>
+ context.log.info("UnhandledMessage received from sender ({})
to recipient ({}) with message: {}",
+ sender.path.name, recipient.path.name, message.toString)
+ }
+ Behaviors.same
+ }
+ }
+ }
+ }
+ // #listen-to-super-class
+}
+
+class EventStreamDocSpec extends ScalaTestWithActorTestKit with
AnyWordSpecLike with LogCapturing {
+ import EventStreamDocSpec._
+
+ "listen to dead letters" in {
+ // #listen-to-dead-letters
+
+ ActorSystem(Behaviors.setup[Void] { context =>
+ context.spawn(DeadLetterListener(), "DeadLetterListener")
+ Behaviors.empty
+ }, "DeadLetterListenerSystem")
+ // #listen-to-dead-letters
+ }
+
+ "listen to all dead letters" in {
+ // #listen-to-super-class
+
+ ActorSystem(Behaviors.setup[Void] { context =>
+ context.spawn(AllDeadLetterListener(), "AllDeadLetterListener")
+ Behaviors.empty
+ }, "AllDeadLetterListenerSystem")
+ // #listen-to-super-class
+ }
+}
diff --git a/docs/src/main/paradox/index-utilities.md
b/docs/src/main/paradox/index-utilities.md
index 9f52f8d40d..0935fb10aa 100644
--- a/docs/src/main/paradox/index-utilities.md
+++ b/docs/src/main/paradox/index-utilities.md
@@ -4,6 +4,7 @@
@@@ index
+* [event-stream](typed/event-stream.md)
* [logging](typed/logging.md)
* [common/circuitbreaker](common/circuitbreaker.md)
* [futures](futures.md)
diff --git a/docs/src/main/paradox/typed/event-stream.md
b/docs/src/main/paradox/typed/event-stream.md
new file mode 100644
index 0000000000..c74abf413d
--- /dev/null
+++ b/docs/src/main/paradox/typed/event-stream.md
@@ -0,0 +1,58 @@
+# Event Stream
+
+You are viewing the documentation for the new actor APIs, to view the Pekko
Classic documentation, see @ref:[Classic Event
Stream](../event-bus.md#event-stream).
+
+## Dependency
+
+To use Event Stream, you must have Pekko typed actors dependency in your
project.
+
+@@dependency[sbt,Maven,Gradle] {
+bomGroup=org.apache.pekko bomArtifact=pekko-bom_$scala.binary.version$
bomVersionSymbols=PekkoVersion
+symbol1=PekkoVersion
+value1="$pekko.version$"
+group="org.apache.pekko"
+artifact="pekko-actor-typed_$scala.binary.version$"
+version=PekkoVersion
+}
+
+## How to use
+
+The following example demonstrates how a subscription works. Given an actor:
+
+Scala
+: @@snip
[EventStreamDocSpec.scala](/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/eventstream/EventStreamDocSpec.scala)
{ #listen-to-dead-letters }
+
+Java
+: @@snip
[EventStreamDocTest.java](/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/eventstream/EventStreamDocTest.java)
{ #dead-letter-imports }
+
+@@@ div { .group-java }
+
+The actor definition:
+
+@@snip
[EventStreamDocTest.java](/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/eventstream/EventStreamDocTest.java)
{ #listen-to-dead-letters }
+
+It can be used as follows:
+
+@@snip
[EventStreamDocTest.java](/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/eventstream/EventStreamDocTest.java)
{ #subscribe-to-dead-letters }
+
+@@@
+
+It is possible to subscribe to common superclass:
+
+Scala
+: @@snip
[EventStreamDocSpec.scala](/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/eventstream/EventStreamDocSpec.scala)
{ #listen-to-super-class }
+
+Java
+: @@snip
[EventStreamSuperClassDocTest.java](/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/eventstream/EventStreamSuperClassDocTest.java)
{ #listen-to-super-class-imports }
+
+@@@ div { .group-java }
+
+The actor definition:
+
+@@snip
[EventStreamSuperClassDocTest.java](/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/eventstream/EventStreamSuperClassDocTest.java)
{ #listen-to-super-class }
+
+It can be used as follows:
+
+@@snip
[EventStreamSuperClassDocTest.java](/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/eventstream/EventStreamSuperClassDocTest.java)
{ #subscribe-to-super-class }
+
+@@@
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]