This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new b3691c941 Use CompletionStage in Java DSL (#1526)
b3691c941 is described below

commit b3691c94160cd019a2b6ed668a855247ae33aae7
Author: PJ Fanning <[email protected]>
AuthorDate: Thu Mar 26 10:08:40 2026 +0100

    Use CompletionStage in Java DSL (#1526)
    
    * Initial plan
    
    * Fix javadsl API to expose Java types instead of Scala types
    
    Co-authored-by: pjfanning <[email protected]>
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-connectors/sessions/fee60a5d-da86-496b-8bd7-dd68bf2795a5
    
    * refactor
    
    * Fix createSharded compile error: convert mat value on Source before 
groupBy
    
    Co-authored-by: pjfanning <[email protected]>
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-connectors/sessions/9c2dbe39-9ebd-4d06-b570-19bd0bfc0ac9
    
    * mima
    
    * mima take 2
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../javadsl.backwards.excludes                     |  26 +++++
 .../couchbase3/javadsl/CouchbaseSink.scala         | 126 ++++++++++++++++++++-
 .../javadsl.backwards.excludes                     |  19 ++++
 .../googlecloud/pubsub/javadsl/GooglePubSub.scala  |  19 +++-
 .../javadsl.backwards.excludes                     |  19 ++++
 .../kinesis/javadsl/KinesisSchedulerSource.scala   |  18 +++
 .../src/test/java/docs/javadsl/KclSnippets.java    |  16 ++-
 7 files changed, 235 insertions(+), 8 deletions(-)

diff --git 
a/couchbase3/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
 
b/couchbase3/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
new file mode 100644
index 000000000..c34b73646
--- /dev/null
+++ 
b/couchbase3/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# changed methods to return CompletionStage instead of Scala Future
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.exists")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.remove")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.replace")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.replaceDoc")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.upsert")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.upsertDoc")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.insert")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.insertDoc")
diff --git 
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSink.scala
 
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSink.scala
index 30d3b695b..9cc24ff9b 100644
--- 
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSink.scala
+++ 
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSink.scala
@@ -24,69 +24,189 @@ import org.apache.pekko.Done
 import org.apache.pekko.stream.connectors.couchbase3.scaladsl.{ CouchbaseSink 
=> ScalaCouchbaseSink }
 import org.apache.pekko.stream.javadsl.Sink
 
+import java.util.concurrent.CompletionStage
 import scala.concurrent.Future
+import scala.jdk.FutureConverters._
 
 object CouchbaseSink {
 
   /**
    * reference to [[CouchbaseFlow.insertDoc]]
+   * <p>
+   *   This function's return type changed in 2.0.0 to return a Sink with a 
CompletionStage instead of a
+   *   Scala Future, to be more consistent with Java usage.
+   * </p>
+   * @see {@link #insertDocFuture} which works like this method worked in 1.x.
    */
   def insertDoc[T](insertOptions: InsertOptions)(
+      implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T], 
CompletionStage[Done]] =
+    
ScalaCouchbaseSink.insertDoc[T](insertOptions).mapMaterializedValue(_.asJava).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.insertDoc]]
+   * @deprecated Use insertDoc which returns CompletionStage instead
+   */
+  @deprecated("Use insertDoc which returns CompletionStage instead", since = 
"2.0.0")
+  def insertDocFuture[T](insertOptions: InsertOptions)(
       implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T], 
