This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new 7003333a4 javadsl couchbase3: replace Scala function params with Java 
functions (#1528)
7003333a4 is described below

commit 7003333a4e2498bac1788947a393d266ed45cb27
Author: PJ Fanning <[email protected]>
AuthorDate: Fri Mar 27 10:20:35 2026 +0100

    javadsl couchbase3: replace Scala function params with Java functions 
(#1528)
    
    * Initial plan
    
    * Fix javadsl CouchbaseFlow and CouchbaseSink to use Java functions instead 
of Scala functions
    
    Co-authored-by: pjfanning <[email protected]>
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-connectors/sessions/33ed6416-d555-48ae-92e0-939238e581ef
    
    * Update CouchbaseFlow.scala
    
    * compile issues
    
    * try to remove implicits
    
    * Update javadsl.backwards.excludes
    
    * more changes
    
    * more changes
    
    * Update javadsl.backwards.excludes
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../javadsl.backwards.excludes                     |  38 +-
 .../couchbase3/javadsl/CouchbaseFlow.scala         | 400 ++++++++++++++++++---
 .../couchbase3/javadsl/CouchbaseSink.scala         | 162 +++++++--
 3 files changed, 512 insertions(+), 88 deletions(-)

diff --git 
a/couchbase3/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
 
b/couchbase3/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
index c34b73646..9b976e29b 100644
--- 
a/couchbase3/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
+++ 
b/couchbase3/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
@@ -15,12 +15,44 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# changed methods to return CompletionStage instead of Scala Future
+# changed Java DSL significantly because it used Scala Futures (changed to 
CompletionStage),
+# implicit params in separate param lists (now just one param list with no 
implicits) and defaulted parameters
+# (now replaced with method overloads)
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.append$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.exists$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.get$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.getAllReplicas$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.getJson$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.increment$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.insertDoc$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.prepend$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.replaceDoc$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.upsertDoc$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.getAllReplicasJson$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.getAllReplicasObject$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.getAllReplicasType$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.getObject$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.getType$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.mutateIn$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.mutateInDoc$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.touch$*")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.exists")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.exists")
 
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.exists")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.insert")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.insert")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.insertDoc")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.remove")
 
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.remove")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.replace")
 
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.replace")
 
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.replaceDoc")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.upsert")
 
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.upsert")
 
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.upsertDoc")
-ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.insert")
-ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.insertDoc")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.replaceDoc$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.upsertDoc$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.exists$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.remove$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.replace$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.upsert$*")
diff --git 
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseFlow.scala
 
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseFlow.scala
index 49fb20014..f771af8a3 100644
--- 
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseFlow.scala
+++ 
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseFlow.scala
@@ -27,6 +27,7 @@ import org.apache.pekko.stream.javadsl.Flow
 import org.apache.pekko.stream.connectors.couchbase3.scaladsl.{ CouchbaseFlow 
=> ScalaCouchbaseFlow }
 
 import java.time.{ Duration, Instant }
+import java.util.function.{ Function => JFunction }
 
 object CouchbaseFlow {
 
@@ -34,69 +35,145 @@ object CouchbaseFlow {
    * get a document by id from Couchbase collection
    * @param options reference to Couchbase options doc
    */
-  def get(options: GetOptions = GetOptions.getOptions)(
-      implicit asyncCollection: AsyncCollection): Flow[String, GetResult, 
NotUsed] =
-    ScalaCouchbaseFlow.get(options).asJava
+  def get(options: GetOptions,
+      asyncCollection: AsyncCollection): Flow[String, GetResult, NotUsed] =
+    ScalaCouchbaseFlow.get(options)(asyncCollection).asJava
+
+  /**
+   * get a document by id from Couchbase collection using the default GetOptions
+   * @param asyncCollection the Couchbase collection to read from
+   */
+  def get(asyncCollection: AsyncCollection): Flow[String, GetResult, NotUsed] =
+    ScalaCouchbaseFlow.get(GetOptions.getOptions)(asyncCollection).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.get]] deserialize to Couchbase JsonObject
+   */
+  def getJson(options: GetOptions,
+      asyncCollection: AsyncCollection): Flow[String, JsonObject, NotUsed] =
+    ScalaCouchbaseFlow.getJson(options)(asyncCollection).asJava
 
   /**
    * reference to [[CouchbaseFlow.get]] deserialize to Couchbase JsonObject
    */
