This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new 8adb72ad2 Slick: remove need to cast system.dispatcher (#1535)
8adb72ad2 is described below
commit 8adb72ad24a00764da5df6b6d119f04fa8a62091
Author: PJ Fanning <[email protected]>
AuthorDate: Sat Mar 28 12:37:54 2026 +0100
Slick: remove need to cast system.dispatcher (#1535)
---
.../stream/connectors/slick/javadsl/Slick.scala | 127 +++++++++++++++++++--
.../javadsl/DocSnippetFlowWithPassThrough.java | 3 +-
slick/src/test/java/docs/javadsl/SlickTest.java | 11 +-
3 files changed, 126 insertions(+), 15 deletions(-)
diff --git a/slick/src/main/scala/org/apache/pekko/stream/connectors/slick/javadsl/Slick.scala b/slick/src/main/scala/org/apache/pekko/stream/connectors/slick/javadsl/Slick.scala
index f1fb8104c..57128e8cd 100644
--- a/slick/src/main/scala/org/apache/pekko/stream/connectors/slick/javadsl/Slick.scala
+++ b/slick/src/main/scala/org/apache/pekko/stream/connectors/slick/javadsl/Slick.scala
@@ -13,23 +13,20 @@
package org.apache.pekko.stream.connectors.slick.javadsl
-import java.sql.Connection
-import java.sql.PreparedStatement
-import java.util.concurrent.CompletionStage
-import java.util.concurrent.Executor
+import java.sql.{ Connection, PreparedStatement }
+import java.util.concurrent.{ CompletionStage, Executor }
import java.util.function.{ Function => JFunction }
import java.util.function.{ BiFunction => JBiFunction }
import org.apache.pekko
-import pekko.Done
-import pekko.NotUsed
+import pekko.{ Done, NotUsed }
import pekko.japi.function.Function2
import pekko.stream.connectors.slick.scaladsl.{ Slick => ScalaSlick }
import pekko.stream.javadsl._
import slick.dbio.DBIO
import slick.jdbc.{ GetResult, SQLActionBuilder, SetParameter,
SimpleJdbcAction }
-import scala.concurrent.ExecutionContext
+import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor }
import scala.jdk.FunctionConverters._
import scala.jdk.FutureConverters._
@@ -188,6 +185,29 @@ object Slick {
mapper: JBiFunction[T, java.lang.Integer, R]): Flow[T, R, NotUsed] =
flowWithPassThrough(session, executor, 1, toStatement, mapper)
+ /**
+ * Java API: creates a Flow that takes a stream of elements of
+ * type T, transforms each element to a SQL statement
+ * using the specified function, then executes
+ * those statements against the specified Slick database
+ * and allows to combine the statement result and element into a result type R.
+ *
+ * @param session The database session to use.
+ * @param executor Executor used to run mapper function in.
+ * E.g. the dispatcher of the ActorSystem.
+ * @param toStatement A function that creates the SQL statement to
+ * execute from the current element. Any DML or
+ * DDL statement is acceptable.
+ * @param mapper A function to create a result from the incoming element T
+ * and the database statement result.
+ */
+ def flowWithPassThrough[T, R](
+ session: SlickSession,
+ executor: ExecutionContextExecutor,
+ toStatement: JFunction[T, String],
+ mapper: JBiFunction[T, java.lang.Integer, R]): Flow[T, R, NotUsed] =
+ flowWithPassThrough(session, executor, 1, toStatement, mapper)
+
/**
* Java API: creates a Flow that takes a stream of elements of
* type T, transforms each element to a SQL statement
@@ -237,6 +257,29 @@ object Slick {
mapper: Function2[T, java.lang.Integer, R]): Flow[T, R, NotUsed] =
flowWithPassThrough(session, executor, 1, toStatement, mapper)
+ /**
+ * Java API: creates a Flow that takes a stream of elements of
+ * type T, transforms each element to a SQL statement
+ * using the specified function, then executes
+ * those statements against the specified Slick database
+ * and allows to combine the statement result and element into a result type R.
+ *
+ * @param session The database session to use.
+ * @param executor Executor used to run mapper function in.
+ * E.g. the dispatcher of the ActorSystem.
+ * @param toStatement A function that creates the SQL statement to
+ * execute from the current element. Any DML or
+ * DDL statement is acceptable.
+ * @param mapper A function to create a result from the incoming element T
+ * and the database statement result.
+ */
+ def flowWithPassThrough[T, R](
+ session: SlickSession,
+ executor: ExecutionContextExecutor,
+ toStatement: Function2[T, Connection, PreparedStatement],
+ mapper: Function2[T, java.lang.Integer, R]): Flow[T, R, NotUsed] =
+ flowWithPassThrough(session, executor, 1, toStatement, mapper)
+
/**
* Java API: creates a Flow that takes a stream of elements of
* type T, transforms each element to a SQL statement
@@ -310,6 +353,41 @@ object Slick {
.asJava
}
+ /**
+ * Java API: creates a Flow that takes a stream of elements of
+ * type T, transforms each element to a SQL statement
+ * using the specified function, then executes
+ * those statements against the specified Slick database
+ * and allows to combine the statement result and element into a result type R.
+ *
+ * @param session The database session to use.
+ * @param executor Executor used to run mapper function in.
+ * E.g. the dispatcher of the ActorSystem.
+ * @param parallelism How many parallel asynchronous streams should be
+ * used to send statements to the database. Use a
+ * value of 1 for sequential execution.
+ * @param toStatement A function that creates the SQL statement to
+ * execute from the current element. Any DML or
+ * DDL statement is acceptable.
+ * @param mapper A function to create a result from the incoming element T
+ * and the database statement result.
+ */
+ def flowWithPassThrough[T, R](
+ session: SlickSession,
+ executor: ExecutionContextExecutor,
+ parallelism: Int,
+ toStatement: JFunction[T, String],
+ mapper: JBiFunction[T, java.lang.Integer, R]): Flow[T, R, NotUsed] = {
+ ScalaSlick
+ .flowWithPassThrough[T, R](parallelism,
+ (t: T) => {
+ toDBIO(toStatement)
+ .apply(t)
+ .map(count => mapper.apply(t, count))(executor)
+ })(session)
+ .asJava
+ }
+
/**
* Java API: creates a Flow that takes a stream of elements of
* type T, transforms each element to a SQL statement
@@ -383,6 +461,41 @@ object Slick {
.asJava
}
+ /**
+ * Java API: creates a Flow that takes a stream of elements of
+ * type T, transforms each element to a SQL statement
+ * using the specified function, then executes
+ * those statements against the specified Slick database
+ * and allows to combine the statement result and element into a result type R.
+ *
+ * @param session The database session to use.
+ * @param executor Executor used to run mapper function in.
+ * E.g. the dispatcher of the ActorSystem.
+ * @param parallelism How many parallel asynchronous streams should be
+ * used to send statements to the database. Use a
+ * value of 1 for sequential execution.
+ * @param toStatement A function that creates the SQL statement to
+ * execute from the current element. Any DML or
+ * DDL statement is acceptable.
+ * @param mapper A function to create a result from the incoming element T
+ * and the database statement result.
+ */
+ def flowWithPassThrough[T, R](
+ session: SlickSession,
+ executor: ExecutionContextExecutor,
+ parallelism: Int,
+ toStatement: Function2[T, Connection, PreparedStatement],
+ mapper: Function2[T, java.lang.Integer, R]): Flow[T, R, NotUsed] = {
+ ScalaSlick
+ .flowWithPassThrough[T, R](parallelism,
+ (t: T) => {
+ toDBIO(toStatement)
+ .apply(t)
+ .map(count => mapper.apply(t, count))(executor)
+ })(session)
+ .asJava
+ }
+
/**
* Java API: creates a Sink that takes a stream of elements of
* type T, transforms each element to a SQL statement
diff --git a/slick/src/test/java/docs/javadsl/DocSnippetFlowWithPassThrough.java b/slick/src/test/java/docs/javadsl/DocSnippetFlowWithPassThrough.java
index 8a24c6259..131d50ee6 100644
--- a/slick/src/test/java/docs/javadsl/DocSnippetFlowWithPassThrough.java
+++ b/slick/src/test/java/docs/javadsl/DocSnippetFlowWithPassThrough.java
@@ -25,7 +25,6 @@ import java.sql.PreparedStatement;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
-import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -85,7 +84,7 @@ public class DocSnippetFlowWithPassThrough {
.via(
Slick.flowWithPassThrough(
session,
- (Executor) system.dispatcher(),
+ system.dispatcher(),
// add an optional second argument to specify the parallelism factor (int)
(kafkaMessage, connection) -> {
PreparedStatement statement =
diff --git a/slick/src/test/java/docs/javadsl/SlickTest.java b/slick/src/test/java/docs/javadsl/SlickTest.java
index ad40ce501..e11e34d05 100644
--- a/slick/src/test/java/docs/javadsl/SlickTest.java
+++ b/slick/src/test/java/docs/javadsl/SlickTest.java
@@ -41,7 +41,6 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -244,7 +243,7 @@ public class SlickTest {
@Test
public void testFlowWithPassThroughWithoutParallelismAndReadBackWithSource()
throws Exception {
final Flow<User, User, NotUsed> slickFlow =
- Slick.flowWithPassThrough(session, (Executor) system.dispatcher(),
insertUser, (user, i) -> user);
+ Slick.flowWithPassThrough(session, system.dispatcher(), insertUser,
(user, i) -> user);
final List<User> insertedUsers =
usersSource
.via(slickFlow)
@@ -259,7 +258,7 @@ public class SlickTest {
@Test
public void
testFlowPSWithPassThroughWithoutParallelismAndReadBackWithSource() throws
Exception {
final Flow<User, User, NotUsed> slickFlow =
- Slick.flowWithPassThrough(session, (Executor) system.dispatcher(),
insertUserPS, (user, i) -> user);
+ Slick.flowWithPassThrough(session, system.dispatcher(), insertUserPS,
(user, i) -> user);
final List<User> insertedUsers =
usersSource
.via(slickFlow)
@@ -274,7 +273,7 @@ public class SlickTest {
@Test
public void testFlowWithPassThroughWithParallelismOf4AndReadBackWithSource()
throws Exception {
final Flow<User, User, NotUsed> slickFlow =
- Slick.flowWithPassThrough(session, (Executor) system.dispatcher(), 4,
insertUser, (user, i) -> user);
+ Slick.flowWithPassThrough(session, system.dispatcher(), 4, insertUser,
(user, i) -> user);
final List<User> insertedUsers =
usersSource
.via(slickFlow)
@@ -289,7 +288,7 @@ public class SlickTest {
@Test
public void
testFlowPSWithPassThroughWithParallelismOf4AndReadBackWithSource() throws
Exception {
final Flow<User, User, NotUsed> slickFlow =
- Slick.flowWithPassThrough(session, (Executor) system.dispatcher(), 4,
insertUserPS, (user, i) -> user);
+ Slick.flowWithPassThrough(session, system.dispatcher(), 4,
insertUserPS, (user, i) -> user);
final List<User> insertedUsers =
usersSource
.via(slickFlow)
@@ -326,7 +325,7 @@ public class SlickTest {
.via(
Slick.flowWithPassThrough(
session,
- (Executor) system.dispatcher(),
+ system.dispatcher(),
insertUserInKafkaMessage,
(kafkaMessage, insertCount) -> kafkaMessage.map(user ->
insertCount)))
.mapAsync(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]