Future[Done]] =
     ScalaCouchbaseSink.insertDoc[T](insertOptions).asJava
 
   /**
-   * reference to [[CouchbaseFlow.upsertDoc]]
+   * reference to [[CouchbaseFlow.insert]]
+   * <p>
+   *   This function's return type changed in 2.0.0 to return a Sink with a 
CompletionStage instead of a
+   *   Scala Future, to be more consistent with Java usage.
+   * </p>
+   * @see {@link #insertFuture} which works like this method worked in 1.x.
    */
   def insert[T](applyId: T => String,
+      insertOptions: InsertOptions)(
+      implicit asyncCollection: AsyncCollection): Sink[T, 
CompletionStage[Done]] =
+    ScalaCouchbaseSink.insert[T](applyId, 
insertOptions).mapMaterializedValue(_.asJava).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.insert]]
+   * @deprecated Use insert which returns CompletionStage instead
+   */
+  @deprecated("Use insert which returns CompletionStage instead", since = 
"2.0.0")
+  def insertFuture[T](applyId: T => String,
       insertOptions: InsertOptions)(
       implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] =
     ScalaCouchbaseSink.insert[T](applyId, insertOptions).asJava
 
   /**
    * reference to [[CouchbaseFlow.upsertDoc]]
+   * <p>
+   *   This function's return type changed in 2.0.0 to return a Sink with a 
CompletionStage instead of a
+   *   Scala Future, to be more consistent with Java usage.
+   * </p>
+   * @see {@link #upsertDocFuture} which works like this method worked in 1.x.
    */
   def upsertDoc[T](upsertOptions: UpsertOptions = 
UpsertOptions.upsertOptions())(
+      implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T], 
CompletionStage[Done]] =
+    
ScalaCouchbaseSink.upsertDoc[T](upsertOptions).mapMaterializedValue(_.asJava).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.upsertDoc]]
+   * @deprecated Use upsertDoc which returns CompletionStage instead
+   */
+  @deprecated("Use upsertDoc which returns CompletionStage instead", since = 
"2.0.0")
+  def upsertDocFuture[T](upsertOptions: UpsertOptions = 
UpsertOptions.upsertOptions())(
       implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T], 
Future[Done]] =
     ScalaCouchbaseSink.upsertDoc[T](upsertOptions).asJava
 
   /**
    * reference to [[CouchbaseFlow.upsert]]
+   * <p>
+   *   This function's return type changed in 2.0.0 to return a Sink with a 
CompletionStage instead of a
+   *   Scala Future, to be more consistent with Java usage.
+   * </p>
+   * @see {@link #upsertFuture} which works like this method worked in 1.x.
    */
   def upsert[T](applyId: T => String,
+      upsertOptions: UpsertOptions = UpsertOptions.upsertOptions())(
+      implicit asyncCollection: AsyncCollection): Sink[T, 
CompletionStage[Done]] =
+    ScalaCouchbaseSink.upsert[T](applyId, 
upsertOptions).mapMaterializedValue(_.asJava).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.upsert]]
+   * @deprecated Use upsert which returns CompletionStage instead
+   */
+  @deprecated("Use upsert which returns CompletionStage instead", since = 
"2.0.0")
+  def upsertFuture[T](applyId: T => String,
       upsertOptions: UpsertOptions = UpsertOptions.upsertOptions())(
       implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] =
     ScalaCouchbaseSink.upsert[T](applyId, upsertOptions).asJava
 
   /**
    * reference to [[CouchbaseFlow.replaceDoc]]
+   * <p>
+   *   This function's return type changed in 2.0.0 to return a Sink with a 
CompletionStage instead of a
+   *   Scala Future, to be more consistent with Java usage.
+   * </p>
+   * @see {@link #replaceDocFuture} which works like this method worked in 1.x.
    */
   def replaceDoc[T](
+      replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
+      implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T], 
CompletionStage[Done]] =
+    
ScalaCouchbaseSink.replaceDoc[T](replaceOptions).mapMaterializedValue(_.asJava).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.replaceDoc]]
+   * @deprecated Use replaceDoc which returns CompletionStage instead
+   */
+  @deprecated("Use replaceDoc which returns CompletionStage instead", since = 
"2.0.0")
+  def replaceDocFuture[T](
       replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
       implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T], 
Future[Done]] =
     ScalaCouchbaseSink.replaceDoc[T](replaceOptions).asJava
 
   /**
    * reference to [[CouchbaseFlow.replace]]
+   * <p>
+   *   This function's return type changed in 2.0.0 to return a Sink with a 
CompletionStage instead of a
+   *   Scala Future, to be more consistent with Java usage.
+   * </p>
+   * @see {@link #replaceFuture} which works like this method worked in 1.x.
    */
   def replace[T](applyId: T => String,
       replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
-      implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] = {
+      implicit asyncCollection: AsyncCollection): Sink[T, 
CompletionStage[Done]] =
+    ScalaCouchbaseSink.replace[T](applyId, 
replaceOptions).mapMaterializedValue(_.asJava).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.replace]]
+   * @deprecated Use replace which returns CompletionStage instead
+   */
+  @deprecated("Use replace which returns CompletionStage instead", since = 
"2.0.0")
+  def replaceFuture[T](applyId: T => String,
+      replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
+      implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] =
     ScalaCouchbaseSink.replace[T](applyId, replaceOptions).asJava