-  def getJson(options: GetOptions = GetOptions.getOptions)(
-      implicit asyncCollection: AsyncCollection): Flow[String, JsonObject, 
NotUsed] =
-    ScalaCouchbaseFlow.getJson(options).asJava
+  def getJson(asyncCollection: AsyncCollection): Flow[String, JsonObject, 
NotUsed] =
+    ScalaCouchbaseFlow.getJson(GetOptions.getOptions)(asyncCollection).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.get]],deserialize to class
+   * If you add DefaultScalaModule to jackson of couchbase, it could 
deserialize to scala class
+   */
+  def getObject[T](target: Class[T], options: GetOptions,
+      asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+    ScalaCouchbaseFlow.getObject[T](target, options)(asyncCollection).asJava
 
   /**
    * reference to [[CouchbaseFlow.get]],deserialize to class
    * If you add DefaultScalaModule to jackson of couchbase, it could 
deserialize to scala class
    */
-  def getObject[T](target: Class[T], options: GetOptions = 
GetOptions.getOptions)(
-      implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
-    ScalaCouchbaseFlow.getObject[T](target, options).asJava
+  def getObject[T](target: Class[T], asyncCollection: AsyncCollection): 
Flow[String, T, NotUsed] =
+    ScalaCouchbaseFlow.getObject[T](target, 
GetOptions.getOptions)(asyncCollection).asJava
+
+  /**
+   * reference to [[CouchbaseSource.getObject]],deserialize to class with 
Generics
+   */
+  def getType[T](target: TypeRef[T], options: GetOptions,
+      asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+    ScalaCouchbaseFlow.getType[T](target, options)(asyncCollection).asJava
 
   /**
    * reference to [[CouchbaseSource.getObject]],deserialize to class with 
Generics
    */
-  def getType[T](target: TypeRef[T], options: GetOptions = 
GetOptions.getOptions)(
-      implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
-    ScalaCouchbaseFlow.getType[T](target, options).asJava
+  def getType[T](target: TypeRef[T], asyncCollection: AsyncCollection): 
Flow[String, T, NotUsed] =
+    ScalaCouchbaseFlow.getType[T](target, 
GetOptions.getOptions)(asyncCollection).asJava
 
   /**
    * similar to [[CouchbaseFlow.get]], but reads from all replicas on the 
active node
    * @see [[CouchbaseFlow#get]]
    */
-  def getAllReplicas(options: GetAllReplicasOptions = 
GetAllReplicasOptions.getAllReplicasOptions)(
-      implicit asyncCollection: AsyncCollection): Flow[String, 
GetReplicaResult, NotUsed] =
-    ScalaCouchbaseFlow.getAllReplicas(options).asJava
+  def getAllReplicas(options: GetAllReplicasOptions,
+      asyncCollection: AsyncCollection): Flow[String, GetReplicaResult, 
NotUsed] =
+    ScalaCouchbaseFlow.getAllReplicas(options)(asyncCollection).asJava
+
+  /**
+   * similar to [[CouchbaseFlow.get]], but reads from all replicas on the 
active node
+   * @see [[CouchbaseFlow#get]]
+   */
+  def getAllReplicas(asyncCollection: AsyncCollection): Flow[String, 
GetReplicaResult, NotUsed] =
+    
ScalaCouchbaseFlow.getAllReplicas(GetAllReplicasOptions.getAllReplicasOptions)(asyncCollection).asJava
 
   /**
    * reference to [[CouchbaseFlow.getAllReplicas]], deserialize to Couchbase 
JsonObject
    */
-  def getAllReplicasJson(options: GetAllReplicasOptions = 
GetAllReplicasOptions.getAllReplicasOptions)(
-      implicit asyncCollection: AsyncCollection): Flow[String, JsonObject, 
NotUsed] =
-    ScalaCouchbaseFlow.getAllReplicasJson(options).asJava
+  def getAllReplicasJson(options: GetAllReplicasOptions,
+      asyncCollection: AsyncCollection): Flow[String, JsonObject, NotUsed] =
+    ScalaCouchbaseFlow.getAllReplicasJson(options)(asyncCollection).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.getAllReplicas]], deserialize to Couchbase 
JsonObject
+   */
+  def getAllReplicasJson(asyncCollection: AsyncCollection): Flow[String, 
JsonObject, NotUsed] =
+    
ScalaCouchbaseFlow.getAllReplicasJson(GetAllReplicasOptions.getAllReplicasOptions)(asyncCollection).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.getAllReplicas]], deserialize to class
+   * If you add DefaultScalaModule to jackson of couchbase, it could 
deserialize to scala class
+   */
+  def getAllReplicasObject[T](target: Class[T],
+      getOptions: GetAllReplicasOptions,
+      asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+    ScalaCouchbaseFlow.getAllReplicasObject[T](target, 
getOptions)(asyncCollection).asJava
 
   /**
    * reference to [[CouchbaseFlow.getAllReplicas]], deserialize to class
    * If you add DefaultScalaModule to jackson of couchbase, it could 
deserialize to scala class
    */
   def getAllReplicasObject[T](target: Class[T],
-      getOptions: GetAllReplicasOptions = 
GetAllReplicasOptions.getAllReplicasOptions)(
-      implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
-    ScalaCouchbaseFlow.getAllReplicasObject[T](target, getOptions).asJava
+      asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+    ScalaCouchbaseFlow.getAllReplicasObject[T](target, 
GetAllReplicasOptions.getAllReplicasOptions)(
+      asyncCollection).asJava
 
   /**
    * reference to [[CouchbaseFlow.getAllReplicasObject]], deserialize to class 
with Generics
    */
   def getAllReplicasType[T](target: TypeRef[T],
-      getOptions: GetAllReplicasOptions = 
GetAllReplicasOptions.getAllReplicasOptions)(
-      implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
-    ScalaCouchbaseFlow.getAllReplicasType(target, getOptions).asJava
+      getOptions: GetAllReplicasOptions,
+      asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+    ScalaCouchbaseFlow.getAllReplicasType(target, 
getOptions)(asyncCollection).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.getAllReplicasObject]], deserialize to class 
with Generics
+   */
+  def getAllReplicasType[T](target: TypeRef[T],
+      asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+    ScalaCouchbaseFlow.getAllReplicasType(target, 
GetAllReplicasOptions.getAllReplicasOptions)(asyncCollection).asJava
 
   /**
    * Inserts a full document which does not exist yet with custom options.
    * @param applyId parse id function, which is the document id
    * @see [[com.couchbase.client.java.AsyncCollection#insert]]
    */
+  def insert[T](applyId: JFunction[T, String],
+      insertOptions: InsertOptions,
+      asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    ScalaCouchbaseFlow.insert[T](applyId.apply, 
insertOptions)(asyncCollection).asJava
+
+  /**
+   * Inserts a full document which does not exist yet with custom options.
+   * @param applyId parse id function, which is the document id
+   * @see [[com.couchbase.client.java.AsyncCollection#insert]]
+   */
+  def insert[T](applyId: JFunction[T, String],
+      asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    ScalaCouchbaseFlow.insert[T](applyId.apply)(asyncCollection).asJava
+
+  /**
+   * Inserts a full document which does not exist yet with custom options.
+   * @param applyId parse id function, which is the document id
+   * @see [[com.couchbase.client.java.AsyncCollection#insert]]
+   * @deprecated Use the overloaded method that takes a 
java.util.function.Function instead (since 2.0.0)
+   */
+  @deprecated("Use the overloaded method that takes a 
java.util.function.Function instead", since = "2.0.0")
   def insert[T](applyId: T => String,
       insertOptions: InsertOptions = InsertOptions.insertOptions())(
       implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
@@ -106,15 +183,43 @@ object CouchbaseFlow {
    * reference to [[CouchbaseFlow.insert]] <br>
    * use MutationDocument to wrapper id, document and result(MutationResult)
    */
-  def insertDoc[T](insertOptions: InsertOptions = 
InsertOptions.insertOptions())(
-      implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T], 
MutationDocument[T], NotUsed] =
-    ScalaCouchbaseFlow.insertDoc[T](insertOptions).asJava
+  def insertDoc[T](insertOptions: InsertOptions,
+      asyncCollection: AsyncCollection): Flow[MutationDocument[T], 
MutationDocument[T], NotUsed] =
+    ScalaCouchbaseFlow.insertDoc[T](insertOptions)(asyncCollection).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.insert]] <br>
+   * use MutationDocument to wrapper id, document and result(MutationResult)
+   */
+  def insertDoc[T](asyncCollection: AsyncCollection): 
Flow[MutationDocument[T], MutationDocument[T], NotUsed] =
+    
ScalaCouchbaseFlow.insertDoc[T](InsertOptions.insertOptions())(asyncCollection).asJava
 
   /**
    * Replaces a full document which already exists with custom options.
    * @param applyId parse id function, which is the document id
    * @see [[com.couchbase.client.java.AsyncCollection#replace]]
    */
+  def replace[T](applyId: JFunction[T, String],
+      replaceOptions: ReplaceOptions,
+      asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    ScalaCouchbaseFlow.replace[T](applyId.apply, 
replaceOptions)(asyncCollection).asJava
+
+  /**
+   * Replaces a full document which already exists with custom options.
+   * @param applyId parse id function, which is the document id
+   * @see [[com.couchbase.client.java.AsyncCollection#replace]]
+   */
+  def replace[T](applyId: JFunction[T, String],
+      asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    ScalaCouchbaseFlow.replace[T](applyId.apply)(asyncCollection).asJava
+
+  /**
+   * Replaces a full document which already exists with custom options.
+   * @param applyId parse id function, which is the document id
+   * @see [[com.couchbase.client.java.AsyncCollection#replace]]
+   * @deprecated Use the overloaded method that takes a 
java.util.function.Function instead (since 2.0.0)
+   */
+  @deprecated("Use the overloaded method that takes a 
java.util.function.Function instead", since = "2.0.0")
   def replace[T](applyId: T => String,
       replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
       implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
@@ -124,15 +229,43 @@ object CouchbaseFlow {
    * reference to [[CouchbaseFlow.replace]]
    * use MutationDocument to wrapper id, document and result(MutationResult)
    */
-  def replaceDoc[T](replaceOptions: ReplaceOptions = 
ReplaceOptions.replaceOptions())(
-      implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T], 
MutationDocument[T], NotUsed] =
-    ScalaCouchbaseFlow.replaceDoc[T](replaceOptions).asJava
+  def replaceDoc[T](replaceOptions: ReplaceOptions,
+      asyncCollection: AsyncCollection): Flow[MutationDocument[T], 
MutationDocument[T], NotUsed] =
+    ScalaCouchbaseFlow.replaceDoc[T](replaceOptions)(asyncCollection).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.replace]]
+   * use MutationDocument to wrapper id, document and result(MutationResult)
+   */
+  def replaceDoc[T](asyncCollection: AsyncCollection): 
Flow[MutationDocument[T], MutationDocument[T], NotUsed] =
+    
ScalaCouchbaseFlow.replaceDoc[T](ReplaceOptions.replaceOptions())(asyncCollection).asJava
+
+  /**
+   * Upsert a full document which might or might not exist yet with custom 
options.
+   * @param applyId parse id function, which is the document id
+   * @see [[com.couchbase.client.java.AsyncCollection#upsert]]
+   */
+  def upsert[T](applyId: JFunction[T, String],
+      upsertOptions: UpsertOptions,
+      asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    ScalaCouchbaseFlow.upsert[T](applyId.apply, 
upsertOptions)(asyncCollection).asJava
 
   /**
    * Upsert a full document which might or might not exist yet with custom 
options.
    * @param applyId parse id function, which is the document id
    * @see [[com.couchbase.client.java.AsyncCollection#upsert]]
    */
+  def upsert[T](applyId: JFunction[T, String],
+      asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    ScalaCouchbaseFlow.upsert[T](applyId.apply)(asyncCollection).asJava
+
+  /**
+   * Upsert a full document which might or might not exist yet with custom 
options.
+   * @param applyId parse id function, which is the document id
+   * @see [[com.couchbase.client.java.AsyncCollection#upsert]]
+   * @deprecated Use the overloaded method that takes a 
java.util.function.Function instead (since 2.0.0)
+   */
+  @deprecated("Use the overloaded method that takes a 
java.util.function.Function instead", since = "2.0.0")
   def upsert[T](applyId: T => String,
       upsertOptions: UpsertOptions = UpsertOptions.upsertOptions())(
       implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
@@ -143,15 +276,46 @@ object CouchbaseFlow {
    * use MutationDocument to wrapper id, document and result(MutationResult)
    */
   def upsertDoc[T](
-      upsertOptions: UpsertOptions = UpsertOptions.upsertOptions())(
-      implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T], 
MutationDocument[T], NotUsed] =
-    ScalaCouchbaseFlow.upsertDoc[T](upsertOptions).asJava
+      upsertOptions: UpsertOptions,
+      asyncCollection: AsyncCollection): Flow[MutationDocument[T], 
MutationDocument[T], NotUsed] =
+    ScalaCouchbaseFlow.upsertDoc[T](upsertOptions)(asyncCollection).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.upsert]]
+   * use MutationDocument to wrapper id, document and result(MutationResult)
+   */
+  def upsertDoc[T](
+      asyncCollection: AsyncCollection): Flow[MutationDocument[T], 
MutationDocument[T], NotUsed] =
+    
ScalaCouchbaseFlow.upsertDoc[T](UpsertOptions.upsertOptions())(asyncCollection).asJava
+
+  /**
+   * Removes a Document from a collection with custom options.
+   * @param applyId parse id function, which is the document id, id streams 
can use `remove[String](e => e)`
+   * @see [[com.couchbase.client.java.AsyncCollection#remove]]
+   */
+  def remove[T](
+      applyId: JFunction[T, String],
+      removeOptions: RemoveOptions,
+      asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    ScalaCouchbaseFlow.remove[T](applyId.apply, 
removeOptions)(asyncCollection).asJava
+
+  /**
+   * Removes a Document from a collection with custom options.
+   * @param applyId parse id function, which is the document id, id streams 
can use `remove[String](e => e)`
+   * @see [[com.couchbase.client.java.AsyncCollection#remove]]
+   */
+  def remove[T](
+      applyId: JFunction[T, String],
+      asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    ScalaCouchbaseFlow.remove[T](applyId.apply)(asyncCollection).asJava
 
   /**
    * Removes a Document from a collection with custom options.
    * @param applyId parse id function, which is the document id, id streams 
can use `remove[String](e => e)`
    * @see [[com.couchbase.client.java.AsyncCollection#remove]]
+   * @deprecated Use the overloaded method that takes a 
java.util.function.Function instead (since 2.0.0)
    */
+  @deprecated("Use the overloaded method that takes a 
java.util.function.Function instead", since = "2.0.0")
   def remove[T](
       applyId: T => String,
       removeOptions: RemoveOptions = RemoveOptions.removeOptions())(
@@ -162,9 +326,17 @@ object CouchbaseFlow {
    * Performs mutations to document fragments with custom options.
    * @see [[com.couchbase.client.java.AsyncCollection#mutateIn]]
    */
-  def mutateIn(specs: java.util.List[MutateInSpec], options: MutateInOptions = 
MutateInOptions.mutateInOptions())(
-      implicit asyncCollection: AsyncCollection): Flow[String, MutateInResult, 
NotUsed] =
-    ScalaCouchbaseFlow.mutateIn(specs, options).asJava
+  def mutateIn(specs: java.util.List[MutateInSpec], options: MutateInOptions,
+      asyncCollection: AsyncCollection): Flow[String, MutateInResult, NotUsed] 
=
+    ScalaCouchbaseFlow.mutateIn(specs, options)(asyncCollection).asJava
+
+  /**
+   * Performs mutations to document fragments with default MutateInOptions.
+   * @see [[com.couchbase.client.java.AsyncCollection#mutateIn]]
+   */
+  def mutateIn(specs: java.util.List[MutateInSpec],
+      asyncCollection: AsyncCollection): Flow[String, MutateInResult, NotUsed] 
=
+    ScalaCouchbaseFlow.mutateIn(specs, 
MutateInOptions.mutateInOptions())(asyncCollection).asJava
 
   /**
    * reference to [[CouchbaseFlow.mutateIn]]
@@ -173,9 +345,19 @@ object CouchbaseFlow {
    */
   def mutateInDoc[T](
       specs: java.util.List[MutateInSpec],
-      options: MutateInOptions = MutateInOptions.mutateInOptions())(
-      implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T], 
MutationDocument[T], NotUsed] =
-    ScalaCouchbaseFlow.mutateInDoc[T](specs, options).asJava
+      options: MutateInOptions,
+      asyncCollection: AsyncCollection): Flow[MutationDocument[T], 
MutationDocument[T], NotUsed] =
+    ScalaCouchbaseFlow.mutateInDoc[T](specs, options)(asyncCollection).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.mutateIn]]
+   * use MutationDocument to wrapper id, document and result(MutationResult)
+   * @return
+   */
+  def mutateInDoc[T](
+      specs: java.util.List[MutateInSpec],
+      asyncCollection: AsyncCollection): Flow[MutationDocument[T], 
MutationDocument[T], NotUsed] =
+    ScalaCouchbaseFlow.mutateInDoc[T](specs, 
MutateInOptions.mutateInOptions())(asyncCollection).asJava
 
   /**
    * Checks if the given document ID exists on the active partition with 
custom options.
@@ -183,23 +365,76 @@ object CouchbaseFlow {
    * @see [[com.couchbase.client.java.AsyncCollection#exists]]
    */
   def exists[T](
-      applyId: T => String,
-      existsOptions: ExistsOptions = ExistsOptions.existsOptions())(
+      applyId: JFunction[T, String],
+      existsOptions: ExistsOptions,
+      asyncCollection: AsyncCollection): Flow[T, Boolean, NotUsed] =
+    ScalaCouchbaseFlow.exists[T](applyId.apply, 
existsOptions)(asyncCollection).asJava
+
+  /**
+   * Checks if the given document ID exists on the active partition with 
custom options.
+   * @param applyId parse id function, which is the document id, id streams 
can use `exists[String](e => e)`
+   * @see [[com.couchbase.client.java.AsyncCollection#exists]]
+   */
+  def exists[T](
+      applyId: JFunction[T, String],
+      asyncCollection: AsyncCollection): Flow[T, Boolean, NotUsed] =
+    ScalaCouchbaseFlow.exists[T](applyId.apply)(asyncCollection).asJava
+
+  /**
+   * Checks if the given document ID exists on the active partition with 
custom options.
+   * @param applyId parse id function, which is the document id, id streams 
can use `exists[String](e => e)`
+   * @see [[com.couchbase.client.java.AsyncCollection#exists]]
+   * @deprecated Use the overloaded method that takes a 
java.util.function.Function instead (since 2.0.0)
+   */
+  @deprecated("Use the overloaded method that takes a 
java.util.function.Function instead", since = "2.0.0")
+  def exists[T](
+      applyId: T => String)(
       implicit asyncCollection: AsyncCollection): Flow[T, Boolean, NotUsed] =
-    ScalaCouchbaseFlow.exists[T](applyId, existsOptions).asJava
+    ScalaCouchbaseFlow.exists[T](applyId).asJava
 
   /**
    * Updates the expiry of the document with the given id with custom options.
    * @see [[com.couchbase.client.java.AsyncCollection#touch]]
    */
-  def touch(expiry: Duration, options: TouchOptions = 
TouchOptions.touchOptions())(
-      implicit asyncCollection: AsyncCollection): Flow[String, MutationResult, 
NotUsed] =
-    ScalaCouchbaseFlow.touch(expiry, options).asJava
+  def touch(expiry: Duration, options: TouchOptions,
+      asyncCollection: AsyncCollection): Flow[String, MutationResult, NotUsed] 
=
+    ScalaCouchbaseFlow.touch(expiry, options)(asyncCollection).asJava
+
+  /**
+   * Updates the expiry of the document with the given id using default TouchOptions.
+   * @see [[com.couchbase.client.java.AsyncCollection#touch]]
+   */
+  def touch(expiry: Duration,
+      asyncCollection: AsyncCollection): Flow[String, MutationResult, NotUsed] 
=
+    ScalaCouchbaseFlow.touch(expiry, 
TouchOptions.touchOptions())(asyncCollection).asJava
 
   /**
    * Updates the expiry of the document with the given id with custom options.
    * @param applyId parse id function, which is the document id
    */
+  def touchDuration[T](
+      applyId: JFunction[T, String],
+      expiry: Duration,
+      touchOptions: TouchOptions,
+      asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    ScalaCouchbaseFlow.touchDuration[T](applyId.apply, expiry, 
touchOptions)(asyncCollection).asJava
+
+  /**
+   * Updates the expiry of the document with the given id with custom options.
+   * @param applyId parse id function, which is the document id
+   */
+  def touchDuration[T](
+      applyId: JFunction[T, String],
+      expiry: Duration,
+      asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    ScalaCouchbaseFlow.touchDuration[T](applyId.apply, 
expiry)(asyncCollection).asJava
+
+  /**
+   * Updates the expiry of the document with the given id with custom options.
+   * @param applyId parse id function, which is the document id
+   * @deprecated Use the overloaded method that takes a 
java.util.function.Function instead (since 2.0.0)
+   */
+  @deprecated("Use the overloaded method that takes a 
java.util.function.Function instead", since = "2.0.0")
   def touchDuration[T](
       applyId: T => String,
       expiry: Duration,
@@ -211,6 +446,29 @@ object CouchbaseFlow {
    * Updates the expiry of the document with the given id with custom options.
    * @see [[com.couchbase.client.java.AsyncCollection#touch]]
    */
+  def touchInstant[T](
+      applyId: JFunction[T, String],
+      expiry: Instant,
+      touchOptions: TouchOptions,
+      asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    ScalaCouchbaseFlow.touchInstant[T](applyId.apply, expiry, 
touchOptions)(asyncCollection).asJava
+
+  /**
+   * Updates the expiry of the document with the given id with custom options.
+   * @see [[com.couchbase.client.java.AsyncCollection#touch]]
+   */
+  def touchInstant[T](
+      applyId: JFunction[T, String],
+      expiry: Instant,
+      asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    ScalaCouchbaseFlow.touchInstant[T](applyId.apply, 
expiry)(asyncCollection).asJava
+
+  /**
+   * Updates the expiry of the document with the given id with custom options.
+   * @see [[com.couchbase.client.java.AsyncCollection#touch]]
+   * @deprecated Use the overloaded method that takes a 
java.util.function.Function instead (since 2.0.0)
+   */
+  @deprecated("Use the overloaded method that takes a 
java.util.function.Function instead", since = "2.0.0")
   def touchInstant[T](
       applyId: T => String,
       expiry: Instant,
@@ -222,32 +480,60 @@ object CouchbaseFlow {
    * Appends binary content to the document with custom options.
    * @see [[com.couchbase.client.java.AsyncBinaryCollection#append]]
    */
-  def append(options: AppendOptions = AppendOptions.appendOptions())(
-      implicit asyncCollection: AsyncCollection): Flow[MutationBinaryDocument, 
MutationResult, NotUsed] =
-    ScalaCouchbaseFlow.append(options).asJava
+  def append(options: AppendOptions,
+      asyncCollection: AsyncCollection): Flow[MutationBinaryDocument, 
MutationResult, NotUsed] =
+    ScalaCouchbaseFlow.append(options)(asyncCollection).asJava
+
+  /**
+   * Appends binary content to the document using the default append options.
+   * @see [[com.couchbase.client.java.AsyncBinaryCollection#append]]
+   */
+  def append(asyncCollection: AsyncCollection): Flow[MutationBinaryDocument, 
MutationResult, NotUsed] =
+    
ScalaCouchbaseFlow.append(AppendOptions.appendOptions())(asyncCollection).asJava
 
   /**
    * Prepends binary content to the document with custom options.
    * @see [[com.couchbase.client.java.AsyncBinaryCollection#prepend]]
    */
-  def prepend(options: PrependOptions = PrependOptions.prependOptions())(
-      implicit asyncCollection: AsyncCollection): Flow[MutationBinaryDocument, 
MutationResult, NotUsed] =
-    ScalaCouchbaseFlow.prepend(options).asJava
+  def prepend(options: PrependOptions,
+      asyncCollection: AsyncCollection): Flow[MutationBinaryDocument, 
MutationResult, NotUsed] =
+    ScalaCouchbaseFlow.prepend(options)(asyncCollection).asJava
+
+  /**
+   * Prepends binary content to the document using the default prepend options.
+   * @see [[com.couchbase.client.java.AsyncBinaryCollection#prepend]]
+   */
+  def prepend(asyncCollection: AsyncCollection): Flow[MutationBinaryDocument, 
MutationResult, NotUsed] =
+    
ScalaCouchbaseFlow.prepend(PrependOptions.prependOptions())(asyncCollection).asJava
+
+  /**
+   * Increments the counter document by one or the number defined in the 
options.
+   * @see [[com.couchbase.client.java.AsyncBinaryCollection#increment]]
+   */
+  def increment(options: IncrementOptions,
+      asyncCollection: AsyncCollection): Flow[String, CounterResult, NotUsed] =
+    ScalaCouchbaseFlow.increment(options)(asyncCollection).asJava
 
   /**
    * Increments the counter document by one or the number defined in the 
options.
    * @see [[com.couchbase.client.java.AsyncBinaryCollection#increment]]
    */
-  def increment(options: IncrementOptions = 
IncrementOptions.incrementOptions())(
-      implicit asyncCollection: AsyncCollection): Flow[String, CounterResult, 
NotUsed] =
-    ScalaCouchbaseFlow.increment(options).asJava
+  def increment(asyncCollection: AsyncCollection): Flow[String, CounterResult, 
NotUsed] =
+    
ScalaCouchbaseFlow.increment(IncrementOptions.incrementOptions())(asyncCollection).asJava
+
+  /**
+   * Decrements the counter document by one or the number defined in the 
options.
+   * @see [[com.couchbase.client.java.AsyncBinaryCollection#decrement]]
+   */
+  def decrement(options: DecrementOptions,
+      asyncCollection: AsyncCollection): Flow[String, CounterResult, NotUsed] =
+    ScalaCouchbaseFlow.decrement(options)(asyncCollection).asJava
 
   /**
    * Decrements the counter document by one or the number defined in the 
options.
    * @see [[com.couchbase.client.java.AsyncBinaryCollection#decrement]]
    */
-  def decrement(options: DecrementOptions)(
-      implicit asyncCollection: AsyncCollection): Flow[String, CounterResult, 
NotUsed] =
-    ScalaCouchbaseFlow.decrement(options).asJava
+  def decrement(asyncCollection: AsyncCollection): Flow[String, CounterResult, 
NotUsed] =
+    
ScalaCouchbaseFlow.decrement(DecrementOptions.decrementOptions())(asyncCollection).asJava
 
 }
diff --git 
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSink.scala
 
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSink.scala
index 9cc24ff9b..9da63df22 100644
--- 
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSink.scala
+++ 
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSink.scala
@@ -25,6 +25,7 @@ import 
org.apache.pekko.stream.connectors.couchbase3.scaladsl.{ CouchbaseSink =>
 import org.apache.pekko.stream.javadsl.Sink
 
 import java.util.concurrent.CompletionStage
+import java.util.function.{ Function => JFunction }
 import scala.concurrent.Future
 import scala.jdk.FutureConverters._
 
@@ -38,9 +39,22 @@ object CouchbaseSink {
    * </p>
    * @see {@link #insertDocFuture} which works like this method worked in 1.x.
    */
-  def insertDoc[T](insertOptions: InsertOptions)(
-      implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T], 
CompletionStage[Done]] =
-    
ScalaCouchbaseSink.insertDoc[T](insertOptions).mapMaterializedValue(_.asJava).asJava
+  def insertDoc[T](insertOptions: InsertOptions,
+      asyncCollection: AsyncCollection): Sink[MutationDocument[T], 
CompletionStage[Done]] =
+    
ScalaCouchbaseSink.insertDoc[T](insertOptions)(asyncCollection).mapMaterializedValue(_.asJava).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.insertDoc]]
+   * <p>
+   *   This function's return type changed in 2.0.0 to return a Sink with a 
CompletionStage instead of a
+   *   Scala Future, to be more consistent with Java usage.
+   * </p>
+   * @see {@link #insertDocFuture} which works like this method worked in 1.x.
+   */
+  def insertDoc[T](
+      asyncCollection: AsyncCollection): Sink[MutationDocument[T], 
CompletionStage[Done]] =
+    
ScalaCouchbaseSink.insertDoc[T](InsertOptions.insertOptions())(asyncCollection).mapMaterializedValue(
+      _.asJava).asJava
 
   /**
    * reference to [[CouchbaseFlow.insertDoc]]
@@ -59,10 +73,23 @@ object CouchbaseSink {
    * </p>
    * @see {@link #insertFuture} which works like this method worked in 1.x.
    */
-  def insert[T](applyId: T => String,
-      insertOptions: InsertOptions)(
-      implicit asyncCollection: AsyncCollection): Sink[T, 
CompletionStage[Done]] =
-    ScalaCouchbaseSink.insert[T](applyId, 
insertOptions).mapMaterializedValue(_.asJava).asJava
+  def insert[T](applyId: JFunction[T, String],
+      insertOptions: InsertOptions,
+      asyncCollection: AsyncCollection): Sink[T, CompletionStage[Done]] =
+    ScalaCouchbaseSink.insert[T](applyId.apply, 
insertOptions)(asyncCollection).mapMaterializedValue(_.asJava).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.insert]]
+   * <p>
+   *   This function's return type changed in 2.0.0 to return a Sink with a 
CompletionStage instead of a
+   *   Scala Future, to be more consistent with Java usage.
+   * </p>
+   * @see {@link #insertFuture} which works like this method worked in 1.x.
+   */
+  def insert[T](applyId: JFunction[T, String],
+      asyncCollection: AsyncCollection): Sink[T, CompletionStage[Done]] =
+    ScalaCouchbaseSink.insert[T](applyId.apply, 
InsertOptions.insertOptions())(asyncCollection).mapMaterializedValue(
+      _.asJava).asJava
 
   /**
    * reference to [[CouchbaseFlow.insert]]
@@ -82,9 +109,22 @@ object CouchbaseSink {
    * </p>
    * @see {@link #upsertDocFuture} which works like this method worked in 1.x.
    */
-  def upsertDoc[T](upsertOptions: UpsertOptions = 
UpsertOptions.upsertOptions())(
-      implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T], 
CompletionStage[Done]] =
-    
ScalaCouchbaseSink.upsertDoc[T](upsertOptions).mapMaterializedValue(_.asJava).asJava
+  def upsertDoc[T](upsertOptions: UpsertOptions,
+      asyncCollection: AsyncCollection): Sink[MutationDocument[T], 
CompletionStage[Done]] =
+    
ScalaCouchbaseSink.upsertDoc[T](upsertOptions)(asyncCollection).mapMaterializedValue(_.asJava).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.upsertDoc]]
+   * <p>
+   *   This function's return type changed in 2.0.0 to return a Sink with a 
CompletionStage instead of a
+   *   Scala Future, to be more consistent with Java usage.
+   * </p>
+   * @see {@link #upsertDocFuture} which works like this method worked in 1.x.
+   */
+  def upsertDoc[T](
+      asyncCollection: AsyncCollection): Sink[MutationDocument[T], 
CompletionStage[Done]] =
+    
ScalaCouchbaseSink.upsertDoc[T](UpsertOptions.upsertOptions())(asyncCollection).mapMaterializedValue(
+      _.asJava).asJava
 
   /**
    * reference to [[CouchbaseFlow.upsertDoc]]
@@ -103,10 +143,23 @@ object CouchbaseSink {
    * </p>
    * @see {@link #upsertFuture} which works like this method worked in 1.x.
    */
-  def upsert[T](applyId: T => String,
-      upsertOptions: UpsertOptions = UpsertOptions.upsertOptions())(
-      implicit asyncCollection: AsyncCollection): Sink[T, 
CompletionStage[Done]] =
-    ScalaCouchbaseSink.upsert[T](applyId, 
upsertOptions).mapMaterializedValue(_.asJava).asJava
+  def upsert[T](applyId: JFunction[T, String],
+      upsertOptions: UpsertOptions,
+      asyncCollection: AsyncCollection): Sink[T, CompletionStage[Done]] =
+    ScalaCouchbaseSink.upsert[T](applyId.apply, 
upsertOptions)(asyncCollection).mapMaterializedValue(_.asJava).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.upsert]]
+   * <p>
+   *   This function's return type changed in 2.0.0 to return a Sink with a 
CompletionStage instead of a
+   *   Scala Future, to be more consistent with Java usage.
+   * </p>
+   * @see {@link #upsertFuture} which works like this method worked in 1.x.
+   */
+  def upsert[T](applyId: JFunction[T, String],
+      asyncCollection: AsyncCollection): Sink[T, CompletionStage[Done]] =
+    ScalaCouchbaseSink.upsert[T](applyId.apply, 
UpsertOptions.upsertOptions())(asyncCollection).mapMaterializedValue(
+      _.asJava).asJava
 
   /**
    * reference to [[CouchbaseFlow.upsert]]
@@ -127,9 +180,22 @@ object CouchbaseSink {
    * @see {@link #replaceDocFuture} which works like this method worked in 1.x.
    */
   def replaceDoc[T](
-      replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
-      implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T], 
CompletionStage[Done]] =
-    
ScalaCouchbaseSink.replaceDoc[T](replaceOptions).mapMaterializedValue(_.asJava).asJava
+      replaceOptions: ReplaceOptions,
+      asyncCollection: AsyncCollection): Sink[MutationDocument[T], 
CompletionStage[Done]] =
+    
ScalaCouchbaseSink.replaceDoc[T](replaceOptions)(asyncCollection).mapMaterializedValue(_.asJava).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.replaceDoc]]
+   * <p>
+   *   This function's return type changed in 2.0.0 to return a Sink with a 
CompletionStage instead of a
+   *   Scala Future, to be more consistent with Java usage.
+   * </p>
+   * @see {@link #replaceDocFuture} which works like this method worked in 1.x.
+   */
+  def replaceDoc[T](
+      asyncCollection: AsyncCollection): Sink[MutationDocument[T], 
CompletionStage[Done]] =
+    
ScalaCouchbaseSink.replaceDoc[T](ReplaceOptions.replaceOptions())(asyncCollection).mapMaterializedValue(
+      _.asJava).asJava
 
   /**
    * reference to [[CouchbaseFlow.replaceDoc]]
@@ -149,10 +215,23 @@ object CouchbaseSink {
    * </p>
    * @see {@link #replaceFuture} which works like this method worked in 1.x.
    */
-  def replace[T](applyId: T => String,
-      replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
-      implicit asyncCollection: AsyncCollection): Sink[T, 
CompletionStage[Done]] =
-    ScalaCouchbaseSink.replace[T](applyId, 
replaceOptions).mapMaterializedValue(_.asJava).asJava
+  def replace[T](applyId: JFunction[T, String],
+      replaceOptions: ReplaceOptions,
+      asyncCollection: AsyncCollection): Sink[T, CompletionStage[Done]] =
+    ScalaCouchbaseSink.replace[T](applyId.apply, 
replaceOptions)(asyncCollection).mapMaterializedValue(_.asJava).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.replace]]
+   * <p>
+   *   This function's return type changed in 2.0.0 to return a Sink with a 
CompletionStage instead of a
+   *   Scala Future, to be more consistent with Java usage.
+   * </p>
+   * @see {@link #replaceFuture} which works like this method worked in 1.x.
+   */
+  def replace[T](applyId: JFunction[T, String],
+      asyncCollection: AsyncCollection): Sink[T, CompletionStage[Done]] =
+    ScalaCouchbaseSink.replace[T](applyId.apply, 
ReplaceOptions.replaceOptions())(asyncCollection).mapMaterializedValue(
+      _.asJava).asJava
 
   /**
    * reference to [[CouchbaseFlow.replace]]
@@ -172,10 +251,23 @@ object CouchbaseSink {
    * </p>
    * @see {@link #removeFuture} which works like this method worked in 1.x.
    */
-  def remove[T](applyId: T => String,
-      removeOptions: RemoveOptions = RemoveOptions.removeOptions())(
-      implicit asyncCollection: AsyncCollection): Sink[T, 
CompletionStage[Done]] =
-    ScalaCouchbaseSink.remove[T](applyId, 
removeOptions).mapMaterializedValue(_.asJava).asJava
+  def remove[T](applyId: JFunction[T, String],
+      removeOptions: RemoveOptions,
+      asyncCollection: AsyncCollection): Sink[T, CompletionStage[Done]] =
+    ScalaCouchbaseSink.remove[T](applyId.apply, 
removeOptions)(asyncCollection).mapMaterializedValue(_.asJava).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.remove]]
+   * <p>
+   *   This function's return type changed in 2.0.0 to return a Sink with a 
CompletionStage instead of a
+   *   Scala Future, to be more consistent with Java usage.
+   * </p>
+   * @see {@link #removeFuture} which works like this method worked in 1.x.
+   */
+  def remove[T](applyId: JFunction[T, String],
+      asyncCollection: AsyncCollection): Sink[T, CompletionStage[Done]] =
+    ScalaCouchbaseSink.remove[T](applyId.apply, 
RemoveOptions.removeOptions())(asyncCollection).mapMaterializedValue(
+      _.asJava).asJava
 
   /**
    * reference to [[CouchbaseFlow.remove]]
@@ -195,9 +287,23 @@ object CouchbaseSink {
    * </p>
    * @see {@link #existsFuture} which works like this method worked in 1.x.
    */
-  def exists[T](applyId: T => String, existsOptions: ExistsOptions = 
ExistsOptions.existsOptions())(
-      implicit asyncCollection: AsyncCollection): Sink[T, 
CompletionStage[java.lang.Boolean]] =
-    ScalaCouchbaseSink.exists[T](applyId, existsOptions)
+  def exists[T](applyId: JFunction[T, String], existsOptions: ExistsOptions,
+      asyncCollection: AsyncCollection): Sink[T, 
CompletionStage[java.lang.Boolean]] =
+    ScalaCouchbaseSink.exists[T](applyId.apply, existsOptions)(asyncCollection)
+      
.mapMaterializedValue(_.map(Boolean.box)(scala.concurrent.ExecutionContext.parasitic).asJava)
+      .asJava
+
+  /**
+   * reference to [[CouchbaseFlow.exists]]
+   * <p>
+   *   This function's return type changed in 2.0.0 to return a Sink with a 
CompletionStage instead of a
+   *   Scala Future, to be more consistent with Java usage.
+   * </p>
+   * @see {@link #existsFuture} which works like this method worked in 1.x.
+   */
+  def exists[T](applyId: JFunction[T, String],
+      asyncCollection: AsyncCollection): Sink[T, 
CompletionStage[java.lang.Boolean]] =
+    ScalaCouchbaseSink.exists[T](applyId.apply, 
ExistsOptions.existsOptions())(asyncCollection)
       
.mapMaterializedValue(_.map(Boolean.box)(scala.concurrent.ExecutionContext.parasitic).asJava)
       .asJava
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to