This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new b3691c941 Use CompletionStage in Java DSL (#1526)
b3691c941 is described below
commit b3691c94160cd019a2b6ed668a855247ae33aae7
Author: PJ Fanning <[email protected]>
AuthorDate: Thu Mar 26 10:08:40 2026 +0100
Use CompletionStage in Java DSL (#1526)
* Initial plan
* Fix javadsl API to expose Java types instead of Scala types
Co-authored-by: pjfanning <[email protected]>
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-connectors/sessions/fee60a5d-da86-496b-8bd7-dd68bf2795a5
* refactor
* Fix createSharded compile error: convert mat value on Source before
groupBy
Co-authored-by: pjfanning <[email protected]>
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-connectors/sessions/9c2dbe39-9ebd-4d06-b570-19bd0bfc0ac9
* mima
* mima take 2
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../javadsl.backwards.excludes | 26 +++++
.../couchbase3/javadsl/CouchbaseSink.scala | 126 ++++++++++++++++++++-
.../javadsl.backwards.excludes | 19 ++++
.../googlecloud/pubsub/javadsl/GooglePubSub.scala | 19 +++-
.../javadsl.backwards.excludes | 19 ++++
.../kinesis/javadsl/KinesisSchedulerSource.scala | 18 +++
.../src/test/java/docs/javadsl/KclSnippets.java | 16 ++-
7 files changed, 235 insertions(+), 8 deletions(-)
diff --git
a/couchbase3/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
b/couchbase3/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
new file mode 100644
index 000000000..c34b73646
--- /dev/null
+++
b/couchbase3/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# changed methods to return CompletionStage instead of Scala Future
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.exists")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.remove")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.replace")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.replaceDoc")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.upsert")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.upsertDoc")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.insert")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.insertDoc")
diff --git
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSink.scala
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSink.scala
index 30d3b695b..9cc24ff9b 100644
---
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSink.scala
+++
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSink.scala
@@ -24,69 +24,189 @@ import org.apache.pekko.Done
import org.apache.pekko.stream.connectors.couchbase3.scaladsl.{ CouchbaseSink
=> ScalaCouchbaseSink }
import org.apache.pekko.stream.javadsl.Sink
+import java.util.concurrent.CompletionStage
import scala.concurrent.Future
+import scala.jdk.FutureConverters._
object CouchbaseSink {
/**
* reference to [[CouchbaseFlow.insertDoc]]
+ * <p>
+ * This function's return type changed in 2.0.0 to return a Sink with a
CompletionStage instead of a
+ * Scala Future, to be more consistent with Java usage.
+ * </p>
+ * @see {@link #insertDocFuture} which works like this method worked in 1.x.
*/
def insertDoc[T](insertOptions: InsertOptions)(
+ implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T],
CompletionStage[Done]] =
+
ScalaCouchbaseSink.insertDoc[T](insertOptions).mapMaterializedValue(_.asJava).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.insertDoc]]
+ * @deprecated Use insertDoc which returns CompletionStage instead
+ */
+ @deprecated("Use insertDoc which returns CompletionStage instead", since =
"2.0.0")
+ def insertDocFuture[T](insertOptions: InsertOptions)(
implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T],
Future[Done]] =
ScalaCouchbaseSink.insertDoc[T](insertOptions).asJava
/**
- * reference to [[CouchbaseFlow.upsertDoc]]
+ * reference to [[CouchbaseFlow.insert]]
+ * <p>
+ * This function's return type changed in 2.0.0 to return a Sink with a
CompletionStage instead of a
+ * Scala Future, to be more consistent with Java usage.
+ * </p>
+ * @see {@link #insertFuture} which works like this method worked in 1.x.
*/
def insert[T](applyId: T => String,
+ insertOptions: InsertOptions)(
+ implicit asyncCollection: AsyncCollection): Sink[T,
CompletionStage[Done]] =
+ ScalaCouchbaseSink.insert[T](applyId,
insertOptions).mapMaterializedValue(_.asJava).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.insert]]
+ * @deprecated Use insert which returns CompletionStage instead
+ */
+ @deprecated("Use insert which returns CompletionStage instead", since =
"2.0.0")
+ def insertFuture[T](applyId: T => String,
insertOptions: InsertOptions)(
implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] =
ScalaCouchbaseSink.insert[T](applyId, insertOptions).asJava
/**
* reference to [[CouchbaseFlow.upsertDoc]]
+ * <p>
+ * This function's return type changed in 2.0.0 to return a Sink with a
CompletionStage instead of a
+ * Scala Future, to be more consistent with Java usage.
+ * </p>
+ * @see {@link #upsertDocFuture} which works like this method worked in 1.x.
*/
def upsertDoc[T](upsertOptions: UpsertOptions =
UpsertOptions.upsertOptions())(
+ implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T],
CompletionStage[Done]] =
+
ScalaCouchbaseSink.upsertDoc[T](upsertOptions).mapMaterializedValue(_.asJava).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.upsertDoc]]
+ * @deprecated Use upsertDoc which returns CompletionStage instead
+ */
+ @deprecated("Use upsertDoc which returns CompletionStage instead", since =
"2.0.0")
+ def upsertDocFuture[T](upsertOptions: UpsertOptions =
UpsertOptions.upsertOptions())(
implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T],
Future[Done]] =
ScalaCouchbaseSink.upsertDoc[T](upsertOptions).asJava
/**
* reference to [[CouchbaseFlow.upsert]]
+ * <p>
+ * This function's return type changed in 2.0.0 to return a Sink with a
CompletionStage instead of a
+ * Scala Future, to be more consistent with Java usage.
+ * </p>
+ * @see {@link #upsertFuture} which works like this method worked in 1.x.
*/
def upsert[T](applyId: T => String,
+ upsertOptions: UpsertOptions = UpsertOptions.upsertOptions())(
+ implicit asyncCollection: AsyncCollection): Sink[T,
CompletionStage[Done]] =
+ ScalaCouchbaseSink.upsert[T](applyId,
upsertOptions).mapMaterializedValue(_.asJava).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.upsert]]
+ * @deprecated Use upsert which returns CompletionStage instead
+ */
+ @deprecated("Use upsert which returns CompletionStage instead", since =
"2.0.0")
+ def upsertFuture[T](applyId: T => String,
upsertOptions: UpsertOptions = UpsertOptions.upsertOptions())(
implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] =
ScalaCouchbaseSink.upsert[T](applyId, upsertOptions).asJava
/**
* reference to [[CouchbaseFlow.replaceDoc]]
+ * <p>
+ * This function's return type changed in 2.0.0 to return a Sink with a
CompletionStage instead of a
+ * Scala Future, to be more consistent with Java usage.
+ * </p>
+ * @see {@link #replaceDocFuture} which works like this method worked in 1.x.
*/
def replaceDoc[T](
+ replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
+ implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T],
CompletionStage[Done]] =
+
ScalaCouchbaseSink.replaceDoc[T](replaceOptions).mapMaterializedValue(_.asJava).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.replaceDoc]]
+ * @deprecated Use replaceDoc which returns CompletionStage instead
+ */
+ @deprecated("Use replaceDoc which returns CompletionStage instead", since =
"2.0.0")
+ def replaceDocFuture[T](
replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T],
Future[Done]] =
ScalaCouchbaseSink.replaceDoc[T](replaceOptions).asJava
/**
* reference to [[CouchbaseFlow.replace]]
+ * <p>
+ * This function's return type changed in 2.0.0 to return a Sink with a
CompletionStage instead of a
+ * Scala Future, to be more consistent with Java usage.
+ * </p>
+ * @see {@link #replaceFuture} which works like this method worked in 1.x.
*/
def replace[T](applyId: T => String,
replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
- implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] = {
+ implicit asyncCollection: AsyncCollection): Sink[T,
CompletionStage[Done]] =
+ ScalaCouchbaseSink.replace[T](applyId,
replaceOptions).mapMaterializedValue(_.asJava).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.replace]]
+ * @deprecated Use replace which returns CompletionStage instead
+ */
+ @deprecated("Use replace which returns CompletionStage instead", since =
"2.0.0")
+ def replaceFuture[T](applyId: T => String,
+ replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
+ implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] =
ScalaCouchbaseSink.replace[T](applyId, replaceOptions).asJava
- }
/**
* reference to [[CouchbaseFlow.remove]]
+ * <p>
+ * This function's return type changed in 2.0.0 to return a Sink with a
CompletionStage instead of a
+ * Scala Future, to be more consistent with Java usage.
+ * </p>
+ * @see {@link #removeFuture} which works like this method worked in 1.x.
*/
def remove[T](applyId: T => String,
+ removeOptions: RemoveOptions = RemoveOptions.removeOptions())(
+ implicit asyncCollection: AsyncCollection): Sink[T,
CompletionStage[Done]] =
+ ScalaCouchbaseSink.remove[T](applyId,
removeOptions).mapMaterializedValue(_.asJava).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.remove]]
+ * @deprecated Use remove which returns CompletionStage instead
+ */
+ @deprecated("Use remove which returns CompletionStage instead", since =
"2.0.0")
+ def removeFuture[T](applyId: T => String,
removeOptions: RemoveOptions = RemoveOptions.removeOptions())(
implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] =
ScalaCouchbaseSink.remove[T](applyId, removeOptions).asJava
/**
* reference to [[CouchbaseFlow.exists]]
+ * <p>
+ * This function's return type changed in 2.0.0 to return a Sink with a
CompletionStage instead of a
+ * Scala Future, to be more consistent with Java usage.
+ * </p>
+ * @see {@link #existsFuture} which works like this method worked in 1.x.
*/
def exists[T](applyId: T => String, existsOptions: ExistsOptions =
ExistsOptions.existsOptions())(
+ implicit asyncCollection: AsyncCollection): Sink[T,
CompletionStage[java.lang.Boolean]] =
+ ScalaCouchbaseSink.exists[T](applyId, existsOptions)
+
.mapMaterializedValue(_.map(Boolean.box)(scala.concurrent.ExecutionContext.parasitic).asJava)
+ .asJava
+
+ /**
+ * reference to [[CouchbaseFlow.exists]]
+ * @deprecated Use exists which returns CompletionStage instead
+ */
+ @deprecated("Use exists which returns CompletionStage instead", since =
"2.0.0")
+ def existsFuture[T](applyId: T => String, existsOptions: ExistsOptions =
ExistsOptions.existsOptions())(
implicit asyncCollection: AsyncCollection): Sink[T, Future[Boolean]] =
ScalaCouchbaseSink.exists[T](applyId, existsOptions).asJava
diff --git
a/google-cloud-pub-sub/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
b/google-cloud-pub-sub/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
new file mode 100644
index 000000000..8b8b0feaa
--- /dev/null
+++
b/google-cloud-pub-sub/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# changed methods to return CompletionStage instead of Scala Future
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.googlecloud.pubsub.javadsl.GooglePubSub.subscribeFlow")
diff --git
a/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/javadsl/GooglePubSub.scala
b/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/javadsl/GooglePubSub.scala
index 4d8e179ab..c9eeab7c4 100644
---
a/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/javadsl/GooglePubSub.scala
+++
b/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/javadsl/GooglePubSub.scala
@@ -95,8 +95,25 @@ object GooglePubSub {
/**
* Creates a flow pulling messages from a subscription.
+ * The materialized CompletionStage completes when the flow is materialized.
+ * <p>
+ * This function's return type changed in 2.0.0 to return a Flow with a
CompletionStage instead of a
+ * Scala Future, to be more consistent with Java usage.
+ * </p>
+ * @see {@link #subscribeFlowFuture} which works like this method worked in
1.x.
*/
- def subscribeFlow(subscription: String, config: PubSubConfig): Flow[Done,
ReceivedMessage, Future[NotUsed]] =
+ def subscribeFlow(subscription: String, config: PubSubConfig): Flow[Done,
ReceivedMessage, CompletionStage[NotUsed]] =
+ GPubSub
+ .subscribeFlow(subscription, config)
+ .mapMaterializedValue(_.asJava)
+ .asJava
+
+ /**
+ * Creates a flow pulling messages from a subscription.
+ * @deprecated Use subscribeFlow which returns CompletionStage instead
+ */
+ @deprecated("Use subscribeFlow which returns CompletionStage instead", since
= "2.0.0")
+ def subscribeFlowFuture(subscription: String, config: PubSubConfig):
Flow[Done, ReceivedMessage, Future[NotUsed]] =
GPubSub
.subscribeFlow(subscription, config)
.asJava
diff --git
a/kinesis/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
b/kinesis/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
new file mode 100644
index 000000000..fe91f3c8a
--- /dev/null
+++
b/kinesis/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# changed methods to return CompletionStage instead of Scala Future
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.kinesis.javadsl.KinesisSchedulerSource.createSharded")
diff --git
a/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/javadsl/KinesisSchedulerSource.scala
b/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/javadsl/KinesisSchedulerSource.scala
index 7b33c0ec7..da454c6b5 100644
---
a/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/javadsl/KinesisSchedulerSource.scala
+++
b/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/javadsl/KinesisSchedulerSource.scala
@@ -40,7 +40,25 @@ object KinesisSchedulerSource {
.mapMaterializedValue(_.asJava)
.asJava
+ /**
+ * This function's return type changed in 2.0.0 to return a SubSource with a
CompletionStage instead of a Scala
+ * Future, to be more consistent with Java usage.
+ * @see {@link #createShardedFuture} which works like this method worked in
1.x.
+ */
def createSharded(
+ schedulerBuilder: SchedulerBuilder,
+ settings: KinesisSchedulerSourceSettings): SubSource[CommittableRecord,
CompletionStage[Scheduler]] =
+ new SubSource(
+ scaladsl.KinesisSchedulerSource
+ .apply(schedulerBuilder.build, settings)
+ .mapMaterializedValue(_.asJava)
+ .groupBy(500, _.processorData.shardId))
+
+ /**
+ * @deprecated Use createSharded which returns a SubSource with a
CompletionStage instead
+ */
+ @deprecated("Use createSharded which returns a SubSource with a
CompletionStage instead", since = "2.0.0")
+ def createShardedFuture(
schedulerBuilder: SchedulerBuilder,
settings: KinesisSchedulerSourceSettings): SubSource[CommittableRecord,
Future[Scheduler]] =
new SubSource(
diff --git a/kinesis/src/test/java/docs/javadsl/KclSnippets.java
b/kinesis/src/test/java/docs/javadsl/KclSnippets.java
index 037d746b6..4ba4755ce 100644
--- a/kinesis/src/test/java/docs/javadsl/KclSnippets.java
+++ b/kinesis/src/test/java/docs/javadsl/KclSnippets.java
@@ -13,6 +13,10 @@
package docs.javadsl;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.concurrent.CompletionStage;
+
import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.connectors.kinesis.CommittableRecord;
import
org.apache.pekko.stream.connectors.kinesis.KinesisSchedulerCheckpointSettings;
@@ -20,14 +24,11 @@ import
org.apache.pekko.stream.connectors.kinesis.KinesisSchedulerSourceSettings
import
org.apache.pekko.stream.connectors.kinesis.javadsl.KinesisSchedulerSource;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Source;
+import org.apache.pekko.stream.javadsl.SubSource;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
-import java.util.concurrent.CompletionStage;
-
public class KclSnippets {
// #scheduler-settings
@@ -47,6 +48,13 @@ public class KclSnippets {
KinesisSchedulerSource.create(schedulerBuilder, schedulerSettings);
// #scheduler-source
+ // #sharded-scheduler-source
+ // Use createSharded to get a per-shard sub-source with a Java-friendly
CompletionStage
+ // materialized value
+ final SubSource<CommittableRecord, CompletionStage<Scheduler>>
shardedSchedulerSource =
+ KinesisSchedulerSource.createSharded(schedulerBuilder,
schedulerSettings);
+ // #sharded-scheduler-source
+
// #checkpoint
final KinesisSchedulerCheckpointSettings checkpointSettings =
KinesisSchedulerCheckpointSettings.create(1000, Duration.of(30L,
ChronoUnit.SECONDS));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]