-  }
 
   /**
    * reference to [[CouchbaseFlow.remove]]
+   * <p>
+   *   This function's return type changed in 2.0.0 to return a Sink with a 
CompletionStage instead of a
+   *   Scala Future, to be more consistent with Java usage.
+   * </p>
+   * @see {@link #removeFuture} which works like this method worked in 1.x.
    */
   def remove[T](applyId: T => String,
+      removeOptions: RemoveOptions = RemoveOptions.removeOptions())(
+      implicit asyncCollection: AsyncCollection): Sink[T, 
CompletionStage[Done]] =
+    ScalaCouchbaseSink.remove[T](applyId, 
removeOptions).mapMaterializedValue(_.asJava).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.remove]]
+   * @deprecated Use remove which returns CompletionStage instead
+   */
+  @deprecated("Use remove which returns CompletionStage instead", since = 
"2.0.0")
+  def removeFuture[T](applyId: T => String,
       removeOptions: RemoveOptions = RemoveOptions.removeOptions())(
       implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] =
     ScalaCouchbaseSink.remove[T](applyId, removeOptions).asJava
 
   /**
    * reference to [[CouchbaseFlow.exists]]
+   * <p>
+   *   This function's return type changed in 2.0.0 to return a Sink with a 
CompletionStage instead of a
+   *   Scala Future, to be more consistent with Java usage.
+   * </p>
+   * @see {@link #existsFuture} which works like this method worked in 1.x.
    */
   def exists[T](applyId: T => String, existsOptions: ExistsOptions = 
ExistsOptions.existsOptions())(
+      implicit asyncCollection: AsyncCollection): Sink[T, 
CompletionStage[java.lang.Boolean]] =
+    ScalaCouchbaseSink.exists[T](applyId, existsOptions)
+      
.mapMaterializedValue(_.map(Boolean.box)(scala.concurrent.ExecutionContext.parasitic).asJava)
+      .asJava
+
+  /**
+   * reference to [[CouchbaseFlow.exists]]
+   * @deprecated Use exists which returns CompletionStage instead
+   */
+  @deprecated("Use exists which returns CompletionStage instead", since = 
"2.0.0")
+  def existsFuture[T](applyId: T => String, existsOptions: ExistsOptions = 
ExistsOptions.existsOptions())(
       implicit asyncCollection: AsyncCollection): Sink[T, Future[Boolean]] =
     ScalaCouchbaseSink.exists[T](applyId, existsOptions).asJava
 
diff --git 
a/google-cloud-pub-sub/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
 
b/google-cloud-pub-sub/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
new file mode 100644
index 000000000..8b8b0feaa
--- /dev/null
+++ 
b/google-cloud-pub-sub/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# changed methods to return CompletionStage instead of Scala Future
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.googlecloud.pubsub.javadsl.GooglePubSub.subscribeFlow")
diff --git 
a/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/javadsl/GooglePubSub.scala
 
