This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ce1866d0 Support Couchbase3 (#624)
2ce1866d0 is described below

commit 2ce1866d0897485bacd2b5258e0767cb9925dfba
Author: Laglangyue <[email protected]>
AuthorDate: Tue May 7 07:50:02 2024 +0800

    Support Couchbase3 (#624)
    
    * support couchbase3
    
    * use implicit for asyncCollection, add spec
    
    * use implicit for asyncCollection, add spec
    
    * test couchbase3 in check-build-test.yml
    
    * add more test,remove not need conf
    
    * add java doc
    
    * add java dsl
    
    * revert not need change
    
    * add test for increment and decrement
    
    * fix fmt
    
    * fix failed with scala 2.12
    
    * fix compile failed with scala 3.3
    
    * fix ci test
    
    * ignore mima check for couchbase3
    
    * fix couchbase2 docs
    
    * Update application.conf
    
    * Update CouchbaseSource.scala
    
    * Update CouchbaseSource.scala
    
    * update test and some comment
    
    * update analyticsQuery
    
    * update java/scala doc
    
    ---------
    
    Co-authored-by: PJ Fanning <[email protected]>
---
 .github/workflows/check-build-test.yml             |   1 +
 build.sbt                                          |   6 +-
 .../stream/connectors/couchbase3/Document.scala    |  34 +++
 .../couchbase3/javadsl/CouchbaseFlow.scala         | 253 +++++++++++++++++
 .../couchbase3/javadsl/CouchbaseSink.scala         |  93 ++++++
 .../couchbase3/javadsl/CouchbaseSource.scala       | 194 +++++++++++++
 .../couchbase3/scaladsl/CouchbaseFlow.scala        | 314 +++++++++++++++++++++
 .../couchbase3/scaladsl/CouchbaseSink.scala        |  91 ++++++
 .../couchbase3/scaladsl/CouchbaseSource.scala      | 226 +++++++++++++++
 couchbase3/src/test/resources/application.conf     |   7 +
 couchbase3/src/test/resources/logback-test.xml     |  31 ++
 .../docs/scaladsl/CouchbaseTestFlowSpec.scala      | 165 +++++++++++
 .../docs/scaladsl/CouchbaseTestSourceSpec.scala    | 102 +++++++
 .../couchbase3/CouchbaseTestSupport.scala          | 102 +++++++
 docker-compose.yml                                 |  87 +++---
 docs/src/main/paradox/couchbase.md                 |   2 +-
 project/Dependencies.scala                         |   9 +
 17 files changed, 1675 insertions(+), 42 deletions(-)

diff --git a/.github/workflows/check-build-test.yml 
b/.github/workflows/check-build-test.yml
index d22b46de4..96d9b7bb5 100644
--- a/.github/workflows/check-build-test.yml
+++ b/.github/workflows/check-build-test.yml
@@ -83,6 +83,7 @@ jobs:
           - { connector: azure-storage-queue }
           - { connector: cassandra,                    pre_cmd: 
'docker-compose up -d cassandra' }
           - { connector: couchbase,                    pre_cmd: 
'docker-compose up -d couchbase_prep' }
+          - { connector: couchbase3,                   pre_cmd: 
'docker-compose up -d couchbase_prep' }
           - { connector: csv }
           - { connector: dynamodb,                     pre_cmd: 
'docker-compose up -d dynamodb' }
           - { connector: elasticsearch,                pre_cmd: 
'docker-compose up -d elasticsearch6 elasticsearch7 opensearch1' }
diff --git a/build.sbt b/build.sbt
index da9e78b72..476eb9c1c 100644
--- a/build.sbt
+++ b/build.sbt
@@ -22,6 +22,7 @@ lazy val userProjects: Seq[ProjectReference] = 
List[ProjectReference](
   azureStorageQueue,
   cassandra,
   couchbase,
+  couchbase3,
   csv,
   dynamodb,
   elasticsearch,
@@ -140,6 +141,9 @@ lazy val cassandra =
 lazy val couchbase =
   pekkoConnectorProject("couchbase", "couchbase", Dependencies.Couchbase)
 
+lazy val couchbase3 =
+  pekkoConnectorProject("couchbase3", "couchbase3", Dependencies.Couchbase3)
+
 lazy val csv = pekkoConnectorProject("csv", "csv")
 
 lazy val csvBench = internalProject("csv-bench")
@@ -460,7 +464,7 @@ def pekkoConnectorProject(projectId: String,
       licenses := List(License.Apache2),
       AutomaticModuleName.settings(s"pekko.stream.connectors.$moduleName"),
       mimaPreviousArtifacts := {
-        if (moduleName == "slick") {
+        if (moduleName == "slick" || moduleName == "couchbase3") {
           Set.empty
         } else {
           Set(organization.value %% name.value % mimaCompareVersion)
diff --git 
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/Document.scala
 
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/Document.scala
new file mode 100644
index 000000000..005be1030
--- /dev/null
+++ 
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/Document.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.couchbase3
+
+import com.couchbase.client.java.kv.MutationResult
+
+/**
+ * A document of type `T` together with its id and an optional Couchbase `MutationResult`.
+ *
+ * @param id document id
+ * @param doc document content
+ * @param result mutation result attached to this document; empty by default
+ */
+case class MutationDocument[T](id: String, doc: T, result: Option[MutationResult] = Option.empty) {
+
+  /**
+   * Returns a copy of this document whose `result` is `Some(mutationResult)`;
+   * the receiver itself is not modified.
+   */
+  def withResult(mutationResult: MutationResult): MutationDocument[T] =
+    copy(result = Some(mutationResult))
+}
+
+/**
+ * Binary counterpart of `MutationDocument`: raw bytes together with the document id and an
+ * optional Couchbase `MutationResult`.
+ *
+ * NOTE: `doc` is an `Array[Byte]`, so the generated `equals`/`hashCode` compare it by
+ * reference, not by content.
+ *
+ * @param id document id
+ * @param doc binary content
+ * @param result mutation result attached to this document; empty by default
+ */
+case class MutationBinaryDocument(id: String, doc: Array[Byte], result: Option[MutationResult] = Option.empty) {
+
+  /**
+   * Returns a copy of this document whose `result` is `Some(mutationResult)`;
+   * the receiver itself is not modified.
+   */
+  def withResult(mutationResult: MutationResult): MutationBinaryDocument =
+    copy(result = Some(mutationResult))
+}
diff --git 
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseFlow.scala
 
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseFlow.scala
new file mode 100644
index 000000000..49fb20014
--- /dev/null
+++ 
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseFlow.scala
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.couchbase3.javadsl
+
+import com.couchbase.client.java.AsyncCollection
+import com.couchbase.client.java.codec.TypeRef
+import com.couchbase.client.java.json.JsonObject
+import com.couchbase.client.java.kv._
+import org.apache.pekko.NotUsed
+import org.apache.pekko.stream.connectors.couchbase3.{ MutationBinaryDocument, 
MutationDocument }
+import org.apache.pekko.stream.javadsl.Flow
+import org.apache.pekko.stream.connectors.couchbase3.scaladsl.{ CouchbaseFlow 
=> ScalaCouchbaseFlow }
+
+import java.time.{ Duration, Instant }
+
+object CouchbaseFlow {
+
+  /**
+   * get a document by id from Couchbase collection
+   * @param options reference to Couchbase options doc
+   */
+  def get(options: GetOptions = GetOptions.getOptions)(
+      implicit asyncCollection: AsyncCollection): Flow[String, GetResult, 
NotUsed] =
+    ScalaCouchbaseFlow.get(options).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.get]] deserialize to Couchbase JsonObject
+   */
+  def getJson(options: GetOptions = GetOptions.getOptions)(
+      implicit asyncCollection: AsyncCollection): Flow[String, JsonObject, 
NotUsed] =
+    ScalaCouchbaseFlow.getJson(options).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.get]],deserialize to class
+   * If you add DefaultScalaModule to jackson of couchbase, it could 
deserialize to scala class
+   */
+  def getObject[T](target: Class[T], options: GetOptions = 
GetOptions.getOptions)(
+      implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+    ScalaCouchbaseFlow.getObject[T](target, options).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.getObject]], deserialize to a class with generics
+   * @param target Couchbase TypeRef describing the generic target type
+   * @param options reference to Couchbase options doc
+   */
+  def getType[T](target: TypeRef[T], options: GetOptions = 
GetOptions.getOptions)(
+      implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+    ScalaCouchbaseFlow.getType[T](target, options).asJava
+
+  /**
+   * similar to [[CouchbaseFlow.get]], but reads from all replicas on the 
active node
+   * @see [[CouchbaseFlow#get]]
+   */
+  def getAllReplicas(options: GetAllReplicasOptions = 
GetAllReplicasOptions.getAllReplicasOptions)(
+      implicit asyncCollection: AsyncCollection): Flow[String, 
GetReplicaResult, NotUsed] =
+    ScalaCouchbaseFlow.getAllReplicas(options).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.getAllReplicas]], deserialize to Couchbase 
JsonObject
+   */
+  def getAllReplicasJson(options: GetAllReplicasOptions = 
GetAllReplicasOptions.getAllReplicasOptions)(
+      implicit asyncCollection: AsyncCollection): Flow[String, JsonObject, 
NotUsed] =
+    ScalaCouchbaseFlow.getAllReplicasJson(options).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.getAllReplicas]], deserialize to class
+   * If you add DefaultScalaModule to jackson of couchbase, it could 
deserialize to scala class
+   */
+  def getAllReplicasObject[T](target: Class[T],
+      getOptions: GetAllReplicasOptions = 
GetAllReplicasOptions.getAllReplicasOptions)(
+      implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+    ScalaCouchbaseFlow.getAllReplicasObject[T](target, getOptions).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.getAllReplicasObject]], deserialize to class 
with Generics
+   */
+  def getAllReplicasType[T](target: TypeRef[T],
+      getOptions: GetAllReplicasOptions = 
GetAllReplicasOptions.getAllReplicasOptions)(
+      implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+    ScalaCouchbaseFlow.getAllReplicasType(target, getOptions).asJava
+
+  /**
+   * Inserts a full document which does not exist yet with custom options.
+   * @param applyId parse id function, which is the document id
+   * @see [[com.couchbase.client.java.AsyncCollection#insert]]
+   */
+  def insert[T](applyId: T => String,
+      insertOptions: InsertOptions = InsertOptions.insertOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    ScalaCouchbaseFlow.insert[T](applyId, insertOptions).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.insert]] <br>
+   * use MutationDocument to wrapper id, document and result(MutationResult)
+   */
+  def insertDoc[T](insertOptions: InsertOptions = 
InsertOptions.insertOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T], 
MutationDocument[T], NotUsed] =
+    ScalaCouchbaseFlow.insertDoc[T](insertOptions).asJava
+
+  /**
+   * Replaces a full document which already exists with custom options.
+   * @param applyId parse id function, which is the document id
+   * @see [[com.couchbase.client.java.AsyncCollection#replace]]
+   */
+  def replace[T](applyId: T => String,
+      replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    ScalaCouchbaseFlow.replace[T](applyId, replaceOptions).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.replace]]
+   * use MutationDocument to wrapper id, document and result(MutationResult)
+   */
+  def replaceDoc[T](replaceOptions: ReplaceOptions = 
ReplaceOptions.replaceOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T], 
MutationDocument[T], NotUsed] =
+    ScalaCouchbaseFlow.replaceDoc[T](replaceOptions).asJava
+
+  /**
+   * Upsert a full document which might or might not exist yet with custom 
options.
+   * @param applyId parse id function, which is the document id
+   * @see [[com.couchbase.client.java.AsyncCollection#upsert]]
+   */
+  def upsert[T](applyId: T => String,
+      upsertOptions: UpsertOptions = UpsertOptions.upsertOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    ScalaCouchbaseFlow.upsert[T](applyId, upsertOptions).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.upsert]]
+   * use MutationDocument to wrapper id, document and result(MutationResult)
+   */
+  def upsertDoc[T](
+      upsertOptions: UpsertOptions = UpsertOptions.upsertOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T], 
MutationDocument[T], NotUsed] =
+    ScalaCouchbaseFlow.upsertDoc[T](upsertOptions).asJava
+
+  /**
+   * Removes a Document from a collection with custom options.
+   * @param applyId parse id function, which is the document id, id streams 
can use `remove[String](e => e)`
+   * @see [[com.couchbase.client.java.AsyncCollection#remove]]
+   */
+  def remove[T](
+      applyId: T => String,
+      removeOptions: RemoveOptions = RemoveOptions.removeOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    ScalaCouchbaseFlow.remove[T](applyId, removeOptions).asJava
+
+  /**
+   * Performs mutations to document fragments with custom options.
+   * @see [[com.couchbase.client.java.AsyncCollection#mutateIn]]
+   */
+  def mutateIn(specs: java.util.List[MutateInSpec], options: MutateInOptions = 
MutateInOptions.mutateInOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[String, MutateInResult, 
NotUsed] =
+    ScalaCouchbaseFlow.mutateIn(specs, options).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.mutateIn]]
+   * uses MutationDocument to wrap id, document and result(MutationResult)
+   * @param specs mutation specs, passed on unchanged to the Scala DSL `mutateInDoc`
+   * @param options reference to Couchbase options doc
+   * @return a flow from MutationDocument[T] to MutationDocument[T]
+   */
+  def mutateInDoc[T](
+      specs: java.util.List[MutateInSpec],
+      options: MutateInOptions = MutateInOptions.mutateInOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T], 
MutationDocument[T], NotUsed] =
+    ScalaCouchbaseFlow.mutateInDoc[T](specs, options).asJava
+
+  /**
+   * Checks if the given document ID exists on the active partition with 
custom options.
+   * @param applyId parse id function, which is the document id, id streams 
can use `exists[String](e => e)`
+   * @see [[com.couchbase.client.java.AsyncCollection#exists]]
+   */
+  def exists[T](
+      applyId: T => String,
+      existsOptions: ExistsOptions = ExistsOptions.existsOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[T, Boolean, NotUsed] =
+    ScalaCouchbaseFlow.exists[T](applyId, existsOptions).asJava
+
+  /**
+   * Updates the expiry of the document with the given id with custom options.
+   * @see [[com.couchbase.client.java.AsyncCollection#touch]]
+   */
+  def touch(expiry: Duration, options: TouchOptions = 
TouchOptions.touchOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[String, MutationResult, 
NotUsed] =
+    ScalaCouchbaseFlow.touch(expiry, options).asJava
+
+  /**
+   * Updates the expiry of the document with the given id with custom options.
+   * @param applyId parse id function, which is the document id
+   */
+  def touchDuration[T](
+      applyId: T => String,
+      expiry: Duration,
+      touchOptions: TouchOptions = TouchOptions.touchOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    ScalaCouchbaseFlow.touchDuration[T](applyId, expiry, touchOptions).asJava
+
+  /**
+   * Updates the expiry of the document with the given id with custom options.
+   * @see [[com.couchbase.client.java.AsyncCollection#touch]]
+   */
+  def touchInstant[T](
+      applyId: T => String,
+      expiry: Instant,
+      touchOptions: TouchOptions = TouchOptions.touchOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    ScalaCouchbaseFlow.touchInstant[T](applyId, expiry, touchOptions).asJava
+
+  /**
+   * Appends binary content to the document with custom options.
+   * @see [[com.couchbase.client.java.AsyncBinaryCollection#append]]
+   */
+  def append(options: AppendOptions = AppendOptions.appendOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[MutationBinaryDocument, 
MutationResult, NotUsed] =
+    ScalaCouchbaseFlow.append(options).asJava
+
+  /**
+   * Prepends binary content to the document with custom options.
+   * @see [[com.couchbase.client.java.AsyncBinaryCollection#prepend]]
+   */
+  def prepend(options: PrependOptions = PrependOptions.prependOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[MutationBinaryDocument, 
MutationResult, NotUsed] =
+    ScalaCouchbaseFlow.prepend(options).asJava
+
+  /**
+   * Increments the counter document by one or the number defined in the 
options.
+   * @see [[com.couchbase.client.java.AsyncBinaryCollection#increment]]
+   */
+  def increment(options: IncrementOptions = 
IncrementOptions.incrementOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[String, CounterResult, 
NotUsed] =
+    ScalaCouchbaseFlow.increment(options).asJava
+
+  /**
+   * Decrements the counter document by one or the number defined in the options.
+   * @param options reference to Couchbase options doc; defaults to
+   *                `DecrementOptions.decrementOptions()`, mirroring the default used by `increment`
+   * @see [[com.couchbase.client.java.AsyncBinaryCollection#decrement]]
+   */
+  def decrement(options: DecrementOptions = DecrementOptions.decrementOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[String, CounterResult, NotUsed] =
+    ScalaCouchbaseFlow.decrement(options).asJava
+
+}
diff --git 
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSink.scala
 
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSink.scala
new file mode 100644
index 000000000..30d3b695b
--- /dev/null
+++ 
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSink.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.couchbase3.javadsl
+
+import com.couchbase.client.java.AsyncCollection
+import com.couchbase.client.java.kv._
+import org.apache.pekko.stream.connectors.couchbase3.MutationDocument
+import org.apache.pekko.Done
+import org.apache.pekko.stream.connectors.couchbase3.scaladsl.{ CouchbaseSink 
=> ScalaCouchbaseSink }
+import org.apache.pekko.stream.javadsl.Sink
+
+import scala.concurrent.Future
+
+object CouchbaseSink {
+
+  /**
+   * reference to [[CouchbaseFlow.insertDoc]]
+   * @param insertOptions reference to Couchbase options doc; defaults to
+   *                      `InsertOptions.insertOptions()`, like the other sinks in this object
+   */
+  def insertDoc[T](insertOptions: InsertOptions = InsertOptions.insertOptions())(
+      implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T], Future[Done]] =
+    ScalaCouchbaseSink.insertDoc[T](insertOptions).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.insert]]
+   * @param applyId parse id function, which is the document id
+   * @param insertOptions reference to Couchbase options doc; defaults to
+   *                      `InsertOptions.insertOptions()`, like the other sinks in this object
+   */
+  def insert[T](applyId: T => String,
+      insertOptions: InsertOptions = InsertOptions.insertOptions())(
+      implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] =
+    ScalaCouchbaseSink.insert[T](applyId, insertOptions).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.upsertDoc]]
+   */
+  def upsertDoc[T](upsertOptions: UpsertOptions = 
UpsertOptions.upsertOptions())(
+      implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T], 
Future[Done]] =
+    ScalaCouchbaseSink.upsertDoc[T](upsertOptions).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.upsert]]
+   */
+  def upsert[T](applyId: T => String,
+      upsertOptions: UpsertOptions = UpsertOptions.upsertOptions())(
+      implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] =
+    ScalaCouchbaseSink.upsert[T](applyId, upsertOptions).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.replaceDoc]]
+   */
+  def replaceDoc[T](
+      replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
+      implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T], 
Future[Done]] =
+    ScalaCouchbaseSink.replaceDoc[T](replaceOptions).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.replace]]
+   */
+  def replace[T](applyId: T => String,
+      replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
+      implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] = {
+    ScalaCouchbaseSink.replace[T](applyId, replaceOptions).asJava
+  }
+
+  /**
+   * reference to [[CouchbaseFlow.remove]]
+   */
+  def remove[T](applyId: T => String,
+      removeOptions: RemoveOptions = RemoveOptions.removeOptions())(
+      implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] =
+    ScalaCouchbaseSink.remove[T](applyId, removeOptions).asJava
+
+  /**
+   * reference to [[CouchbaseFlow.exists]]
+   */
+  def exists[T](applyId: T => String, existsOptions: ExistsOptions = 
ExistsOptions.existsOptions())(
+      implicit asyncCollection: AsyncCollection): Sink[T, Future[Boolean]] =
+    ScalaCouchbaseSink.exists[T](applyId, existsOptions).asJava
+
+}
diff --git 
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSource.scala
 
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSource.scala
new file mode 100644
index 000000000..34c98df69
--- /dev/null
+++ 
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSource.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.couchbase3.javadsl
+
+import com.couchbase.client.java.{ AsyncCollection, AsyncScope }
+import com.couchbase.client.java.analytics.{ AnalyticsOptions, AnalyticsResult 
}
+import com.couchbase.client.java.codec.TypeRef
+import com.couchbase.client.java.json.JsonObject
+import com.couchbase.client.java.kv._
+import com.couchbase.client.java.manager.query.{ GetAllQueryIndexesOptions, 
QueryIndex }
+import com.couchbase.client.java.query.{ QueryOptions, QueryResult }
+import org.apache.pekko.NotUsed
+import org.apache.pekko.stream.javadsl.Source
+import org.apache.pekko.stream.connectors.couchbase3.scaladsl.{ 
CouchbaseSource => ScalaCoubaseSource }
+
+object CouchbaseSource {
+
+  /**
+   * get a document by id from Couchbase collection
+   * @param id document id
+   * @param options reference to Couchbase options doc
+   */
+  def get(id: String, options: GetOptions = GetOptions.getOptions)(
+      implicit asyncCollection: AsyncCollection): Source[GetResult, NotUsed] =
+    ScalaCoubaseSource.get(id, options).asJava
+
+  /**
+   * reference to [[CouchbaseSource.get]] deserialize to Couchbase JsonObject
+   */
+  def getJson(id: String, options: GetOptions = GetOptions.getOptions)(
+      implicit asyncCollection: AsyncCollection): Source[JsonObject, NotUsed] =
+    ScalaCoubaseSource.getJson(id, options).asJava
+
+  /**
+   * reference to [[CouchbaseSource.get]],deserialize to class
+   */
+  def getObject[T](id: String, target: Class[T], options: GetOptions = 
GetOptions.getOptions)(
+      implicit asyncCollection: AsyncCollection): Source[T, NotUsed] =
+    ScalaCoubaseSource.getObject[T](id, target, options).asJava
+
+  /**
+   * reference to [[CouchbaseSource.getObject]],deserialize to class with 
Generics
+   */
+  def getType[T](id: String, target: TypeRef[T], options: GetOptions = 
GetOptions.getOptions)(
+      implicit asyncCollection: AsyncCollection): Source[T, NotUsed] = {
+    ScalaCoubaseSource.getType[T](id, target, options).asJava
+  }
+
+  /**
+   * similar to [[CouchbaseSource.get]] .Reads from replicas or the active 
node based on the options and returns the results as a list
+   * @param options reference to Couchbase options doc
+   * @see [[CouchbaseSource#get]]
+   */
+  def getAllReplicas(id: String, options: GetAllReplicasOptions = 
GetAllReplicasOptions.getAllReplicasOptions)(
+      implicit asyncCollection: AsyncCollection): Source[GetReplicaResult, 
NotUsed] =
+    ScalaCoubaseSource.getAllReplicas(id, options).asJava
+
+  /**
+   * reference to [[CouchbaseSource.getAllReplicas]], deserialize to Couchbase 
JsonObject
+   */
+  def getAllReplicasJson(id: String, options: GetAllReplicasOptions = 
GetAllReplicasOptions.getAllReplicasOptions)(
+      implicit asyncCollection: AsyncCollection): Source[JsonObject, NotUsed] =
+    ScalaCoubaseSource.getAllReplicasJson(id, options).asJava
+
+  /**
+   * reference to [[CouchbaseSource.getAllReplicas]], deserialize to class
+   * If you add DefaultScalaModule to jackson of couchbase, it could 
deserialize to scala class
+   */
+  def getAllReplicasObject[T](id: String, target: Class[T],
+      options: GetAllReplicasOptions = 
GetAllReplicasOptions.getAllReplicasOptions)(
+      implicit asyncCollection: AsyncCollection): Source[T, NotUsed] =
+    ScalaCoubaseSource.getAllReplicasObject[T](id, target, options).asJava
+
+  /**
+   * reference to [[CouchbaseSource.getAllReplicasObject]], deserialize to 
class with Generics
+   */
+  def getAllReplicasType[T](id: String, target: TypeRef[T],
+      options: GetAllReplicasOptions = 
GetAllReplicasOptions.getAllReplicasOptions)(
+      implicit asyncCollection: AsyncCollection): Source[T, NotUsed] =
+    ScalaCoubaseSource.getAllReplicasType[T](id, target, options).asJava
+
+  /**
+   * similar to Get[[CouchbaseSource.get]], batch get documents from 
collection by ScanType[[ScanType]]
+   */
+  def scan(scanType: ScanType, options: ScanOptions = 
ScanOptions.scanOptions())(
+      implicit asyncCollection: AsyncCollection): Source[ScanResult, NotUsed] =
+    ScalaCoubaseSource.scan(scanType, options).asJava
+
+  /**
+   * reference to [[CouchbaseSource.scan]], deserialize to Couchbase JsonObject
+   */
+  def scanJson(scanType: ScanType, options: ScanOptions = 
ScanOptions.scanOptions())(
+      implicit asyncCollection: AsyncCollection): Source[JsonObject, NotUsed] =
+    ScalaCoubaseSource.scanJson(scanType, options).asJava
+
+  /**
+   * reference to [[CouchbaseSource.scan]], deserialize to class
+   * If you add DefaultScalaModule to jackson of couchbase, it could 
deserialize to scala class
+   */
+  def scanObject[T](scanType: ScanType, target: Class[T],
+      options: ScanOptions = ScanOptions.scanOptions())(
+      implicit asyncCollection: AsyncCollection): Source[T, NotUsed] =
+    ScalaCoubaseSource.scanObject[T](scanType, target, options).asJava
+
+  /**
+   * reference to [[CouchbaseSource.scan]], [[CouchbaseSource.scanObject]], 
deserialize to class with Generics
+   */
+  def scanType[T](scanType: ScanType, target: TypeRef[T],
+      options: ScanOptions = ScanOptions.scanOptions())(
+      implicit asyncCollection: AsyncCollection): Source[T, NotUsed] =
+    ScalaCoubaseSource.scanType[T](scanType, target, options).asJava
+
+  /**
+   * N1QL query in a Scope.
+   *
+   * QueryResult contains List<Row>, every Row is a json like
+   * <p>
+   * <pre>
+   *   {
+   *     "collectionName": Document
+   *   }
+   * </pre>
+   * </p>
+   */
+  def query(statement: String, options: QueryOptions = 
QueryOptions.queryOptions())(
+      implicit scope: AsyncScope): Source[QueryResult, NotUsed] =
+    ScalaCoubaseSource.query(statement, options).asJava
+
+  /**
+   * N1QL query in a Scope.
+   * @param statement N1QL query sql
+   * @see [[CouchbaseSource.query]]
+   */
+  def queryJson(statement: String, options: QueryOptions = 
QueryOptions.queryOptions())(
+      implicit scope: AsyncScope): Source[JsonObject, NotUsed] =
+    ScalaCoubaseSource.queryJson(statement, options).asJava
+
+  /**
+   * Performs an Analytics query in a Scope and emits the AnalyticsResult.
+   * <p>
+   * When the rows are read as JSON objects (rowsAsObject), every row is a JSON object
+   * keyed by the collection name:
+   * <pre>
+   *    {
+   *      "collectionName": Document
+   *    }
+   * </pre>
+   * </p>
+   * <p>
+   *  Warning: the Couchbase community edition does not support Analytics queries,
+   *  so this API is not covered by tests.
+   *  https://www.couchbase.com/products/editions/server/
+   * </p>
+   *
+   * @param statement Analytics query statement
+   * @param options reference to Couchbase options doc
+   */
+  def analyticsQuery(statement: String, options: AnalyticsOptions = 
AnalyticsOptions.analyticsOptions())(
+      implicit scope: AsyncScope): Source[AnalyticsResult, NotUsed] =
+    ScalaCoubaseSource.analyticsQuery(statement, options).asJava
+
+  /**
+   * <p>
+   *  Warning: the Couchbase community edition does not support Analytics queries,
+   *  so this API is not covered by tests.
+   * </p>
+   *
+   * Performs an Analytics query and converts each document row to a JsonObject. <br>
+   * Unlike analyticsQuery, the emitted JsonObject is not wrapped under the collection key;
+   * it is the document itself.<br>
+   * @param statement Analytics query statement
+   * @param options reference to Couchbase options doc
+   * @see [[CouchbaseSource.analyticsQuery]]
+   */
+  def analyticsQueryJson(statement: String, options: AnalyticsOptions = 
AnalyticsOptions.analyticsOptions())(
+      implicit scope: AsyncScope): Source[JsonObject, NotUsed] =
+    ScalaCoubaseSource.analyticsQueryJson(statement, options).asJava
+
+  /**
+   * Fetches all indexes from this collection with custom options.
+   * @see 
[[com.couchbase.client.java.manager.query.AsyncCollectionQueryIndexManager#getAllIndexes]]
+   */
+  def queryAllIndex(options: GetAllQueryIndexesOptions = 
GetAllQueryIndexesOptions.getAllQueryIndexesOptions)(
+      implicit asyncCollection: AsyncCollection): Source[QueryIndex, NotUsed] =
+    ScalaCoubaseSource.queryAllIndex(options).asJava
+}
diff --git 
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/scaladsl/CouchbaseFlow.scala
 
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/scaladsl/CouchbaseFlow.scala
new file mode 100644
index 000000000..3fb798cc8
--- /dev/null
+++ 
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/scaladsl/CouchbaseFlow.scala
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.couchbase3.scaladsl
+
+import com.couchbase.client.java.AsyncCollection
+import com.couchbase.client.java.codec.TypeRef
+import com.couchbase.client.java.json.JsonObject
+import com.couchbase.client.java.kv._
+import org.apache.pekko.NotUsed
+import org.apache.pekko.stream.connectors.couchbase3.{ MutationBinaryDocument, 
MutationDocument }
+import org.apache.pekko.stream.scaladsl.{ Flow, Source }
+
+import java.time.{ Duration, Instant }
+
+object CouchbaseFlow {
+
+  /**
+   * get a document by id from Couchbase collection
+   * @param options reference to Couchbase options doc
+   */
+  def get(options: GetOptions = GetOptions.getOptions)(
+      implicit asyncCollection: AsyncCollection): Flow[String, GetResult, 
NotUsed] =
+    Flow[String].flatMapConcat(CouchbaseSource.get(_, options))
+
+  /**
+   * reference to [[CouchbaseFlow.get]] deserialize to Couchbase JsonObject
+   */
+  def getJson(options: GetOptions = GetOptions.getOptions)(
+      implicit asyncCollection: AsyncCollection): Flow[String, JsonObject, 
NotUsed] =
+    get(options).map(_.contentAsObject())
+
+  /**
+   * reference to [[CouchbaseFlow.get]], deserializes to the given class
+   * If DefaultScalaModule is registered with Couchbase's Jackson, it can 
deserialize to Scala classes
+   */
+  def getObject[T](target: Class[T], options: GetOptions = 
GetOptions.getOptions)(
+      implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+    Flow[String]
+      .flatMapConcat(CouchbaseSource.getObject(_, target, options))
+
+  /**
+   * reference to [[CouchbaseSource.getObject]],deserialize to class with 
Generics
+   */
+  def getType[T](target: TypeRef[T], options: GetOptions = 
GetOptions.getOptions)(
+      implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+    Flow[String]
+      .flatMapConcat(CouchbaseSource.getType(_, target, options))
+
+  /**
+   * same as get, but reads from all replicas and the active node
+   * @see [[CouchbaseFlow#get]]
+   */
+  def getAllReplicas(options: GetAllReplicasOptions = 
GetAllReplicasOptions.getAllReplicasOptions)(
+      implicit asyncCollection: AsyncCollection): Flow[String, 
GetReplicaResult, NotUsed] =
+    Flow[String]
+      .flatMapConcat(CouchbaseSource.getAllReplicas(_, options))
+
+  /**
+   * reference to [[CouchbaseFlow.getAllReplicas]], deserialize to Couchbase 
JsonObject
+   */
+  def getAllReplicasJson(options: GetAllReplicasOptions = 
GetAllReplicasOptions.getAllReplicasOptions)(
+      implicit asyncCollection: AsyncCollection): Flow[String, JsonObject, 
NotUsed] =
+    getAllReplicas(options)
+      .map(_.contentAsObject())
+
+  /**
+   * reference to [[CouchbaseFlow.getAllReplicas]], deserialize to class
+   */
+  def getAllReplicasObject[T](target: Class[T],
+      getOptions: GetAllReplicasOptions = 
GetAllReplicasOptions.getAllReplicasOptions)(
+      implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+    Flow[String]
+      .flatMapConcat(CouchbaseSource.getAllReplicasObject(_, target, 
getOptions))
+
+  /**
+   * reference to [[CouchbaseFlow.getAllReplicasObject]], deserialize to class 
with Generics
+   */
+  def getAllReplicasType[T](target: TypeRef[T],
+      getOptions: GetAllReplicasOptions = 
GetAllReplicasOptions.getAllReplicasOptions)(
+      implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+    Flow[String]
+      .flatMapConcat(CouchbaseSource.getAllReplicasType(_, target, getOptions))
+
+  /**
+   * Inserts a full document which does not exist yet with custom options.
+   * @param applyId function that extracts the document id from an element
+   * @see [[com.couchbase.client.java.AsyncCollection#insert]]
+   */
+  def insert[T](applyId: T => String,
+      insertOptions: InsertOptions = InsertOptions.insertOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] = {
+    Flow[T]
+      .flatMapConcat { doc =>
+        Source.completionStage(asyncCollection.insert(applyId(doc), doc, 
insertOptions))
+          .map(_ => doc)
+      }
+  }
+
+  /**
+   * reference to [[CouchbaseFlow.insert]] <br>
+   * uses MutationDocument to wrap the id, document and result (MutationResult)
+   */
+  def insertDoc[T](insertOptions: InsertOptions = 
InsertOptions.insertOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T], 
MutationDocument[T], NotUsed] =
+    Flow[MutationDocument[T]]
+      .flatMapConcat { doc =>
+        Source.completionStage(asyncCollection.insert(doc.id, doc.doc, 
insertOptions))
+          .map(doc.withResult)
+      }
+
+  /**
+   * Replaces a full document which already exists with custom options.
+   * @param applyId function that extracts the document id from an element
+   * @see [[com.couchbase.client.java.AsyncCollection#replace]]
+   */
+  def replace[T](applyId: T => String,
+      replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    Flow[T]
+      .flatMapConcat { content =>
+        Source.completionStage(asyncCollection.replace(applyId(content), 
content, replaceOptions))
+          .map(_ => content)
+      }
+
+  /**
+   * reference to [[CouchbaseFlow.replace]]
+   * uses MutationDocument to wrap the id, document and result (MutationResult)
+   */
+  def replaceDoc[T](replaceOptions: ReplaceOptions = 
ReplaceOptions.replaceOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T], 
MutationDocument[T], NotUsed] =
+    Flow[MutationDocument[T]]
+      .flatMapConcat { doc =>
+        Source.completionStage(asyncCollection.replace(doc.id, doc.doc, 
replaceOptions))
+          .map(doc.withResult)
+      }
+
+  /**
+   * Upsert a full document which might or might not exist yet with custom 
options.
+   * @param applyId function that extracts the document id from an element
+   * @see [[com.couchbase.client.java.AsyncCollection#upsert]]
+   */
+  def upsert[T](applyId: T => String,
+      upsertOptions: UpsertOptions = UpsertOptions.upsertOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    Flow[T]
+      .flatMapConcat { content =>
+        Source.completionStage(asyncCollection.upsert(applyId(content), 
content, upsertOptions))
+          .map(_ => content)
+      }
+
+  /**
+   * reference to [[CouchbaseFlow.upsert]]
+   * uses MutationDocument to wrap the id, document and result (MutationResult)
+   */
+  def upsertDoc[T](
+      upsertOptions: UpsertOptions = UpsertOptions.upsertOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T], 
MutationDocument[T], NotUsed] =
+    Flow[MutationDocument[T]]
+      .flatMapConcat { doc =>
+        Source.completionStage(asyncCollection.upsert(doc.id, doc.doc, 
upsertOptions))
+          .map(doc.withResult)
+      }
+
+  /**
+   * Removes a Document from a collection with custom options.
+   * @param applyId function that extracts the document id; for streams of ids 
you can use `remove[String](e => e)`
+   * @see [[com.couchbase.client.java.AsyncCollection#remove]]
+   */
+  def remove[T](
+      applyId: T => String,
+      removeOptions: RemoveOptions = RemoveOptions.removeOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    Flow[T]
+      .flatMapConcat { doc =>
+        Source
+          .completionStage(asyncCollection.remove(applyId(doc), removeOptions))
+          .map(_ => doc)
+      }
+
+  /**
+   * Performs mutations to document fragments with custom options.
+   * @see [[com.couchbase.client.java.AsyncCollection#mutateIn]]
+   */
+  def mutateIn(specs: java.util.List[MutateInSpec], options: MutateInOptions = 
MutateInOptions.mutateInOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[String, MutateInResult, 
NotUsed] =
+    Flow[String]
+      .flatMapConcat { id =>
+        Source.completionStage(asyncCollection.mutateIn(id, specs, options))
+      }
+
+  /**
+   * reference to [[CouchbaseFlow.mutateIn]]
+   * uses MutationDocument to wrap the id, document and result (MutationResult)
+   * @return a flow emitting, for each input, the MutationDocument produced by `withResult`
+   */
+  def mutateInDoc[T](specs: java.util.List[MutateInSpec], options: 
MutateInOptions = MutateInOptions.mutateInOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T], 
MutationDocument[T], NotUsed] =
+    Flow[MutationDocument[T]]
+      .flatMapConcat { doc =>
+        Source.completionStage(asyncCollection.mutateIn(doc.id, specs, 
options))
+          .map(doc.withResult)
+      }
+
+  /**
+   * Checks if the given document ID exists on the active partition with 
custom options.
+   * @param applyId function that extracts the document id; for streams of ids 
you can use `exists[String](e => e)`
+   * @see [[com.couchbase.client.java.AsyncCollection#exists]]
+   */
+  def exists[T](applyId: T => String, existsOptions: ExistsOptions = 
ExistsOptions.existsOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[T, Boolean, NotUsed] =
+    Flow[T]
+      .flatMapConcat { doc =>
+        Source.completionStage(asyncCollection.exists(applyId(doc), 
existsOptions))
+          .map(_.exists())
+      }
+
+  /**
+   * Updates the expiry of the document with the given id with custom options.
+   * @see [[com.couchbase.client.java.AsyncCollection#touch]]
+   */
+  def touch(expiry: Duration, options: TouchOptions = 
TouchOptions.touchOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[String, MutationResult, 
NotUsed] =
+    Flow[String]
+      .flatMapConcat { id =>
+        Source.completionStage(asyncCollection.touch(id, expiry, options))
+      }
+
+  /**
+   * Updates the expiry of the document with the given id with custom options.
+   * @param applyId function that extracts the document id from an element
+   */
+  def touchDuration[T](applyId: T => String, expiry: Duration,
+      touchOptions: TouchOptions = TouchOptions.touchOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    Flow[T]
+      .flatMapConcat { doc =>
+        Source.completionStage(asyncCollection.touch(applyId(doc), expiry, 
touchOptions))
+          .map(_ => doc)
+      }
+
+  /**
+   * Updates the expiry of the document with the given id with custom options.
+   * @see [[com.couchbase.client.java.AsyncCollection#touch]]
+   */
+  def touchInstant[T](applyId: T => String, expiry: Instant,
+      touchOptions: TouchOptions = TouchOptions.touchOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+    Flow[T]
+      .flatMapConcat { doc =>
+        Source.completionStage(asyncCollection.touch(applyId(doc), expiry, 
touchOptions))
+          .map(_ => doc)
+      }
+
+  /**
+   * Appends binary content to the document with custom options.
+   * @see [[com.couchbase.client.java.AsyncBinaryCollection#append]]
+   */
+  def append(options: AppendOptions = AppendOptions.appendOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[MutationBinaryDocument, 
MutationResult, NotUsed] =
+    Flow[MutationBinaryDocument]
+      .flatMapConcat { doc =>
+        Source.completionStage(asyncCollection.binary().append(doc.id, 
doc.doc, options))
+      }
+
+  /**
+   * Prepends binary content to the document with custom options.
+   * @see [[com.couchbase.client.java.AsyncBinaryCollection#prepend]]
+   */
+  def prepend(options: PrependOptions = PrependOptions.prependOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[MutationBinaryDocument, 
MutationResult, NotUsed] =
+    Flow[MutationBinaryDocument]
+      .flatMapConcat { doc =>
+        Source.completionStage(asyncCollection.binary().prepend(doc.id, 
doc.doc, options))
+
+      }
+
+  /**
+   * Increments the counter document by one or the number defined in the 
options.
+   * @see [[com.couchbase.client.java.AsyncBinaryCollection#increment]]
+   */
+  def increment(options: IncrementOptions = 
IncrementOptions.incrementOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[String, CounterResult, 
NotUsed] =
+    Flow[String]
+      .flatMapConcat { id =>
+        Source.completionStage(asyncCollection.binary().increment(id, options))
+      }
+
+  /**
+   * Decrements the counter document by one or the number defined in the 
options.
+   * @see [[com.couchbase.client.java.AsyncBinaryCollection#decrement]]
+   */
+  def decrement(options: DecrementOptions = 
DecrementOptions.decrementOptions())(
+      implicit asyncCollection: AsyncCollection): Flow[String, CounterResult, 
NotUsed] =
+    Flow[String]
+      .flatMapConcat { id =>
+        Source.completionStage(asyncCollection.binary().decrement(id, options))
+      }
+
+}
diff --git 
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/scaladsl/CouchbaseSink.scala
 
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/scaladsl/CouchbaseSink.scala
new file mode 100644
index 000000000..a907cd553
--- /dev/null
+++ 
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/scaladsl/CouchbaseSink.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.couchbase3.scaladsl
+
+import com.couchbase.client.java.AsyncCollection
+import com.couchbase.client.java.kv.{ ExistsOptions, InsertOptions, 
RemoveOptions, ReplaceOptions, UpsertOptions }
+import org.apache.pekko.stream.connectors.couchbase3.MutationDocument
+import org.apache.pekko.stream.scaladsl.{ Keep, Sink }
+import org.apache.pekko.Done
+
+import scala.concurrent.Future
+
+object CouchbaseSink {
+
+  /**
+   * reference to [[CouchbaseFlow.insertDoc]]
+   */
+  def insertDoc[T](insertOptions: InsertOptions = 
InsertOptions.insertOptions())(
+      implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T], 
Future[Done]] =
+    CouchbaseFlow.insertDoc[T](insertOptions).toMat(Sink.ignore)(Keep.right)
+
+  /**
+   * reference to [[CouchbaseFlow.insert]]
+   */
+  def insert[T](applyId: T => String,
+      insertOptions: InsertOptions = InsertOptions.insertOptions())(
+      implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] =
+    CouchbaseFlow.insert[T](applyId, 
insertOptions).toMat(Sink.ignore)(Keep.right)
+
+  /**
+   * reference to [[CouchbaseFlow.upsertDoc]]
+   */
+  def upsertDoc[T](upsertOptions: UpsertOptions = 
UpsertOptions.upsertOptions())(
+      implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T], 
Future[Done]] =
+    CouchbaseFlow.upsertDoc[T](upsertOptions).toMat(Sink.ignore)(Keep.right)
+
+  /**
+   * reference to [[CouchbaseFlow.upsert]]
+   */
+  def upsert[T](applyId: T => String,
+      upsertOptions: UpsertOptions = UpsertOptions.upsertOptions())(
+      implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] =
+    CouchbaseFlow.upsert[T](applyId, 
upsertOptions).toMat(Sink.ignore)(Keep.right)
+
+  /**
+   * reference to [[CouchbaseFlow.replaceDoc]]
+   */
+  def replaceDoc[T](
+      replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
+      implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T], 
Future[Done]] =
+    CouchbaseFlow.replaceDoc[T](replaceOptions).toMat(Sink.ignore)(Keep.right)
+
+  /**
+   * reference to [[CouchbaseFlow.replace]]
+   */
+  def replace[T](applyId: T => String,
+      replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
+      implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] =
+    CouchbaseFlow.replace[T](applyId, 
replaceOptions).toMat(Sink.ignore)(Keep.right)
+
+  /**
+   * reference to [[CouchbaseFlow.remove]]
+   */
+  def remove[T](applyId: T => String,
+      removeOptions: RemoveOptions = RemoveOptions.removeOptions())(
+      implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] =
+    CouchbaseFlow.remove(applyId, removeOptions).toMat(Sink.ignore)(Keep.right)
+
+  /**
+   * reference to [[CouchbaseFlow.exists]]
+   */
+  def exists[T](applyId: T => String, existsOptions: ExistsOptions = 
ExistsOptions.existsOptions())(
+      implicit asyncCollection: AsyncCollection): Sink[T, Future[Boolean]] =
+    CouchbaseFlow.exists(applyId, existsOptions).toMat(Sink.head)(Keep.right)
+
+}
diff --git 
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/scaladsl/CouchbaseSource.scala
 
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/scaladsl/CouchbaseSource.scala
new file mode 100644
index 000000000..17432e7a6
--- /dev/null
+++ 
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/scaladsl/CouchbaseSource.scala
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.couchbase3.scaladsl
+
+import com.couchbase.client.java.{ AsyncCollection, AsyncScope }
+import com.couchbase.client.java.analytics.{ AnalyticsOptions, AnalyticsResult 
}
+import com.couchbase.client.java.codec.TypeRef
+import com.couchbase.client.java.json.JsonObject
+import com.couchbase.client.java.kv._
+import com.couchbase.client.java.manager.query.{ GetAllQueryIndexesOptions, 
QueryIndex }
+import com.couchbase.client.java.query.{ QueryOptions, QueryResult }
+import org.apache.pekko.NotUsed
+import org.apache.pekko.stream.scaladsl.Source
+
+object CouchbaseSource {
+
+  /**
+   * get a document by id from Couchbase collection
+   * @param id document id
+   * @param options reference to Couchbase options doc
+   */
+  def get(id: String, options: GetOptions = GetOptions.getOptions)(
+      implicit asyncCollection: AsyncCollection): Source[GetResult, NotUsed] =
+    Source.completionStage(asyncCollection.get(id, options))
+
+  /**
+   * reference to [[CouchbaseSource.get]] deserialize to Couchbase JsonObject
+   */
+  def getJson(id: String, options: GetOptions = GetOptions.getOptions)(
+      implicit asyncCollection: AsyncCollection): Source[JsonObject, NotUsed] =
+    get(id, options).map(_.contentAsObject())
+
+  /**
+   * reference to [[CouchbaseSource.get]],deserialize to class
+   */
+  def getObject[T](id: String, target: Class[T], options: GetOptions = 
GetOptions.getOptions)(
+      implicit asyncCollection: AsyncCollection): Source[T, NotUsed] =
+    get(id, options).map(_.contentAs(target))
+
+  /**
+   * reference to [[CouchbaseSource.getObject]],deserialize to class with 
Generics
+   */
+  def getType[T](id: String, target: TypeRef[T], options: GetOptions = 
GetOptions.getOptions)(
+      implicit asyncCollection: AsyncCollection): Source[T, NotUsed] = {
+    get(id, options).map(_.contentAs(target))
+  }
+
+  /**
+   * similar to get(id, options). Reads from replicas or the active node based 
on the options and emits each result as a stream element
+   * @param options reference to Couchbase options doc
+   * @see [[CouchbaseSource#get]]
+   */
+  def getAllReplicas(id: String, options: GetAllReplicasOptions = 
GetAllReplicasOptions.getAllReplicasOptions)(
+      implicit asyncCollection: AsyncCollection): Source[GetReplicaResult, 
NotUsed] =
+    Source
+      .completionStage(asyncCollection.getAllReplicas(id, options))
+      .flatMapConcat { res =>
+        Source.fromJavaStream(() => res.stream())
+          .flatMapConcat(Source.completionStage)
+      }
+
+  /**
+   * reference to [[CouchbaseSource.getAllReplicas]], deserialize to Couchbase 
JsonObject
+   */
+  def getAllReplicasJson(id: String, options: GetAllReplicasOptions = 
GetAllReplicasOptions.getAllReplicasOptions)(
+      implicit asyncCollection: AsyncCollection): Source[JsonObject, NotUsed] =
+    getAllReplicas(id, options)
+      .map(_.contentAsObject())
+
+  /**
+   * reference to [[CouchbaseSource.getAllReplicas]], deserialize to class
+   */
+  def getAllReplicasObject[T](id: String, target: Class[T],
+      options: GetAllReplicasOptions = 
GetAllReplicasOptions.getAllReplicasOptions)(
+      implicit asyncCollection: AsyncCollection): Source[T, NotUsed] =
+    getAllReplicas(id, options)
+      .map(_.contentAs(target))
+
+  /**
+   * reference to [[CouchbaseSource.getAllReplicasObject]], deserialize to 
class with Generics
+   */
+  def getAllReplicasType[T](id: String, target: TypeRef[T],
+      options: GetAllReplicasOptions = 
GetAllReplicasOptions.getAllReplicasOptions)(
+      implicit asyncCollection: AsyncCollection): Source[T, NotUsed] =
+    getAllReplicas(id, options)
+      .map(_.contentAs(target))
+
+  /**
+   * similar to [[CouchbaseSource.get]], but batch-reads documents from the 
collection according to the given [[ScanType]]
+   */
+  def scan(scanType: ScanType, options: ScanOptions = 
ScanOptions.scanOptions())(
+      implicit asyncCollection: AsyncCollection): Source[ScanResult, NotUsed] =
+    Source
+      .completionStage(asyncCollection.scan(scanType, options))
+      .flatMapConcat(res => Source.fromJavaStream(() => res.stream()))
+
+  /**
+   * reference to [[CouchbaseSource.scan]], deserialize to Couchbase JsonObject
+   */
+  def scanJson(scanType: ScanType, options: ScanOptions = 
ScanOptions.scanOptions())(
+      implicit asyncCollection: AsyncCollection): Source[JsonObject, NotUsed] =
+    scan(scanType, options)
+      .map(_.contentAsObject())
+
+  /**
+   * reference to [[CouchbaseSource.scan]], deserialize to class
+   */
+  def scanObject[T](scanType: ScanType, target: Class[T],
+      options: ScanOptions = ScanOptions.scanOptions())(
+      implicit asyncCollection: AsyncCollection): Source[T, NotUsed] =
+    scan(scanType, options)
+      .map(_.contentAs(target))
+
+  /**
+   * reference to [[CouchbaseSource.scan]], [[CouchbaseSource.scanObject]], 
deserialize to class with Generics
+   */
+  def scanType[T](scanType: ScanType, target: TypeRef[T],
+      options: ScanOptions = ScanOptions.scanOptions())(
+      implicit asyncCollection: AsyncCollection): Source[T, NotUsed] =
+    scan(scanType, options)
+      .map(_.contentAs(target))
+
+  /**
+   * N1QL query in a Scope.
+   *
+   * QueryResult contains List<Row>, every Row is a json like
+   * <p>
+   * <pre>
+   *   {
+   *     "collectionName": Document
+   *   }
+   * </pre>
+   * </p>
+   */
+  def query(statement: String, options: QueryOptions = 
QueryOptions.queryOptions())(
+      implicit scope: AsyncScope): Source[QueryResult, NotUsed] = {
+    Source
+      .completionStage(scope.query(statement, options))
+  }
+
+  /**
+   * N1QL query in a Scope.
+   * @param statement N1QL query sql
+   * @see [[CouchbaseSource.query]]
+   */
+  def queryJson(
+      statement: String,
+      options: QueryOptions = QueryOptions.queryOptions())(
+      implicit scope: AsyncScope): Source[JsonObject, NotUsed] = {
+    query(statement, options)
+      .flatMapConcat { res =>
+        Source.fromJavaStream(() =>
+          res.rowsAsObject().stream.flatMap[JsonObject] { json =>
+            json.getNames.stream().map(collection => 
json.getObject(collection))
+          })
+      }
+  }
+
+  /**
+   * Performs an Analytics query; the emitted AnalyticsResult contains the rows
+   * <p>
+   * val rows: List[JsonObject] = AnalyticsResult.rowsAsObject, every row is a 
JSON object keyed by the collection name
+   * <pre>
+   *    {
+   *      "collectionName": Document
+   *    }
+   * </pre>
+   * </p>
+   * <p>
+   *  Warning: Couchbase Community Edition does not support analyticsQuery, so 
this API is untested
+   *  https://www.couchbase.com/products/editions/server/
+   * </p>
+   *
+   * @param statement Analytics query sql
+   */
+  def analyticsQuery(statement: String,
+      options: AnalyticsOptions = AnalyticsOptions.analyticsOptions())(
+      implicit scope: AsyncScope): Source[AnalyticsResult, NotUsed] =
+    Source
+      .completionStage(scope.analyticsQuery(statement, options))
+
+  /**
+   * <p>
+   *  Warning: Couchbase Community Edition does not support analyticsQuery, so 
this API is untested
+   * </p>
+   *
+   * Performs an Analytics query and converts each document row to a JsonObject <br>
+   * Unlike analyticsQuery, the JsonObject has no collection-name key; it 
is the Document itself<br>
+   * @see [[CouchbaseSource.analyticsQuery]]
+   */
+  def analyticsQueryJson(statement: String,
+      options: AnalyticsOptions = AnalyticsOptions.analyticsOptions())(
+      implicit scope: AsyncScope): Source[JsonObject, NotUsed] =
+    analyticsQuery(statement, options)
+      .flatMapConcat { res =>
+        Source.fromJavaStream(() =>
+          res.rowsAsObject().stream().flatMap[JsonObject] { json =>
+            json.getNames.stream().map(collection => 
json.getObject(collection))
+          })
+      }
+
+  /**
+   * Fetches all indexes from this collection with custom options.
+   * @see 
[[com.couchbase.client.java.manager.query.AsyncCollectionQueryIndexManager#getAllIndexes]]
+   */
+  def queryAllIndex(options: GetAllQueryIndexesOptions = 
GetAllQueryIndexesOptions.getAllQueryIndexesOptions)(
+      implicit asyncCollection: AsyncCollection): Source[QueryIndex, NotUsed] =
+    Source
+      .completionStage(asyncCollection.queryIndexes().getAllIndexes(options))
+      .flatMapConcat(list => Source.fromJavaStream(() => list.stream()))
+}
diff --git a/couchbase3/src/test/resources/application.conf 
b/couchbase3/src/test/resources/application.conf
new file mode 100644
index 000000000..a1d8c93dd
--- /dev/null
+++ b/couchbase3/src/test/resources/application.conf
@@ -0,0 +1,7 @@
+# SPDX-License-Identifier: Apache-2.0
+
+pekko {
+  loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"]
+  logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter"
+  loglevel = "DEBUG"
+}
diff --git a/couchbase3/src/test/resources/logback-test.xml 
b/couchbase3/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..8b7b58047
--- /dev/null
+++ b/couchbase3/src/test/resources/logback-test.xml
@@ -0,0 +1,31 @@
+<configuration>
+    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+        <file>target/couchbase.log</file>
+        <append>false</append>
+        <encoder>
+            <pattern>%d{ISO8601} %-5level [%thread] [%logger{36}]  
%msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} %-5level [%-20.20thread] 
%-36.36logger{36}  %msg%n%rEx</pattern>
+        </encoder>
+    </appender>
+
+    <appender name="CapturingAppender" 
class="org.apache.pekko.stream.connectors.testkit.CapturingAppender"/>
+
+    <logger 
name="org.apache.pekko.stream.connectors.testkit.CapturingAppenderDelegate">
+        <appender-ref ref="STDOUT"/>
+    </logger>
+
+    <logger name="org.apache.pekko" level="DEBUG"/>
+    <logger name="org.apache.pekko.http.impl" level="WARN"/>
+    <logger name="org.apache.pekko.stream.connectors" level="DEBUG"/>
+    <logger name="com.couchbase" level="INFO"/>
+
+    <root level="debug">
+        <appender-ref ref="CapturingAppender"/>
+        <appender-ref ref="FILE" />
+    </root>
+</configuration>
diff --git 
a/couchbase3/src/test/scala/docs/scaladsl/CouchbaseTestFlowSpec.scala 
b/couchbase3/src/test/scala/docs/scaladsl/CouchbaseTestFlowSpec.scala
new file mode 100644
index 000000000..a50d90a59
--- /dev/null
+++ b/couchbase3/src/test/scala/docs/scaladsl/CouchbaseTestFlowSpec.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package docs.scaladsl
+
+import com.couchbase.client.java.kv.{ DecrementOptions, IncrementOptions, 
MutateInSpec, ReplaceOptions }
+import com.couchbase.client.java.AsyncCollection
+import org.apache.pekko.stream.connectors.couchbase3.{ CouchbaseTestSupport, 
Document }
+import org.apache.pekko.stream.connectors.couchbase3.scaladsl.{ CouchbaseFlow, 
CouchbaseSink, CouchbaseSource }
+import org.apache.pekko.stream.connectors.testkit.scaladsl.LogCapturing
+import org.apache.pekko.stream.scaladsl.{ Flow, Sink, Source }
+import 
org.apache.pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
+import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach, Inspectors }
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+import java.time.Duration
+import java.util.Collections
+import scala.collection.immutable.Seq
+import scala.concurrent.Await
+import scala.concurrent.duration.DurationInt
+import scala.util.Random
+
+class CouchbaseTestFlowSpec extends AnyWordSpec
+    with CouchbaseTestSupport
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach
+    with Matchers
+    with ScalaFutures
+    with Inspectors
+    with LogCapturing {
+
+  private implicit val collection: AsyncCollection = simpleContext.collection
+
+  override protected def beforeAll(): Unit = {
+    mockData(simpleContext)
+  }
+
+  override protected def afterAll(): Unit = {
+    clearData(simpleContext)
+  }
+
+  "insert-get-remove document" in assertAllStagesStopped {
+    val data = Seq[Document](document.copy(id = s"$docId-1"), document.copy(id 
= s"$docId-2"))
+    // insert => get => remove: the documents inserted here are removed again so the test cleans up after itself
+    val insertFlow = CouchbaseFlow.insert[Document](_.id)
+    val getFlow = CouchbaseFlow.getObject(classOf[Document])
+    val removeFlow = CouchbaseFlow.remove[Document](_.id)
+    val insertFuture = Source(data)
+      .via(insertFlow)
+      .map(_.id)
+      .via(getFlow)
+      .via(removeFlow)
+      .runWith(Sink.seq)
+
+    insertFuture.futureValue shouldBe data
+  }
+
+  "upsert-get document" in assertAllStagesStopped {
+    val upsertDocument = document.copy(value = document.value + "-upsert")
+    val upsertFlow = CouchbaseFlow.upsert[Document](_.id)
+    val getFlow = CouchbaseFlow.getObject[Document](classOf[Document])
+    val upsertFuture = Source.single[Document](upsertDocument)
+      .via(upsertFlow)
+      .map(_.id)
+      .via(getFlow)
+      .runWith(Sink.head)
+    upsertFuture.futureValue shouldBe upsertDocument
+  }
+
+  "replace data" in assertAllStagesStopped {
+    val replaceDocument = document.copy(value = document.value + "-upsert")
+    // read the current CAS of the document, then replace it using that CAS
+    val replaceFlow = CouchbaseSource.get(replaceDocument.id)
+      .map(_.cas())
+      .map(ReplaceOptions.replaceOptions().cas(_))
+      .map { e =>
+        CouchbaseFlow.replace[Document](_.id, e)
+      }.runWith(Sink.head)
+
+    val future = Source.single(replaceDocument)
+      .via(Flow.futureFlow(replaceFlow))
+      .map(_.id)
+      .via(CouchbaseFlow.getObject(classOf[Document]))
+      .runWith(Sink.head)
+
+    future.futureValue shouldBe replaceDocument
+  }
+
+  "check exists" in assertAllStagesStopped {
+    val existsFlow = CouchbaseFlow.exists[Document](_.id)
+    val existFuture = Source.single(document)
+      .via(existsFlow)
+      .runWith(Sink.head)
+    existFuture.futureValue shouldBe true
+  }
+
+  "touch 1s, wait 2s and check exists" in assertAllStagesStopped {
+    val touchDocument = document.copy(id = document.id + "-touch")
+    // set a 1s expiry via touch, sleep 2s, then expect exists = false
+    val insertFuture = Source.single(touchDocument)
+      .runWith(CouchbaseSink.insert[Document](_.id))
+    Await.result(insertFuture, 10.seconds)
+    val touchFlow = CouchbaseFlow.touchDuration[Document](_.id, 
Duration.ofSeconds(1))
+    val touchFuture = Source.single(touchDocument)
+      .via(touchFlow)
+      .runWith(Sink.ignore)
+    Await.result(touchFuture, 10.seconds)
+    // wait until Couchbase has expired (deleted) the document
+    Thread.sleep(2000)
+    val future = 
Source.single(touchDocument.id).runWith(CouchbaseSink.exists[String](e => e))
+    future.futureValue shouldBe false
+  }
+
+  "mutateIn data: add field to JsonDoc" in assertAllStagesStopped {
+    // Scala users could use the collection converters; Scala 2.12 is also supported, so use java.util.Collections
+    val insertOptions: java.util.List[MutateInSpec] =
+      Collections.singletonList(MutateInSpec.insert("mutate", "mutate"))
+    val mutateInFlow = CouchbaseFlow.mutateIn(insertOptions)
+    val mutateFuture = Source.single(jsonId)
+      .via(mutateInFlow)
+      .runWith(Sink.head)
+    Await.result(mutateFuture, 10.seconds)
+    val future = CouchbaseSource.getJson(jsonId).runWith(Sink.head)
+    future.futureValue.getString("mutate") shouldBe "mutate"
+  }
+
+  "increment and decrement on Number document" in assertAllStagesStopped {
+    // Random.nextString may produce arbitrary (including control) characters; an
+    // alphanumeric id keeps the document key printable and easy to trace.
+    val id = Random.alphanumeric.take(10).mkString
+    // registered ids are removed by clearData, which afterAll invokes
+    registerBeClear.add(id)
+    val value: Long = 0
+    val increment = 10
+    val insertNum = Source.single(value).runWith(CouchbaseSink.insert[Long](_ => id))
+    Await.result(insertNum, 10.seconds)
+    val incrementFlow = CouchbaseFlow.increment(IncrementOptions.incrementOptions().delta(increment))
+    val incrementFuture = Source.single(id)
+      .via(incrementFlow)
+      .runWith(Sink.head)
+
+    // counter starts at `value`, so after the increment it must be value + increment
+    incrementFuture.futureValue.content() shouldBe (value + increment)
+    val decrement: Long = 5
+    val decrementFlow = CouchbaseFlow.decrement(DecrementOptions.decrementOptions().delta(decrement))
+    val decrementFuture = Source.single(id)
+      .via(decrementFlow)
+      .runWith(Sink.head)
+
+    decrementFuture.futureValue.content() shouldBe (value + increment - decrement)
+  }
+
+}
diff --git 
a/couchbase3/src/test/scala/docs/scaladsl/CouchbaseTestSourceSpec.scala 
b/couchbase3/src/test/scala/docs/scaladsl/CouchbaseTestSourceSpec.scala
new file mode 100644
index 000000000..7697a1540
--- /dev/null
+++ b/couchbase3/src/test/scala/docs/scaladsl/CouchbaseTestSourceSpec.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package docs.scaladsl
+
+import com.couchbase.client.java.{ AsyncCollection, AsyncScope }
+import com.couchbase.client.java.codec.TypeRef
+import com.couchbase.client.java.kv.ScanType
+import org.apache.pekko.stream.connectors.couchbase3.{ CouchbaseTestSupport, 
Document, TypeDocument }
+import org.apache.pekko.stream.connectors.couchbase3.scaladsl.CouchbaseSource
+import org.apache.pekko.stream.connectors.testkit.scaladsl.LogCapturing
+import org.apache.pekko.stream.scaladsl.Sink
+import 
org.apache.pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
+import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach, Inspectors }
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+class CouchbaseTestSourceSpec extends AnyWordSpec
+    with CouchbaseTestSupport
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach
+    with Matchers
+    with ScalaFutures
+    with Inspectors
+    with LogCapturing {
+
+  private implicit val collection: AsyncCollection = simpleContext.collection
+  private implicit val scope: AsyncScope = simpleContext.scope
+
+  override protected def beforeAll(): Unit = {
+    mockData(simpleContext)
+  }
+
+  override protected def afterAll(): Unit = {
+    clearData(simpleContext)
+  }
+
+  "get document by jsonObject" in assertAllStagesStopped {
+    val future = CouchbaseSource.getJson(jsonId)
+      .runWith(Sink.head)
+    future.futureValue.getString("id") shouldBe jsonId
+  }
+
+  "get document by scala case class" in assertAllStagesStopped {
+    val future = CouchbaseSource.getObject(docId, classOf[Document])
+      .runWith(Sink.head)
+    val futureValue = future.futureValue
+    futureValue shouldBe document
+  }
+
+  "get document class with type" in assertAllStagesStopped {
+    val docType = new TypeRef[TypeDocument[String]] {}
+    val future = CouchbaseSource.getType(typeId, docType)
+      .runWith(Sink.head)
+    val futureValue = future.futureValue
+    futureValue shouldBe typeDocument
+  }
+
+  "scan document" in assertAllStagesStopped {
+    val future = 
CouchbaseSource.scan(ScanType.samplingScan(1)).runWith(Sink.head)
+    idSet.contains(future.futureValue.id()) shouldBe true
+  }
+
+  "query document by sql++" in assertAllStagesStopped {
+    val sql = s"select * from $defaultCollection"
+    val queryFuture = CouchbaseSource.query(sql).runWith(Sink.head)
+    queryFuture.futureValue.rowsAsObject().size() shouldBe dataSet.size
+  }
+
+  "query json document by sql++" in assertAllStagesStopped {
+    val sql = s"select * from $defaultCollection"
+    val queryFuture = CouchbaseSource.queryJson(sql).runWith(Sink.seq)
+    val queryDocuments = queryFuture.futureValue
+    queryDocuments.size shouldBe dataSet.size
+    queryDocuments.foreach { e =>
+      idSet.contains(e.getString("id")) shouldBe true
+    }
+  }
+
+  "query all indexes" in assertAllStagesStopped {
+    // the primary index is created by mockData in beforeAll (and dropped again in afterAll)
+    val queryIndexFuture = CouchbaseSource.queryAllIndex().runWith(Sink.seq)
+    val future = queryIndexFuture.futureValue
+    future.foreach(_.primary() shouldBe true)
+    future.size shouldBe 1
+  }
+}
diff --git 
a/couchbase3/src/test/scala/org/apache/pekko/stream/connectors/couchbase3/CouchbaseTestSupport.scala
 
b/couchbase3/src/test/scala/org/apache/pekko/stream/connectors/couchbase3/CouchbaseTestSupport.scala
new file mode 100644
index 000000000..c076efbd8
--- /dev/null
+++ 
b/couchbase3/src/test/scala/org/apache/pekko/stream/connectors/couchbase3/CouchbaseTestSupport.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.couchbase3
+
+import com.couchbase.client.core.io.CollectionIdentifier
+import com.couchbase.client.java._
+import com.couchbase.client.java.codec.JacksonJsonSerializer
+import com.couchbase.client.java.env.ClusterEnvironment
+import com.couchbase.client.java.json.JsonObject
+import com.fasterxml.jackson.databind.json.JsonMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.pekko.actor.ActorSystem
+import 
org.apache.pekko.stream.connectors.couchbase3.CouchbaseTestSupport.bucketName
+
+import scala.collection.mutable
+
+object CouchbaseTestSupport {
+
+  private val connectionString = "localhost"
+  private val username = "Administrator"
+  private val password = "password"
+  private lazy val bucketName = "pekko"
+
+  private val jacksonMapper = JsonMapper.builder()
+    .addModule(DefaultScalaModule)
+    .build()
+  private val serializer: JacksonJsonSerializer = 
JacksonJsonSerializer.create(jacksonMapper)
+  private val environment = 
ClusterEnvironment.builder().jsonSerializer(serializer).build()
+  lazy val cluster: Cluster = Cluster.connect(connectionString,
+    ClusterOptions.clusterOptions(username, password).environment(environment))
+  lazy val asyncCluster: AsyncCluster = cluster.async()
+}
+
+class SpecContext(bucketName: String) {
+  import CouchbaseTestSupport._
+  lazy val bucket: AsyncBucket = asyncCluster.bucket(bucketName)
+  lazy val scope: AsyncScope = bucket.defaultScope()
+  lazy val collection: AsyncCollection = bucket.defaultCollection()
+  // used for mock and clear data
+  lazy val mock: Collection = 
CouchbaseTestSupport.cluster.bucket(bucketName).defaultCollection()
+
+}
+
+/**
+ * Shared fixtures for the Couchbase 3 specs: an ActorSystem, a [[SpecContext]] bound to the
+ * test bucket, a fixed set of seed documents, and helpers to insert and remove them.
+ */
+trait CouchbaseTestSupport {
+  implicit val actorSystem: ActorSystem = ActorSystem()
+  val simpleContext = new SpecContext(bucketName)
+  val defaultScope = CollectionIdentifier.fromDefault(bucketName).scope().get()
+  val defaultCollection = CollectionIdentifier.fromDefault(bucketName).collection().get()
+
+  val jsonId: String = "pekko-couchbase-json"
+  val docId: String = "pekko-couchbase-doc"
+  val typeId: String = "pekko-couchbase-type"
+  val binaryId: String = "pekko-couchbase-binary"
+
+  val jsonObject: JsonObject = JsonObject.create().put("id", jsonId).put("value", jsonId)
+  val document = Document(docId, docId)
+  val typeDocument = TypeDocument[String](typeId, List(typeId))
+  val binaryDocument = BinaryDocument(binaryId, binaryId.getBytes)
+  // ids and documents of the seed data; both list the same entries
+  val idSet = Seq(jsonId, docId, typeId, binaryId)
+  val dataSet = Set(jsonObject, document, typeDocument, binaryDocument)
+  // explicit id -> document pairing, so seeding does not depend on the iteration order of a Set
+  private val seedData: Seq[(String, AnyRef)] = Seq(
+    jsonId -> jsonObject,
+    docId -> document,
+    typeId -> typeDocument,
+    binaryId -> binaryDocument)
+
+  // ids of documents created by individual tests; clearData removes them
+  val registerBeClear = mutable.Set[String]()
+
+  /**
+   * Inserts every seed document into the collection of `specContext` and creates the primary index.
+   *
+   * @param specContext context whose blocking collection is used for seeding
+   */
+  def mockData(specContext: SpecContext): Unit = {
+    seedData.foreach { case (id, content) =>
+      specContext.mock.insert(id, content)
+    }
+    specContext.mock.queryIndexes().createPrimaryIndex()
+  }
+
+  /**
+   * Removes the seed documents, drops the primary index and removes every id in `registerBeClear`.
+   * A failure while removing seed documents or dropping the index still propagates, but the
+   * remaining cleanup steps run first. Removal of registered ids is best-effort.
+   *
+   * @param specContext context whose blocking collection is used for cleanup
+   */
+  def clearData(specContext: SpecContext): Unit = {
+    import scala.util.control.NonFatal
+    try {
+      idSet.foreach(id => specContext.mock.remove(id))
+    } finally {
+      try {
+        specContext.mock.queryIndexes().dropPrimaryIndex()
+      } finally {
+        registerBeClear.foreach { id =>
+          try {
+            specContext.mock.remove(id)
+          } catch {
+            // best-effort: the test may already have removed the document
+            case NonFatal(_) =>
+          }
+        }
+      }
+    }
+  }
+}
+
+case class Document(id: String, value: String)
+case class TypeDocument[T](id: String, value: List[T])
+case class BinaryDocument(id: String, value: Array[Byte])
diff --git a/docker-compose.yml b/docker-compose.yml
index 3bfc6873e..cc8c05ca8 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -25,46 +25,6 @@ services:
     image: cassandra:4.0
     ports:
       - "9042:9042"
-  couchbase:
-    image: couchbase:community-5.1.1
-    ports:
-      - "8091-8094:8091-8094"
-      - "11210:11210"
-  couchbase_prep:
-    image: couchbase:community-5.1.1
-    links:
-      - "couchbase"
-    entrypoint: ""
-    command: >
-      bash -c "
-        echo 'waiting until couchbase is up'
-        until `curl --output /dev/null --silent --head --fail 
http://couchbase:8091`; do
-            printf '.'
-            sleep 2
-        done
-        couchbase-cli cluster-init -c couchbase \
-          --cluster-username Administrator --cluster-password password \
-          --cluster-ramsize 300 \
-          --cluster-index-ramsize 256 \
-          --services data,index,query,fts
-        couchbase-cli bucket-create -c couchbase \
-          -u Administrator -p password \
-          --bucket pekko \
-          --bucket-type couchbase \
-          --bucket-ramsize 100 \
-          --bucket-replica 1 \
-          --wait
-        couchbase-cli bucket-create -c couchbase \
-          -u Administrator -p password \
-          --bucket pekkoquery \
-          --bucket-type couchbase \
-          --bucket-ramsize 100 \
-          --bucket-replica 1 \
-          --wait
-        sleep 2 # just wait a tiny bit more after creating the bucket
-        echo 'CREATE PRIMARY INDEX ON pekkoquery USING GSI;' | \
-          cbq -c Administrator:password -e http://couchbase:8093
-      "
   elasticmq:
     image: softwaremill/elasticmq-native:1.5.8
     ports:
@@ -251,6 +211,53 @@ services:
       - 9090:9090
       - 12345:12345
     command: standalone
+  couchbase:
+    image: couchbase:community-7.6.1
+    ports:
+      - "8091-8096:8091-8096"
+      - "11210-11211:11210-11211"
+  couchbase_prep:
+    image: couchbase:community-7.6.1
+    links:
+      - "couchbase"
+    entrypoint: ""
+    # once couchbase2 is dropped, the pekkoquery bucket no longer needs to be created
+    command: >
+      bash -c 
+      "echo 'waiting until couchbase is up'
+      
+      until `curl --output /dev/null --silent --head --fail 
http://couchbase:8091`; do
+            printf '.'
+            sleep 2
+      done
+      
+      couchbase-cli cluster-init -c couchbase
+      --cluster-username Administrator
+      --cluster-password password
+      --cluster-ramsize 300
+      --cluster-index-ramsize 256
+      --cluster-fts-ramsize 256
+      --cluster-query-ramsize 256
+      --services data,index,query,fts
+      
+      couchbase-cli bucket-create -c couchbase
+      -u Administrator -p password
+      --bucket pekko
+      --bucket-type couchbase
+      --bucket-ramsize 120
+      --bucket-replica 1
+      --wait
+      
+      couchbase-cli bucket-create -c couchbase
+      -u Administrator -p password
+      --bucket pekkoquery
+      --bucket-type couchbase
+      --bucket-ramsize 120
+      --bucket-replica 1
+      --wait
+      
+      sleep 2 # just wait a tiny bit more after creating the bucket
+      "
 
 volumes:
   kudu-tserver-data:
diff --git a/docs/src/main/paradox/couchbase.md 
b/docs/src/main/paradox/couchbase.md
index fd505d0e1..2fd83bd3d 100644
--- a/docs/src/main/paradox/couchbase.md
+++ b/docs/src/main/paradox/couchbase.md
@@ -41,7 +41,7 @@ Apache Pekko Connectors Couchbase offers both @ref:[Apache 
Pekko Streams APIs](#
 
 * @apidoc[CouchbaseSession] offers a direct API for one-off operations
 * @apidoc[CouchbaseSessionRegistry$] is an Apache Pekko extension to keep 
track and share `CouchbaseSession`s within an `ActorSystem`
-* @apidoc[CouchbaseSource$], @apidoc[CouchbaseFlow$], and 
@apidoc[CouchbaseSink$] offer factory methods to create Apache Pekko Stream 
operators
+* @apidoc[couchbase.*.CouchbaseSource$], @apidoc[couchbase.*.CouchbaseFlow$], 
and @apidoc[couchbase.*.CouchbaseSink$] offer factory methods to create Apache 
Pekko Stream operators
 
 ## Configuration
 
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 654ad689f..22b335f15 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -60,6 +60,7 @@ object Dependencies {
   val scalaTestMockitoVersion = "3.2.18.0" // 
https://github.com/scalatest/scalatest/issues/2311
 
   val CouchbaseVersion = "2.7.23"
+  val Couchbase3Version = "3.6.0"
   val CouchbaseVersionForDocs = "2.7"
 
   val GoogleAuthVersion = "1.23.0"
@@ -158,6 +159,14 @@ object Dependencies {
       "com.fasterxml.jackson.core" % "jackson-databind" % JacksonVersion % 
Test,
       "com.fasterxml.jackson.module" %% "jackson-module-scala" % 
JacksonVersion % Test))
 
+  val Couchbase3 = Seq(
+    libraryDependencies ++= Seq(
+      "com.couchbase.client" % "java-client" % Couchbase3Version,
+      "org.apache.pekko" %% "pekko-discovery" % PekkoVersion % Provided,
+      "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion % Test,
+      "com.fasterxml.jackson.core" % "jackson-databind" % JacksonVersion % 
Test,
+      "com.fasterxml.jackson.module" %% "jackson-module-scala" % 
JacksonVersion % Test))
+
   val `Doc-examples` = Seq(
     libraryDependencies ++= Seq(
       "org.apache.pekko" %% "pekko-slf4j" % PekkoVersion,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to