This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new 8adb72ad2 Slick: remove need to cast system.dispatcher (#1535)
8adb72ad2 is described below

commit 8adb72ad24a00764da5df6b6d119f04fa8a62091
Author: PJ Fanning <[email protected]>
AuthorDate: Sat Mar 28 12:37:54 2026 +0100

    Slick: remove need to cast system.dispatcher (#1535)
---
 .../stream/connectors/slick/javadsl/Slick.scala    | 127 +++++++++++++++++++--
 .../javadsl/DocSnippetFlowWithPassThrough.java     |   3 +-
 slick/src/test/java/docs/javadsl/SlickTest.java    |  11 +-
 3 files changed, 126 insertions(+), 15 deletions(-)

diff --git 
a/slick/src/main/scala/org/apache/pekko/stream/connectors/slick/javadsl/Slick.scala
 
b/slick/src/main/scala/org/apache/pekko/stream/connectors/slick/javadsl/Slick.scala
index f1fb8104c..57128e8cd 100644
--- 
a/slick/src/main/scala/org/apache/pekko/stream/connectors/slick/javadsl/Slick.scala
+++ 
b/slick/src/main/scala/org/apache/pekko/stream/connectors/slick/javadsl/Slick.scala
@@ -13,23 +13,20 @@
 
 package org.apache.pekko.stream.connectors.slick.javadsl
 
-import java.sql.Connection
-import java.sql.PreparedStatement
-import java.util.concurrent.CompletionStage
-import java.util.concurrent.Executor
+import java.sql.{ Connection, PreparedStatement }
+import java.util.concurrent.{ CompletionStage, Executor }
 import java.util.function.{ Function => JFunction }
 import java.util.function.{ BiFunction => JBiFunction }
 
 import org.apache.pekko
-import pekko.Done
-import pekko.NotUsed
+import pekko.{ Done, NotUsed }
 import pekko.japi.function.Function2
 import pekko.stream.connectors.slick.scaladsl.{ Slick => ScalaSlick }
 import pekko.stream.javadsl._
 import slick.dbio.DBIO
 import slick.jdbc.{ GetResult, SQLActionBuilder, SetParameter, 
SimpleJdbcAction }
 
-import scala.concurrent.ExecutionContext
+import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor }
 import scala.jdk.FunctionConverters._
 import scala.jdk.FutureConverters._
 
@@ -188,6 +185,29 @@ object Slick {
       mapper: JBiFunction[T, java.lang.Integer, R]): Flow[T, R, NotUsed] =
     flowWithPassThrough(session, executor, 1, toStatement, mapper)
 
+  /**
+   * Java API: creates a Flow that takes a stream of elements of
+   *           type T, transforms each element to a SQL statement
+   *           using the specified function, then executes
+   *           those statements against the specified Slick database
+   *           and allows to combine the statement result and element into a 
result type R.
+   *
+   * @param session The database session to use.
+   * @param executor Executor used to run mapper function in.
+   *                 E.g. the dispatcher of the ActorSystem.
+   * @param toStatement A function that creates the SQL statement to
+   *                    execute from the current element. Any DML or
+   *                    DDL statement is acceptable.
+   * @param mapper A function to create a result from the incoming element T
+   *               and the database statement result.
+   */
+  def flowWithPassThrough[T, R](
+      session: SlickSession,
+      executor: ExecutionContextExecutor,
+      toStatement: JFunction[T, String],
+      mapper: JBiFunction[T, java.lang.Integer, R]): Flow[T, R, NotUsed] =
+    flowWithPassThrough(session, executor, 1, toStatement, mapper)
+
   /**
    * Java API: creates a Flow that takes a stream of elements of
    *           type T, transforms each element to a SQL statement
@@ -237,6 +257,29 @@ object Slick {
       mapper: Function2[T, java.lang.Integer, R]): Flow[T, R, NotUsed] =
     flowWithPassThrough(session, executor, 1, toStatement, mapper)
 
+  /**
+   * Java API: creates a Flow that takes a stream of elements of
+   *           type T, transforms each element to a SQL statement
+   *           using the specified function, then executes
+   *           those statements against the specified Slick database
+   *           and allows to combine the statement result and element into a 
result type R.
+   *
+   * @param session The database session to use.
+   * @param executor Executor used to run mapper function in.
+   *                 E.g. the dispatcher of the ActorSystem.
+   * @param toStatement A function that creates the SQL statement to
+   *                    execute from the current element. Any DML or
+   *                    DDL statement is acceptable.
+   * @param mapper A function to create a result from the incoming element T
+   *               and the database statement result.
+   */
+  def flowWithPassThrough[T, R](
+      session: SlickSession,
+      executor: ExecutionContextExecutor,
+      toStatement: Function2[T, Connection, PreparedStatement],
+      mapper: Function2[T, java.lang.Integer, R]): Flow[T, R, NotUsed] =
+    flowWithPassThrough(session, executor, 1, toStatement, mapper)
+
   /**
    * Java API: creates a Flow that takes a stream of elements of
    *           type T, transforms each element to a SQL statement
@@ -310,6 +353,41 @@ object Slick {
       .asJava
   }
 
+  /**
+   * Java API: creates a Flow that takes a stream of elements of
+   *           type T, transforms each element to a SQL statement
+   *           using the specified function, then executes
+   *           those statements against the specified Slick database
+   *           and allows to combine the statement result and element into a 
result type R.
+   *
+   * @param session The database session to use.
+   * @param executor Executor used to run mapper function in.
+   *                 E.g. the dispatcher of the ActorSystem.
+   * @param parallelism How many parallel asynchronous streams should be
+   *                    used to send statements to the database. Use a
+   *                    value of 1 for sequential execution.
+   * @param toStatement A function that creates the SQL statement to
+   *                    execute from the current element. Any DML or
+   *                    DDL statement is acceptable.
+   * @param mapper A function to create a result from the incoming element T
+   *               and the database statement result.
+   */
+  def flowWithPassThrough[T, R](
+      session: SlickSession,
+      executor: ExecutionContextExecutor,
+      parallelism: Int,
+      toStatement: JFunction[T, String],
+      mapper: JBiFunction[T, java.lang.Integer, R]): Flow[T, R, NotUsed] = {
+    ScalaSlick
+      .flowWithPassThrough[T, R](parallelism,
+        (t: T) => {
+          toDBIO(toStatement)
+            .apply(t)
+            .map(count => mapper.apply(t, count))(executor)
+        })(session)
+      .asJava
+  }
+
   /**
    * Java API: creates a Flow that takes a stream of elements of
    *           type T, transforms each element to a SQL statement
@@ -383,6 +461,41 @@ object Slick {
       .asJava
   }
 
+  /**
+   * Java API: creates a Flow that takes a stream of elements of
+   *           type T, transforms each element to a SQL statement
+   *           using the specified function, then executes
+   *           those statements against the specified Slick database
+   *           and allows to combine the statement result and element into a 
result type R.
+   *
+   * @param session The database session to use.
+   * @param executor Executor used to run mapper function in.
+   *                 E.g. the dispatcher of the ActorSystem.
+   * @param parallelism How many parallel asynchronous streams should be
+   *                    used to send statements to the database. Use a
+   *                    value of 1 for sequential execution.
+   * @param toStatement A function that creates the SQL statement to
+   *                    execute from the current element. Any DML or
+   *                    DDL statement is acceptable.
+   * @param mapper A function to create a result from the incoming element T
+   *               and the database statement result.
+   */
+  def flowWithPassThrough[T, R](
+      session: SlickSession,
+      executor: ExecutionContextExecutor,
+      parallelism: Int,
+      toStatement: Function2[T, Connection, PreparedStatement],
+      mapper: Function2[T, java.lang.Integer, R]): Flow[T, R, NotUsed] = {
+    ScalaSlick
+      .flowWithPassThrough[T, R](parallelism,
+        (t: T) => {
+          toDBIO(toStatement)
+            .apply(t)
+            .map(count => mapper.apply(t, count))(executor)
+        })(session)
+      .asJava
+  }
+
   /**
    * Java API: creates a Sink that takes a stream of elements of
    *           type T, transforms each element to a SQL statement
diff --git 
a/slick/src/test/java/docs/javadsl/DocSnippetFlowWithPassThrough.java 
b/slick/src/test/java/docs/javadsl/DocSnippetFlowWithPassThrough.java
index 8a24c6259..131d50ee6 100644
--- a/slick/src/test/java/docs/javadsl/DocSnippetFlowWithPassThrough.java
+++ b/slick/src/test/java/docs/javadsl/DocSnippetFlowWithPassThrough.java
@@ -25,7 +25,6 @@ import java.sql.PreparedStatement;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
-import java.util.concurrent.Executor;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -85,7 +84,7 @@ public class DocSnippetFlowWithPassThrough {
             .via(
                 Slick.flowWithPassThrough(
                     session,
-                    (Executor) system.dispatcher(),
+                    system.dispatcher(),
                     // add an optional second argument to specify the 
parallelism factor (int)
                     (kafkaMessage, connection) -> {
                       PreparedStatement statement =
diff --git a/slick/src/test/java/docs/javadsl/SlickTest.java 
b/slick/src/test/java/docs/javadsl/SlickTest.java
index ad40ce501..e11e34d05 100644
--- a/slick/src/test/java/docs/javadsl/SlickTest.java
+++ b/slick/src/test/java/docs/javadsl/SlickTest.java
@@ -41,7 +41,6 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -244,7 +243,7 @@ public class SlickTest {
   @Test
   public void testFlowWithPassThroughWithoutParallelismAndReadBackWithSource() 
throws Exception {
     final Flow<User, User, NotUsed> slickFlow =
-        Slick.flowWithPassThrough(session, (Executor) system.dispatcher(), 
insertUser, (user, i) -> user);
+        Slick.flowWithPassThrough(session, system.dispatcher(), insertUser, 
(user, i) -> user);
     final List<User> insertedUsers =
         usersSource
             .via(slickFlow)
@@ -259,7 +258,7 @@ public class SlickTest {
   @Test
   public void 
testFlowPSWithPassThroughWithoutParallelismAndReadBackWithSource() throws 
Exception {
     final Flow<User, User, NotUsed> slickFlow =
-        Slick.flowWithPassThrough(session, (Executor) system.dispatcher(), 
insertUserPS, (user, i) -> user);
+        Slick.flowWithPassThrough(session, system.dispatcher(), insertUserPS, 
(user, i) -> user);
     final List<User> insertedUsers =
         usersSource
             .via(slickFlow)
@@ -274,7 +273,7 @@ public class SlickTest {
   @Test
   public void testFlowWithPassThroughWithParallelismOf4AndReadBackWithSource() 
throws Exception {
     final Flow<User, User, NotUsed> slickFlow =
-        Slick.flowWithPassThrough(session, (Executor) system.dispatcher(), 4, 
insertUser, (user, i) -> user);
+        Slick.flowWithPassThrough(session, system.dispatcher(), 4, insertUser, 
(user, i) -> user);
     final List<User> insertedUsers =
         usersSource
             .via(slickFlow)
@@ -289,7 +288,7 @@ public class SlickTest {
   @Test
   public void 
testFlowPSWithPassThroughWithParallelismOf4AndReadBackWithSource() throws 
Exception {
     final Flow<User, User, NotUsed> slickFlow =
-        Slick.flowWithPassThrough(session, (Executor) system.dispatcher(), 4, 
insertUserPS, (user, i) -> user);
+        Slick.flowWithPassThrough(session, system.dispatcher(), 4, 
insertUserPS, (user, i) -> user);
     final List<User> insertedUsers =
         usersSource
             .via(slickFlow)
@@ -326,7 +325,7 @@ public class SlickTest {
             .via(
                 Slick.flowWithPassThrough(
                     session,
-                    (Executor) system.dispatcher(),
+                    system.dispatcher(),
                     insertUserInKafkaMessage,
                     (kafkaMessage, insertCount) -> kafkaMessage.map(user -> 
insertCount)))
             .mapAsync(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to