b/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/javadsl/GooglePubSub.scala
index 4d8e179ab..c9eeab7c4 100644
--- 
a/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/javadsl/GooglePubSub.scala
+++ 
b/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/javadsl/GooglePubSub.scala
@@ -95,8 +95,25 @@ object GooglePubSub {
 
   /**
    * Creates a flow pulling messages from a subscription.
+   * The materialized CompletionStage completes when the flow is materialized.
+   * <p>
+   *   This function's return type changed in 2.0.0 to return a Flow with a 
CompletionStage instead of a
+   *   Scala Future, to be more consistent with Java usage.
+   * </p>
+   * @see {@link #subscribeFlowFuture} which works like this method worked in 
1.x.
    */
-  def subscribeFlow(subscription: String, config: PubSubConfig): Flow[Done, 
ReceivedMessage, Future[NotUsed]] =
+  def subscribeFlow(subscription: String, config: PubSubConfig): Flow[Done, 
ReceivedMessage, CompletionStage[NotUsed]] =
+    GPubSub
+      .subscribeFlow(subscription, config)
+      .mapMaterializedValue(_.asJava)
+      .asJava
+
+  /**
+   * Creates a flow pulling messages from a subscription.
+   * @deprecated Use subscribeFlow which returns CompletionStage instead
+   */
+  @deprecated("Use subscribeFlow which returns CompletionStage instead", since 
= "2.0.0")
+  def subscribeFlowFuture(subscription: String, config: PubSubConfig): 
Flow[Done, ReceivedMessage, Future[NotUsed]] =
     GPubSub
       .subscribeFlow(subscription, config)
       .asJava
diff --git 
a/kinesis/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
 
b/kinesis/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
new file mode 100644
index 000000000..fe91f3c8a
--- /dev/null
+++ 
b/kinesis/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# changed methods to return CompletionStage instead of Scala Future
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.kinesis.javadsl.KinesisSchedulerSource.createSharded")
diff --git 
a/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/javadsl/KinesisSchedulerSource.scala
 
b/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/javadsl/KinesisSchedulerSource.scala
index 7b33c0ec7..da454c6b5 100644
--- 
a/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/javadsl/KinesisSchedulerSource.scala
+++ 
b/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/javadsl/KinesisSchedulerSource.scala
@@ -40,7 +40,25 @@ object KinesisSchedulerSource {
       .mapMaterializedValue(_.asJava)
       .asJava
 
+  /**
+   * This function's return type changed in 2.0.0 to return a SubSource with a 
CompletionStage instead of a Scala
+   * Future, to be more consistent with Java usage.
+   * @see {@link #createShardedFuture} which works like this method worked in 
1.x.
+   */
   def createSharded(
+      schedulerBuilder: SchedulerBuilder,
+      settings: KinesisSchedulerSourceSettings): SubSource[CommittableRecord, 
CompletionStage[Scheduler]] =
+    new SubSource(
+      scaladsl.KinesisSchedulerSource
+        .apply(schedulerBuilder.build, settings)
+        .mapMaterializedValue(_.asJava)
+        .groupBy(500, _.processorData.shardId))
+
+  /**
+   * @deprecated Use createSharded which returns a SubSource with a 
CompletionStage instead
+   */
+  @deprecated("Use createSharded which returns a SubSource with a 
CompletionStage instead", since = "2.0.0")
+  def createShardedFuture(
       schedulerBuilder: SchedulerBuilder,
       settings: KinesisSchedulerSourceSettings): SubSource[CommittableRecord, 
Future[Scheduler]] =
     new SubSource(
diff --git a/kinesis/src/test/java/docs/javadsl/KclSnippets.java 
b/kinesis/src/test/java/docs/javadsl/KclSnippets.java
index 037d746b6..4ba4755ce 100644
--- a/kinesis/src/test/java/docs/javadsl/KclSnippets.java
+++ b/kinesis/src/test/java/docs/javadsl/KclSnippets.java
@@ -13,6 +13,10 @@
 
 package docs.javadsl;
 
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.concurrent.CompletionStage;
+
 import org.apache.pekko.NotUsed;
 import org.apache.pekko.stream.connectors.kinesis.CommittableRecord;
 import 
org.apache.pekko.stream.connectors.kinesis.KinesisSchedulerCheckpointSettings;
@@ -20,14 +24,11 @@ import 
org.apache.pekko.stream.connectors.kinesis.KinesisSchedulerSourceSettings
 import 
org.apache.pekko.stream.connectors.kinesis.javadsl.KinesisSchedulerSource;
 import org.apache.pekko.stream.javadsl.Flow;
 import org.apache.pekko.stream.javadsl.Source;
+import org.apache.pekko.stream.javadsl.SubSource;
 import software.amazon.kinesis.coordinator.Scheduler;
 import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
 import software.amazon.kinesis.retrieval.KinesisClientRecord;
 
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
-import java.util.concurrent.CompletionStage;
-
 public class KclSnippets {
 
   // #scheduler-settings
@@ -47,6 +48,13 @@ public class KclSnippets {
       KinesisSchedulerSource.create(schedulerBuilder, schedulerSettings);
   // #scheduler-source
 
+  // #sharded-scheduler-source
+  // Use createSharded to get a per-shard sub-source with a Java-friendly 
CompletionStage
+  // materialized value
+  final SubSource<CommittableRecord, CompletionStage<Scheduler>> 
shardedSchedulerSource =
+      KinesisSchedulerSource.createSharded(schedulerBuilder, 
schedulerSettings);
+  // #sharded-scheduler-source
+
   // #checkpoint
   final KinesisSchedulerCheckpointSettings checkpointSettings =
       KinesisSchedulerCheckpointSettings.create(1000, Duration.of(30L, 
ChronoUnit.SECONDS));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to