This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new 2ce1866d0 Support Couchbase3 (#624)
2ce1866d0 is described below
commit 2ce1866d0897485bacd2b5258e0767cb9925dfba
Author: Laglangyue <[email protected]>
AuthorDate: Tue May 7 07:50:02 2024 +0800
Support Couchbase3 (#624)
* support couchbase3
* use implicit for asyncCollection, add spec
* use implicit for asyncCollection, add spec
* test couchbase3 in check-build-test.yml
* add more test,remove not need conf
* add java doc
* add java dsl
* revert not need change
* add test for increment and decrement
* fix fmt
* fix failed with scala 2.12
* fix compile failed with scala 3.3
* fix ci test
* ignore mima check for couchbase3
* fix couchbase2 docs
* Update application.conf
* Update CouchbaseSource.scala
* Update CouchbaseSource.scala
* update test and some comment
* update analyticsQuery
* update java/scala doc
---------
Co-authored-by: PJ Fanning <[email protected]>
---
.github/workflows/check-build-test.yml | 1 +
build.sbt | 6 +-
.../stream/connectors/couchbase3/Document.scala | 34 +++
.../couchbase3/javadsl/CouchbaseFlow.scala | 253 +++++++++++++++++
.../couchbase3/javadsl/CouchbaseSink.scala | 93 ++++++
.../couchbase3/javadsl/CouchbaseSource.scala | 194 +++++++++++++
.../couchbase3/scaladsl/CouchbaseFlow.scala | 314 +++++++++++++++++++++
.../couchbase3/scaladsl/CouchbaseSink.scala | 91 ++++++
.../couchbase3/scaladsl/CouchbaseSource.scala | 226 +++++++++++++++
couchbase3/src/test/resources/application.conf | 7 +
couchbase3/src/test/resources/logback-test.xml | 31 ++
.../docs/scaladsl/CouchbaseTestFlowSpec.scala | 165 +++++++++++
.../docs/scaladsl/CouchbaseTestSourceSpec.scala | 102 +++++++
.../couchbase3/CouchbaseTestSupport.scala | 102 +++++++
docker-compose.yml | 87 +++---
docs/src/main/paradox/couchbase.md | 2 +-
project/Dependencies.scala | 9 +
17 files changed, 1675 insertions(+), 42 deletions(-)
diff --git a/.github/workflows/check-build-test.yml
b/.github/workflows/check-build-test.yml
index d22b46de4..96d9b7bb5 100644
--- a/.github/workflows/check-build-test.yml
+++ b/.github/workflows/check-build-test.yml
@@ -83,6 +83,7 @@ jobs:
- { connector: azure-storage-queue }
- { connector: cassandra, pre_cmd:
'docker-compose up -d cassandra' }
- { connector: couchbase, pre_cmd:
'docker-compose up -d couchbase_prep' }
+ - { connector: couchbase3, pre_cmd:
'docker-compose up -d couchbase_prep' }
- { connector: csv }
- { connector: dynamodb, pre_cmd:
'docker-compose up -d dynamodb' }
- { connector: elasticsearch, pre_cmd:
'docker-compose up -d elasticsearch6 elasticsearch7 opensearch1' }
diff --git a/build.sbt b/build.sbt
index da9e78b72..476eb9c1c 100644
--- a/build.sbt
+++ b/build.sbt
@@ -22,6 +22,7 @@ lazy val userProjects: Seq[ProjectReference] =
List[ProjectReference](
azureStorageQueue,
cassandra,
couchbase,
+ couchbase3,
csv,
dynamodb,
elasticsearch,
@@ -140,6 +141,9 @@ lazy val cassandra =
lazy val couchbase =
pekkoConnectorProject("couchbase", "couchbase", Dependencies.Couchbase)
+lazy val couchbase3 =
+ pekkoConnectorProject("couchbase3", "couchbase3", Dependencies.Couchbase3)
+
lazy val csv = pekkoConnectorProject("csv", "csv")
lazy val csvBench = internalProject("csv-bench")
@@ -460,7 +464,7 @@ def pekkoConnectorProject(projectId: String,
licenses := List(License.Apache2),
AutomaticModuleName.settings(s"pekko.stream.connectors.$moduleName"),
mimaPreviousArtifacts := {
- if (moduleName == "slick") {
+ if (moduleName == "slick" || moduleName == "couchbase3") {
Set.empty
} else {
Set(organization.value %% name.value % mimaCompareVersion)
diff --git
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/Document.scala
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/Document.scala
new file mode 100644
index 000000000..005be1030
--- /dev/null
+++
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/Document.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.couchbase3
+
+import com.couchbase.client.java.kv.MutationResult
+
+/**
+ * Pairs a document with its id and, optionally, the result of a mutation.
+ *
+ * @param id document id
+ * @param doc document content
+ * @param result mutation result, empty by default
+ */
+case class MutationDocument[T](id: String, doc: T, result: Option[MutationResult] = Option.empty) {
+
+ /** Returns a copy of this document whose `result` is `Some(mutationResult)`. */
+ def withResult(mutationResult: MutationResult): MutationDocument[T] =
+ copy(result = Some(mutationResult))
+}
+
+/**
+ * Pairs binary content with its document id and, optionally, the result of a mutation.
+ *
+ * NOTE: `doc` is an `Array[Byte]`, so the generated `equals`/`hashCode` compare it
+ * by reference, not by content.
+ *
+ * @param id document id
+ * @param doc binary content
+ * @param result mutation result, empty by default
+ */
+case class MutationBinaryDocument(id: String, doc: Array[Byte], result: Option[MutationResult] = Option.empty) {
+
+ /** Returns a copy of this document whose `result` is `Some(mutationResult)`. */
+ def withResult(mutationResult: MutationResult): MutationBinaryDocument =
+ copy(result = Some(mutationResult))
+}
diff --git
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseFlow.scala
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseFlow.scala
new file mode 100644
index 000000000..49fb20014
--- /dev/null
+++
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseFlow.scala
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.couchbase3.javadsl
+
+import com.couchbase.client.java.AsyncCollection
+import com.couchbase.client.java.codec.TypeRef
+import com.couchbase.client.java.json.JsonObject
+import com.couchbase.client.java.kv._
+import org.apache.pekko.NotUsed
+import org.apache.pekko.stream.connectors.couchbase3.{ MutationBinaryDocument,
MutationDocument }
+import org.apache.pekko.stream.javadsl.Flow
+import org.apache.pekko.stream.connectors.couchbase3.scaladsl.{ CouchbaseFlow
=> ScalaCouchbaseFlow }
+
+import java.time.{ Duration, Instant }
+
+object CouchbaseFlow {
+
+ /**
+ * get a document by id from Couchbase collection
+ * @param options reference to Couchbase options doc
+ */
+ def get(options: GetOptions = GetOptions.getOptions)(
+ implicit asyncCollection: AsyncCollection): Flow[String, GetResult,
NotUsed] =
+ ScalaCouchbaseFlow.get(options).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.get]] deserialize to Couchbase JsonObject
+ */
+ def getJson(options: GetOptions = GetOptions.getOptions)(
+ implicit asyncCollection: AsyncCollection): Flow[String, JsonObject,
NotUsed] =
+ ScalaCouchbaseFlow.getJson(options).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.get]],deserialize to class
+ * If you add DefaultScalaModule to jackson of couchbase, it could
deserialize to scala class
+ */
+ def getObject[T](target: Class[T], options: GetOptions =
GetOptions.getOptions)(
+ implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+ ScalaCouchbaseFlow.getObject[T](target, options).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.getObject]], deserialize to class with Generics
+ */
+ def getType[T](target: TypeRef[T], options: GetOptions =
GetOptions.getOptions)(
+ implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+ ScalaCouchbaseFlow.getType[T](target, options).asJava
+
+ /**
+ * similar to [[CouchbaseFlow.get]], but reads from all replicas and the active node
+ * @see [[CouchbaseFlow#get]]
+ */
+ def getAllReplicas(options: GetAllReplicasOptions =
GetAllReplicasOptions.getAllReplicasOptions)(
+ implicit asyncCollection: AsyncCollection): Flow[String,
GetReplicaResult, NotUsed] =
+ ScalaCouchbaseFlow.getAllReplicas(options).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.getAllReplicas]], deserialize to Couchbase
JsonObject
+ */
+ def getAllReplicasJson(options: GetAllReplicasOptions =
GetAllReplicasOptions.getAllReplicasOptions)(
+ implicit asyncCollection: AsyncCollection): Flow[String, JsonObject,
NotUsed] =
+ ScalaCouchbaseFlow.getAllReplicasJson(options).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.getAllReplicas]], deserialize to class
+ * If you add DefaultScalaModule to jackson of couchbase, it could
deserialize to scala class
+ */
+ def getAllReplicasObject[T](target: Class[T],
+ getOptions: GetAllReplicasOptions =
GetAllReplicasOptions.getAllReplicasOptions)(
+ implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+ ScalaCouchbaseFlow.getAllReplicasObject[T](target, getOptions).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.getAllReplicasObject]], deserialize to class
with Generics
+ */
+ def getAllReplicasType[T](target: TypeRef[T],
+ getOptions: GetAllReplicasOptions =
GetAllReplicasOptions.getAllReplicasOptions)(
+ implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+ ScalaCouchbaseFlow.getAllReplicasType(target, getOptions).asJava
+
+ /**
+ * Inserts a full document which does not exist yet with custom options.
+ * @param applyId parse id function, which is the document id
+ * @see [[com.couchbase.client.java.AsyncCollection#insert]]
+ */
+ def insert[T](applyId: T => String,
+ insertOptions: InsertOptions = InsertOptions.insertOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ ScalaCouchbaseFlow.insert[T](applyId, insertOptions).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.insert]] <br>
+ * uses MutationDocument to wrap the id, document and result (MutationResult)
+ */
+ def insertDoc[T](insertOptions: InsertOptions =
InsertOptions.insertOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T],
MutationDocument[T], NotUsed] =
+ ScalaCouchbaseFlow.insertDoc[T](insertOptions).asJava
+
+ /**
+ * Replaces a full document which already exists with custom options.
+ * @param applyId parse id function, which is the document id
+ * @see [[com.couchbase.client.java.AsyncCollection#replace]]
+ */
+ def replace[T](applyId: T => String,
+ replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ ScalaCouchbaseFlow.replace[T](applyId, replaceOptions).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.replace]]
+ * uses MutationDocument to wrap the id, document and result (MutationResult)
+ */
+ def replaceDoc[T](replaceOptions: ReplaceOptions =
ReplaceOptions.replaceOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T],
MutationDocument[T], NotUsed] =
+ ScalaCouchbaseFlow.replaceDoc[T](replaceOptions).asJava
+
+ /**
+ * Upsert a full document which might or might not exist yet with custom
options.
+ * @param applyId parse id function, which is the document id
+ * @see [[com.couchbase.client.java.AsyncCollection#upsert]]
+ */
+ def upsert[T](applyId: T => String,
+ upsertOptions: UpsertOptions = UpsertOptions.upsertOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ ScalaCouchbaseFlow.upsert[T](applyId, upsertOptions).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.upsert]]
+ * uses MutationDocument to wrap the id, document and result (MutationResult)
+ */
+ def upsertDoc[T](
+ upsertOptions: UpsertOptions = UpsertOptions.upsertOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T],
MutationDocument[T], NotUsed] =
+ ScalaCouchbaseFlow.upsertDoc[T](upsertOptions).asJava
+
+ /**
+ * Removes a Document from a collection with custom options.
+ * @param applyId parse id function, which is the document id, id streams
can use `remove[String](e => e)`
+ * @see [[com.couchbase.client.java.AsyncCollection#remove]]
+ */
+ def remove[T](
+ applyId: T => String,
+ removeOptions: RemoveOptions = RemoveOptions.removeOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ ScalaCouchbaseFlow.remove[T](applyId, removeOptions).asJava
+
+ /**
+ * Performs mutations to document fragments with custom options.
+ * @see [[com.couchbase.client.java.AsyncCollection#mutateIn]]
+ */
+ def mutateIn(specs: java.util.List[MutateInSpec], options: MutateInOptions =
MutateInOptions.mutateInOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[String, MutateInResult,
NotUsed] =
+ ScalaCouchbaseFlow.mutateIn(specs, options).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.mutateIn]]
+ * uses MutationDocument to wrap the id, document and result (MutationResult)
+ * @return
+ */
+ def mutateInDoc[T](
+ specs: java.util.List[MutateInSpec],
+ options: MutateInOptions = MutateInOptions.mutateInOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T],
MutationDocument[T], NotUsed] =
+ ScalaCouchbaseFlow.mutateInDoc[T](specs, options).asJava
+
+ /**
+ * Checks if the given document ID exists on the active partition with
custom options.
+ * @param applyId parse id function, which is the document id, id streams
can use `exists[String](e => e)`
+ * @see [[com.couchbase.client.java.AsyncCollection#exists]]
+ */
+ def exists[T](
+ applyId: T => String,
+ existsOptions: ExistsOptions = ExistsOptions.existsOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[T, Boolean, NotUsed] =
+ ScalaCouchbaseFlow.exists[T](applyId, existsOptions).asJava
+
+ /**
+ * Updates the expiry of the document with the given id with custom options.
+ * @see [[com.couchbase.client.java.AsyncCollection#touch]]
+ */
+ def touch(expiry: Duration, options: TouchOptions =
TouchOptions.touchOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[String, MutationResult,
NotUsed] =
+ ScalaCouchbaseFlow.touch(expiry, options).asJava
+
+ /**
+ * Updates the expiry of the document with the given id with custom options.
+ * @param applyId parse id function, which is the document id
+ */
+ def touchDuration[T](
+ applyId: T => String,
+ expiry: Duration,
+ touchOptions: TouchOptions = TouchOptions.touchOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ ScalaCouchbaseFlow.touchDuration[T](applyId, expiry, touchOptions).asJava
+
+ /**
+ * Updates the expiry of the document with the given id with custom options.
+ * @see [[com.couchbase.client.java.AsyncCollection#touch]]
+ */
+ def touchInstant[T](
+ applyId: T => String,
+ expiry: Instant,
+ touchOptions: TouchOptions = TouchOptions.touchOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ ScalaCouchbaseFlow.touchInstant[T](applyId, expiry, touchOptions).asJava
+
+ /**
+ * Appends binary content to the document with custom options.
+ * @see [[com.couchbase.client.java.AsyncBinaryCollection#append]]
+ */
+ def append(options: AppendOptions = AppendOptions.appendOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[MutationBinaryDocument,
MutationResult, NotUsed] =
+ ScalaCouchbaseFlow.append(options).asJava
+
+ /**
+ * Prepends binary content to the document with custom options.
+ * @see [[com.couchbase.client.java.AsyncBinaryCollection#prepend]]
+ */
+ def prepend(options: PrependOptions = PrependOptions.prependOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[MutationBinaryDocument,
MutationResult, NotUsed] =
+ ScalaCouchbaseFlow.prepend(options).asJava
+
+ /**
+ * Increments the counter document by one or the number defined in the
options.
+ * @see [[com.couchbase.client.java.AsyncBinaryCollection#increment]]
+ */
+ def increment(options: IncrementOptions =
IncrementOptions.incrementOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[String, CounterResult,
NotUsed] =
+ ScalaCouchbaseFlow.increment(options).asJava
+
+ /**
+ * Decrements the counter document by one or the number defined in the options.
+ * @param options decrement options; defaults to `DecrementOptions.decrementOptions()`,
+ * mirroring the default used by [[CouchbaseFlow.increment]]
+ * @see [[com.couchbase.client.java.AsyncBinaryCollection#decrement]]
+ */
+ def decrement(options: DecrementOptions = DecrementOptions.decrementOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[String, CounterResult, NotUsed] =
+ ScalaCouchbaseFlow.decrement(options).asJava
+
+}
diff --git
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSink.scala
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSink.scala
new file mode 100644
index 000000000..30d3b695b
--- /dev/null
+++
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSink.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.couchbase3.javadsl
+
+import com.couchbase.client.java.AsyncCollection
+import com.couchbase.client.java.kv._
+import org.apache.pekko.stream.connectors.couchbase3.MutationDocument
+import org.apache.pekko.Done
+import org.apache.pekko.stream.connectors.couchbase3.scaladsl.{ CouchbaseSink
=> ScalaCouchbaseSink }
+import org.apache.pekko.stream.javadsl.Sink
+
+import scala.concurrent.Future
+
+object CouchbaseSink {
+
+ /**
+ * reference to [[CouchbaseFlow.insertDoc]]
+ * @param insertOptions insert options; defaults to `InsertOptions.insertOptions()`
+ */
+ def insertDoc[T](insertOptions: InsertOptions = InsertOptions.insertOptions())(
+ implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T], Future[Done]] =
+ ScalaCouchbaseSink.insertDoc[T](insertOptions).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.insert]]
+ * @param applyId parse id function, which is the document id
+ * @param insertOptions insert options; defaults to `InsertOptions.insertOptions()`
+ */
+ def insert[T](applyId: T => String,
+ insertOptions: InsertOptions = InsertOptions.insertOptions())(
+ implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] =
+ ScalaCouchbaseSink.insert[T](applyId, insertOptions).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.upsertDoc]]
+ */
+ def upsertDoc[T](upsertOptions: UpsertOptions =
UpsertOptions.upsertOptions())(
+ implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T],
Future[Done]] =
+ ScalaCouchbaseSink.upsertDoc[T](upsertOptions).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.upsert]]
+ */
+ def upsert[T](applyId: T => String,
+ upsertOptions: UpsertOptions = UpsertOptions.upsertOptions())(
+ implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] =
+ ScalaCouchbaseSink.upsert[T](applyId, upsertOptions).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.replaceDoc]]
+ */
+ def replaceDoc[T](
+ replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
+ implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T],
Future[Done]] =
+ ScalaCouchbaseSink.replaceDoc[T](replaceOptions).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.replace]]
+ */
+ def replace[T](applyId: T => String,
+ replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
+ implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] = {
+ ScalaCouchbaseSink.replace[T](applyId, replaceOptions).asJava
+ }
+
+ /**
+ * reference to [[CouchbaseFlow.remove]]
+ */
+ def remove[T](applyId: T => String,
+ removeOptions: RemoveOptions = RemoveOptions.removeOptions())(
+ implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] =
+ ScalaCouchbaseSink.remove[T](applyId, removeOptions).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.exists]]
+ */
+ def exists[T](applyId: T => String, existsOptions: ExistsOptions =
ExistsOptions.existsOptions())(
+ implicit asyncCollection: AsyncCollection): Sink[T, Future[Boolean]] =
+ ScalaCouchbaseSink.exists[T](applyId, existsOptions).asJava
+
+}
diff --git
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSource.scala
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSource.scala
new file mode 100644
index 000000000..34c98df69
--- /dev/null
+++
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSource.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.couchbase3.javadsl
+
+import com.couchbase.client.java.{ AsyncCollection, AsyncScope }
+import com.couchbase.client.java.analytics.{ AnalyticsOptions, AnalyticsResult
}
+import com.couchbase.client.java.codec.TypeRef
+import com.couchbase.client.java.json.JsonObject
+import com.couchbase.client.java.kv._
+import com.couchbase.client.java.manager.query.{ GetAllQueryIndexesOptions,
QueryIndex }
+import com.couchbase.client.java.query.{ QueryOptions, QueryResult }
+import org.apache.pekko.NotUsed
+import org.apache.pekko.stream.javadsl.Source
+import org.apache.pekko.stream.connectors.couchbase3.scaladsl.{
CouchbaseSource => ScalaCoubaseSource }
+
+object CouchbaseSource {
+
+ /**
+ * get a document by id from Couchbase collection
+ * @param id document id
+ * @param options reference to Couchbase options doc
+ */
+ def get(id: String, options: GetOptions = GetOptions.getOptions)(
+ implicit asyncCollection: AsyncCollection): Source[GetResult, NotUsed] =
+ ScalaCoubaseSource.get(id, options).asJava
+
+ /**
+ * reference to [[CouchbaseSource.get]] deserialize to Couchbase JsonObject
+ */
+ def getJson(id: String, options: GetOptions = GetOptions.getOptions)(
+ implicit asyncCollection: AsyncCollection): Source[JsonObject, NotUsed] =
+ ScalaCoubaseSource.getJson(id, options).asJava
+
+ /**
+ * reference to [[CouchbaseSource.get]],deserialize to class
+ */
+ def getObject[T](id: String, target: Class[T], options: GetOptions =
GetOptions.getOptions)(
+ implicit asyncCollection: AsyncCollection): Source[T, NotUsed] =
+ ScalaCoubaseSource.getObject[T](id, target, options).asJava
+
+ /**
+ * reference to [[CouchbaseSource.getObject]],deserialize to class with
Generics
+ */
+ def getType[T](id: String, target: TypeRef[T], options: GetOptions =
GetOptions.getOptions)(
+ implicit asyncCollection: AsyncCollection): Source[T, NotUsed] = {
+ ScalaCoubaseSource.getType[T](id, target, options).asJava
+ }
+
+ /**
+ * similar to [[CouchbaseSource.get]] .Reads from replicas or the active
node based on the options and returns the results as a list
+ * @param options reference to Couchbase options doc
+ * @see [[CouchbaseSource#get]]
+ */
+ def getAllReplicas(id: String, options: GetAllReplicasOptions =
GetAllReplicasOptions.getAllReplicasOptions)(
+ implicit asyncCollection: AsyncCollection): Source[GetReplicaResult,
NotUsed] =
+ ScalaCoubaseSource.getAllReplicas(id, options).asJava
+
+ /**
+ * reference to [[CouchbaseSource.getAllReplicas]], deserialize to Couchbase
JsonObject
+ */
+ def getAllReplicasJson(id: String, options: GetAllReplicasOptions =
GetAllReplicasOptions.getAllReplicasOptions)(
+ implicit asyncCollection: AsyncCollection): Source[JsonObject, NotUsed] =
+ ScalaCoubaseSource.getAllReplicasJson(id, options).asJava
+
+ /**
+ * reference to [[CouchbaseSource.getAllReplicas]], deserialize to class
+ * If you add DefaultScalaModule to jackson of couchbase, it could
deserialize to scala class
+ */
+ def getAllReplicasObject[T](id: String, target: Class[T],
+ options: GetAllReplicasOptions =
GetAllReplicasOptions.getAllReplicasOptions)(
+ implicit asyncCollection: AsyncCollection): Source[T, NotUsed] =
+ ScalaCoubaseSource.getAllReplicasObject[T](id, target, options).asJava
+
+ /**
+ * reference to [[CouchbaseSource.getAllReplicasObject]], deserialize to
class with Generics
+ */
+ def getAllReplicasType[T](id: String, target: TypeRef[T],
+ options: GetAllReplicasOptions =
GetAllReplicasOptions.getAllReplicasOptions)(
+ implicit asyncCollection: AsyncCollection): Source[T, NotUsed] =
+ ScalaCoubaseSource.getAllReplicasType[T](id, target, options).asJava
+
+ /**
+ * similar to [[CouchbaseSource.get]], but batch-reads documents from the collection according to the given [[ScanType]]
+ */
+ def scan(scanType: ScanType, options: ScanOptions =
ScanOptions.scanOptions())(
+ implicit asyncCollection: AsyncCollection): Source[ScanResult, NotUsed] =
+ ScalaCoubaseSource.scan(scanType, options).asJava
+
+ /**
+ * reference to [[CouchbaseSource.scan]], deserialize to Couchbase JsonObject
+ */
+ def scanJson(scanType: ScanType, options: ScanOptions =
ScanOptions.scanOptions())(
+ implicit asyncCollection: AsyncCollection): Source[JsonObject, NotUsed] =
+ ScalaCoubaseSource.scanJson(scanType, options).asJava
+
+ /**
+ * reference to [[CouchbaseSource.scan]], deserialize to class
+ * If you add DefaultScalaModule to jackson of couchbase, it could
deserialize to scala class
+ */
+ def scanObject[T](scanType: ScanType, target: Class[T],
+ options: ScanOptions = ScanOptions.scanOptions())(
+ implicit asyncCollection: AsyncCollection): Source[T, NotUsed] =
+ ScalaCoubaseSource.scanObject[T](scanType, target, options).asJava
+
+ /**
+ * reference to [[CouchbaseSource.scan]], [[CouchbaseSource.scanObject]],
deserialize to class with Generics
+ */
+ def scanType[T](scanType: ScanType, target: TypeRef[T],
+ options: ScanOptions = ScanOptions.scanOptions())(
+ implicit asyncCollection: AsyncCollection): Source[T, NotUsed] =
+ ScalaCoubaseSource.scanType[T](scanType, target, options).asJava
+
+ /**
+ * N1QL query in a Scope.
+ *
+ * QueryResult contains List<Row>, every Row is a json like
+ * <p>
+ * <pre>
+ * {
+ * "collectionName": Document
+ * }
+ * </pre>
+ * </p>
+ */
+ def query(statement: String, options: QueryOptions =
QueryOptions.queryOptions())(
+ implicit scope: AsyncScope): Source[QueryResult, NotUsed] =
+ ScalaCoubaseSource.query(statement, options).asJava
+
+ /**
+ * N1QL query in a Scope.
+ * @param statement N1QL query sql
+ * @see [[CouchbaseSource.query]]
+ */
+ def queryJson(statement: String, options: QueryOptions =
QueryOptions.queryOptions())(
+ implicit scope: AsyncScope): Source[JsonObject, NotUsed] =
+ ScalaCoubaseSource.queryJson(statement, options).asJava
+
+ /**
+ * Performs an Analytics query; the AnalyticsResult contains List<Row>
+ * <p>
+ * val rows: List[JsonObject] = AnalyticsResult.rowsAsObject, every Row is a json with CollectionName key
+ * <pre>
+ * {
+ * "collectionName": Document
+ * }
+ * </pre>
+ * </p>
+ * <p>
+ * Warning: Couchbase Community Edition does not support analytics queries, so this API is untested
+ * https://www.couchbase.com/products/editions/server/
+ * </p>
+ *
+ * @param statement Analytics query sql
+ */
+ def analyticsQuery(statement: String, options: AnalyticsOptions =
AnalyticsOptions.analyticsOptions())(
+ implicit scope: AsyncScope): Source[AnalyticsResult, NotUsed] =
+ ScalaCoubaseSource.analyticsQuery(statement, options).asJava
+
+ /**
+ * <p>
+ * Warning: Couchbase Community Edition does not support analytics queries, so this API is untested
+ * </p>
+ *
+ * Performs an Analytics query and convert document row to jsonObject <br>
+ * unlike analyticsQuery, the JsonObject has no collection key; it is the document itself<br>
+ * @see [[CouchbaseSource.analyticsQuery]]
+ */
+ def analyticsQueryJson(statement: String, options: AnalyticsOptions =
AnalyticsOptions.analyticsOptions())(
+ implicit scope: AsyncScope): Source[JsonObject, NotUsed] =
+ ScalaCoubaseSource.analyticsQueryJson(statement, options).asJava
+
+ /**
+ * Fetches all indexes from this collection with custom options.
+ * @see
[[com.couchbase.client.java.manager.query.AsyncCollectionQueryIndexManager#getAllIndexes]]
+ */
+ def queryAllIndex(options: GetAllQueryIndexesOptions =
GetAllQueryIndexesOptions.getAllQueryIndexesOptions)(
+ implicit asyncCollection: AsyncCollection): Source[QueryIndex, NotUsed] =
+ ScalaCoubaseSource.queryAllIndex(options).asJava
+}
diff --git
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/scaladsl/CouchbaseFlow.scala
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/scaladsl/CouchbaseFlow.scala
new file mode 100644
index 000000000..3fb798cc8
--- /dev/null
+++
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/scaladsl/CouchbaseFlow.scala
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.couchbase3.scaladsl
+
+import com.couchbase.client.java.AsyncCollection
+import com.couchbase.client.java.codec.TypeRef
+import com.couchbase.client.java.json.JsonObject
+import com.couchbase.client.java.kv._
+import org.apache.pekko.NotUsed
+import org.apache.pekko.stream.connectors.couchbase3.{ MutationBinaryDocument,
MutationDocument }
+import org.apache.pekko.stream.scaladsl.{ Flow, Source }
+
+import java.time.{ Duration, Instant }
+
+object CouchbaseFlow {
+
+ /**
+ * get a document by id from Couchbase collection
+ * @param options reference to Couchbase options doc
+ */
+ def get(options: GetOptions = GetOptions.getOptions)(
+ implicit asyncCollection: AsyncCollection): Flow[String, GetResult,
NotUsed] =
+ Flow[String].flatMapConcat(CouchbaseSource.get(_, options))
+
+ /**
+ * reference to [[CouchbaseFlow.get]] deserialize to Couchbase JsonObject
+ */
+ def getJson(options: GetOptions = GetOptions.getOptions)(
+ implicit asyncCollection: AsyncCollection): Flow[String, JsonObject,
NotUsed] =
+ get(options).map(_.contentAsObject())
+
+ /**
+ * reference to [[CouchbaseFlow.get]],deserialize to class
+ * If you add DefaultScalaModule to jackson of couchbase, it could
deserialize to scala class
+ */
+ def getObject[T](target: Class[T], options: GetOptions =
GetOptions.getOptions)(
+ implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+ Flow[String]
+ .flatMapConcat(CouchbaseSource.getObject(_, target, options))
+
+ /**
+ * reference to [[CouchbaseSource.getObject]],deserialize to class with
Generics
+ */
+ def getType[T](target: TypeRef[T], options: GetOptions =
GetOptions.getOptions)(
+ implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+ Flow[String]
+ .flatMapConcat(CouchbaseSource.getType(_, target, options))
+
+ /**
+ * Same as get, but reads the document from all replicas and the active node
+ * @see [[CouchbaseFlow#get]]
+ */
+ def getAllReplicas(options: GetAllReplicasOptions =
GetAllReplicasOptions.getAllReplicasOptions)(
+ implicit asyncCollection: AsyncCollection): Flow[String,
GetReplicaResult, NotUsed] =
+ Flow[String]
+ .flatMapConcat(CouchbaseSource.getAllReplicas(_, options))
+
+ /**
+ * reference to [[CouchbaseFlow.getAllReplicas]], deserialize to Couchbase
JsonObject
+ */
+ def getAllReplicasJson(options: GetAllReplicasOptions =
GetAllReplicasOptions.getAllReplicasOptions)(
+ implicit asyncCollection: AsyncCollection): Flow[String, JsonObject,
NotUsed] =
+ getAllReplicas(options)
+ .map(_.contentAsObject())
+
+ /**
+ * reference to [[CouchbaseFlow.getAllReplicas]], deserialize to class
+ */
+ def getAllReplicasObject[T](target: Class[T],
+ getOptions: GetAllReplicasOptions =
GetAllReplicasOptions.getAllReplicasOptions)(
+ implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+ Flow[String]
+ .flatMapConcat(CouchbaseSource.getAllReplicasObject(_, target,
getOptions))
+
+ /**
+ * reference to [[CouchbaseFlow.getAllReplicasObject]], deserialize to class
with Generics
+ */
+ def getAllReplicasType[T](target: TypeRef[T],
+ getOptions: GetAllReplicasOptions =
GetAllReplicasOptions.getAllReplicasOptions)(
+ implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+ Flow[String]
+ .flatMapConcat(CouchbaseSource.getAllReplicasType(_, target, getOptions))
+
+ /**
+ * Inserts a full document which does not exist yet with custom options.
+ * @param applyId parse id function, which is the document id
+ * @see [[com.couchbase.client.java.AsyncCollection#insert]]
+ */
+ def insert[T](applyId: T => String,
+ insertOptions: InsertOptions = InsertOptions.insertOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] = {
+ Flow[T]
+ .flatMapConcat { doc =>
+ Source.completionStage(asyncCollection.insert(applyId(doc), doc,
insertOptions))
+ .map(_ => doc)
+ }
+ }
+
+ /**
+ * reference to [[CouchbaseFlow.insert]] <br>
+ * use MutationDocument to wrapper id, document and result(MutationResult)
+ */
+ def insertDoc[T](insertOptions: InsertOptions =
InsertOptions.insertOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T],
MutationDocument[T], NotUsed] =
+ Flow[MutationDocument[T]]
+ .flatMapConcat { doc =>
+ Source.completionStage(asyncCollection.insert(doc.id, doc.doc,
insertOptions))
+ .map(doc.withResult)
+ }
+
+ /**
+ * Replaces a full document which already exists with custom options.
+ * @param applyId parse id function, which is the document id
+ * @see [[com.couchbase.client.java.AsyncCollection#replace]]
+ */
+ def replace[T](applyId: T => String,
+ replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ Flow[T]
+ .flatMapConcat { content =>
+ Source.completionStage(asyncCollection.replace(applyId(content),
content, replaceOptions))
+ .map(_ => content)
+ }
+
+ /**
+ * reference to [[CouchbaseFlow.replace]]
+ * use MutationDocument to wrapper id, document and result(MutationResult)
+ */
+ def replaceDoc[T](replaceOptions: ReplaceOptions =
ReplaceOptions.replaceOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T],
MutationDocument[T], NotUsed] =
+ Flow[MutationDocument[T]]
+ .flatMapConcat { doc =>
+ Source.completionStage(asyncCollection.replace(doc.id, doc.doc,
replaceOptions))
+ .map(doc.withResult)
+ }
+
+ /**
+ * Upsert a full document which might or might not exist yet with custom
options.
+ * @param applyId parse id function, which is the document id
+ * @see [[com.couchbase.client.java.AsyncCollection#upsert]]
+ */
+ def upsert[T](applyId: T => String,
+ upsertOptions: UpsertOptions = UpsertOptions.upsertOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ Flow[T]
+ .flatMapConcat { content =>
+ Source.completionStage(asyncCollection.upsert(applyId(content),
content, upsertOptions))
+ .map(_ => content)
+ }
+
+ /**
+ * reference to [[CouchbaseFlow.upsert]]
+ * use MutationDocument to wrapper id, document and result(MutationResult)
+ */
+ def upsertDoc[T](
+ upsertOptions: UpsertOptions = UpsertOptions.upsertOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T],
MutationDocument[T], NotUsed] =
+ Flow[MutationDocument[T]]
+ .flatMapConcat { doc =>
+ Source.completionStage(asyncCollection.upsert(doc.id, doc.doc,
upsertOptions))
+ .map(doc.withResult)
+ }
+
+ /**
+ * Removes a Document from a collection with custom options.
+ * @param applyId parse id function, which is the document id, id streams
can use `remove[String](e => e)`
+ * @see [[com.couchbase.client.java.AsyncCollection#remove]]
+ */
+ def remove[T](
+ applyId: T => String,
+ removeOptions: RemoveOptions = RemoveOptions.removeOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ Flow[T]
+ .flatMapConcat { doc =>
+ Source
+ .completionStage(asyncCollection.remove(applyId(doc), removeOptions))
+ .map(_ => doc)
+ }
+
+ /**
+ * Performs mutations to document fragments with custom options.
+ * @see [[com.couchbase.client.java.AsyncCollection#mutateIn]]
+ */
+ def mutateIn(specs: java.util.List[MutateInSpec], options: MutateInOptions =
MutateInOptions.mutateInOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[String, MutateInResult,
NotUsed] =
+ Flow[String]
+ .flatMapConcat { id =>
+ Source.completionStage(asyncCollection.mutateIn(id, specs, options))
+ }
+
+ /**
+ * reference to [[CouchbaseFlow.mutateIn]]
+ * uses MutationDocument to wrap the id, document and result (MutateInResult)
+ * @return a flow that emits `doc.withResult(result)` for every incoming MutationDocument
+ */
+ def mutateInDoc[T](specs: java.util.List[MutateInSpec], options:
MutateInOptions = MutateInOptions.mutateInOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T],
MutationDocument[T], NotUsed] =
+ Flow[MutationDocument[T]]
+ .flatMapConcat { doc =>
+ Source.completionStage(asyncCollection.mutateIn(doc.id, specs,
options))
+ .map(doc.withResult)
+ }
+
+ /**
+ * Checks if the given document ID exists on the active partition with
custom options.
+ * @param applyId parse id function, which is the document id, id streams
can use `exists[String](e => e)`
+ * @see [[com.couchbase.client.java.AsyncCollection#exists]]
+ */
+ def exists[T](applyId: T => String, existsOptions: ExistsOptions =
ExistsOptions.existsOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[T, Boolean, NotUsed] =
+ Flow[T]
+ .flatMapConcat { doc =>
+ Source.completionStage(asyncCollection.exists(applyId(doc),
existsOptions))
+ .map(_.exists())
+ }
+
+ /**
+ * Updates the expiry of the document with the given id with custom options.
+ * @see [[com.couchbase.client.java.AsyncCollection#touch]]
+ */
+ def touch(expiry: Duration, options: TouchOptions =
TouchOptions.touchOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[String, MutationResult,
NotUsed] =
+ Flow[String]
+ .flatMapConcat { id =>
+ Source.completionStage(asyncCollection.touch(id, expiry, options))
+ }
+
+ /**
+ * Updates the expiry of the document with the given id with custom options.
+ * @param applyId parse id function, which is the document id
+ */
+ def touchDuration[T](applyId: T => String, expiry: Duration,
+ touchOptions: TouchOptions = TouchOptions.touchOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ Flow[T]
+ .flatMapConcat { doc =>
+ Source.completionStage(asyncCollection.touch(applyId(doc), expiry,
touchOptions))
+ .map(_ => doc)
+ }
+
+ /**
+ * Updates the expiry of the document with the given id with custom options.
+ * @see [[com.couchbase.client.java.AsyncCollection#touch]]
+ */
+ def touchInstant[T](applyId: T => String, expiry: Instant,
+ touchOptions: TouchOptions = TouchOptions.touchOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ Flow[T]
+ .flatMapConcat { doc =>
+ Source.completionStage(asyncCollection.touch(applyId(doc), expiry,
touchOptions))
+ .map(_ => doc)
+ }
+
+ /**
+ * Appends binary content to the document with custom options.
+ * @see [[com.couchbase.client.java.AsyncBinaryCollection#append]]
+ */
+ def append(options: AppendOptions = AppendOptions.appendOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[MutationBinaryDocument,
MutationResult, NotUsed] =
+ Flow[MutationBinaryDocument]
+ .flatMapConcat { doc =>
+ Source.completionStage(asyncCollection.binary().append(doc.id,
doc.doc, options))
+ }
+
+ /**
+ * Prepends binary content to the document with custom options.
+ * @see [[com.couchbase.client.java.AsyncBinaryCollection#prepend]]
+ */
+ def prepend(options: PrependOptions = PrependOptions.prependOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[MutationBinaryDocument,
MutationResult, NotUsed] =
+ Flow[MutationBinaryDocument]
+ .flatMapConcat { doc =>
+ Source.completionStage(asyncCollection.binary().prepend(doc.id,
doc.doc, options))
+
+ }
+
+ /**
+ * Increments the counter document by one or the number defined in the
options.
+ * @see [[com.couchbase.client.java.AsyncBinaryCollection#increment]]
+ */
+ def increment(options: IncrementOptions =
IncrementOptions.incrementOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[String, CounterResult,
NotUsed] =
+ Flow[String]
+ .flatMapConcat { id =>
+ Source.completionStage(asyncCollection.binary().increment(id, options))
+ }
+
+ /**
+ * Decrements the counter document by one or the number defined in the
options.
+ * @see [[com.couchbase.client.java.AsyncBinaryCollection#decrement]]
+ */
+ def decrement(options: DecrementOptions =
DecrementOptions.decrementOptions())(
+ implicit asyncCollection: AsyncCollection): Flow[String, CounterResult,
NotUsed] =
+ Flow[String]
+ .flatMapConcat { id =>
+ Source.completionStage(asyncCollection.binary().decrement(id, options))
+ }
+
+}
diff --git
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/scaladsl/CouchbaseSink.scala
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/scaladsl/CouchbaseSink.scala
new file mode 100644
index 000000000..a907cd553
--- /dev/null
+++
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/scaladsl/CouchbaseSink.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.couchbase3.scaladsl
+
+import com.couchbase.client.java.AsyncCollection
+import com.couchbase.client.java.kv.{ ExistsOptions, InsertOptions,
RemoveOptions, ReplaceOptions, UpsertOptions }
+import org.apache.pekko.stream.connectors.couchbase3.MutationDocument
+import org.apache.pekko.stream.scaladsl.{ Keep, Sink }
+import org.apache.pekko.Done
+
+import scala.concurrent.Future
+
+object CouchbaseSink {
+
+ /**
+ * reference to [[CouchbaseFlow.insertDoc]]
+ */
+ def insertDoc[T](insertOptions: InsertOptions =
InsertOptions.insertOptions())(
+ implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T],
Future[Done]] =
+ CouchbaseFlow.insertDoc[T](insertOptions).toMat(Sink.ignore)(Keep.right)
+
+ /**
+ * reference to [[CouchbaseFlow.insert]]
+ */
+ def insert[T](applyId: T => String,
+ insertOptions: InsertOptions = InsertOptions.insertOptions())(
+ implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] =
+ CouchbaseFlow.insert[T](applyId,
insertOptions).toMat(Sink.ignore)(Keep.right)
+
+ /**
+ * reference to [[CouchbaseFlow.upsertDoc]]
+ */
+ def upsertDoc[T](upsertOptions: UpsertOptions =
UpsertOptions.upsertOptions())(
+ implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T],
Future[Done]] =
+ CouchbaseFlow.upsertDoc[T](upsertOptions).toMat(Sink.ignore)(Keep.right)
+
+ /**
+ * reference to [[CouchbaseFlow.upsert]]
+ */
+ def upsert[T](applyId: T => String,
+ upsertOptions: UpsertOptions = UpsertOptions.upsertOptions())(
+ implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] =
+ CouchbaseFlow.upsert[T](applyId,
upsertOptions).toMat(Sink.ignore)(Keep.right)
+
+ /**
+ * reference to [[CouchbaseFlow.replaceDoc]]
+ */
+ def replaceDoc[T](
+ replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
+ implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T],
Future[Done]] =
+ CouchbaseFlow.replaceDoc[T](replaceOptions).toMat(Sink.ignore)(Keep.right)
+
+ /**
+ * reference to [[CouchbaseFlow.replace]]
+ */
+ def replace[T](applyId: T => String,
+ replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
+ implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] =
+ CouchbaseFlow.replace[T](applyId,
replaceOptions).toMat(Sink.ignore)(Keep.right)
+
+ /**
+ * reference to [[CouchbaseFlow.remove]]
+ */
+ def remove[T](applyId: T => String,
+ removeOptions: RemoveOptions = RemoveOptions.removeOptions())(
+ implicit asyncCollection: AsyncCollection): Sink[T, Future[Done]] =
+ CouchbaseFlow.remove(applyId, removeOptions).toMat(Sink.ignore)(Keep.right)
+
+ /**
+ * reference to [[CouchbaseFlow.exists]]; only the result for the first element is materialized
+ */
+ def exists[T](applyId: T => String, existsOptions: ExistsOptions =
ExistsOptions.existsOptions())(
+ implicit asyncCollection: AsyncCollection): Sink[T, Future[Boolean]] =
+ CouchbaseFlow.exists(applyId, existsOptions).toMat(Sink.head)(Keep.right)
+
+}
diff --git
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/scaladsl/CouchbaseSource.scala
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/scaladsl/CouchbaseSource.scala
new file mode 100644
index 000000000..17432e7a6
--- /dev/null
+++
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/scaladsl/CouchbaseSource.scala
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.couchbase3.scaladsl
+
+import com.couchbase.client.java.{ AsyncCollection, AsyncScope }
+import com.couchbase.client.java.analytics.{ AnalyticsOptions, AnalyticsResult
}
+import com.couchbase.client.java.codec.TypeRef
+import com.couchbase.client.java.json.JsonObject
+import com.couchbase.client.java.kv._
+import com.couchbase.client.java.manager.query.{ GetAllQueryIndexesOptions,
QueryIndex }
+import com.couchbase.client.java.query.{ QueryOptions, QueryResult }
+import org.apache.pekko.NotUsed
+import org.apache.pekko.stream.scaladsl.Source
+
+object CouchbaseSource {
+
+ /**
+ * get a document by id from Couchbase collection
+ * @param id document id
+ * @param options reference to Couchbase options doc
+ */
+ def get(id: String, options: GetOptions = GetOptions.getOptions)(
+ implicit asyncCollection: AsyncCollection): Source[GetResult, NotUsed] =
+ Source.completionStage(asyncCollection.get(id, options))
+
+ /**
+ * reference to [[CouchbaseSource.get]] deserialize to Couchbase JsonObject
+ */
+ def getJson(id: String, options: GetOptions = GetOptions.getOptions)(
+ implicit asyncCollection: AsyncCollection): Source[JsonObject, NotUsed] =
+ get(id, options).map(_.contentAsObject())
+
+ /**
+ * reference to [[CouchbaseSource.get]],deserialize to class
+ */
+ def getObject[T](id: String, target: Class[T], options: GetOptions =
GetOptions.getOptions)(
+ implicit asyncCollection: AsyncCollection): Source[T, NotUsed] =
+ get(id, options).map(_.contentAs(target))
+
+ /**
+ * reference to [[CouchbaseSource.getObject]],deserialize to class with
Generics
+ */
+ def getType[T](id: String, target: TypeRef[T], options: GetOptions =
GetOptions.getOptions)(
+ implicit asyncCollection: AsyncCollection): Source[T, NotUsed] = {
+ get(id, options).map(_.contentAs(target))
+ }
+
+ /**
+ * similar to get(id, options), but reads from the replicas and the active node based
on the options and emits every result as a separate stream element
+ * @param options reference to Couchbase options doc
+ * @see [[CouchbaseSource#get]]
+ */
+ def getAllReplicas(id: String, options: GetAllReplicasOptions =
GetAllReplicasOptions.getAllReplicasOptions)(
+ implicit asyncCollection: AsyncCollection): Source[GetReplicaResult,
NotUsed] =
+ Source
+ .completionStage(asyncCollection.getAllReplicas(id, options))
+ .flatMapConcat { res =>
+ Source.fromJavaStream(() => res.stream())
+ .flatMapConcat(Source.completionStage)
+ }
+
+ /**
+ * reference to [[CouchbaseSource.getAllReplicas]], deserialize to Couchbase
JsonObject
+ */
+ def getAllReplicasJson(id: String, options: GetAllReplicasOptions =
GetAllReplicasOptions.getAllReplicasOptions)(
+ implicit asyncCollection: AsyncCollection): Source[JsonObject, NotUsed] =
+ getAllReplicas(id, options)
+ .map(_.contentAsObject())
+
+ /**
+ * reference to [[CouchbaseSource.getAllReplicas]], deserialize to class
+ */
+ def getAllReplicasObject[T](id: String, target: Class[T],
+ options: GetAllReplicasOptions =
GetAllReplicasOptions.getAllReplicasOptions)(
+ implicit asyncCollection: AsyncCollection): Source[T, NotUsed] =
+ getAllReplicas(id, options)
+ .map(_.contentAs(target))
+
+ /**
+ * reference to [[CouchbaseSource.getAllReplicasObject]], deserialize to
class with Generics
+ */
+ def getAllReplicasType[T](id: String, target: TypeRef[T],
+ options: GetAllReplicasOptions =
GetAllReplicasOptions.getAllReplicasOptions)(
+ implicit asyncCollection: AsyncCollection): Source[T, NotUsed] =
+ getAllReplicas(id, options)
+ .map(_.contentAs(target))
+
+ /**
+ * similar to [[CouchbaseSource.get]], but fetches a batch of documents from the
collection, selected by the given [[ScanType]]
+ */
+ def scan(scanType: ScanType, options: ScanOptions =
ScanOptions.scanOptions())(
+ implicit asyncCollection: AsyncCollection): Source[ScanResult, NotUsed] =
+ Source
+ .completionStage(asyncCollection.scan(scanType, options))
+ .flatMapConcat(res => Source.fromJavaStream(() => res.stream()))
+
+ /**
+ * reference to [[CouchbaseSource.scan]], deserialize to Couchbase JsonObject
+ */
+ def scanJson(scanType: ScanType, options: ScanOptions =
ScanOptions.scanOptions())(
+ implicit asyncCollection: AsyncCollection): Source[JsonObject, NotUsed] =
+ scan(scanType, options)
+ .map(_.contentAsObject())
+
+ /**
+ * reference to [[CouchbaseSource.scan]], deserialize to class
+ */
+ def scanObject[T](scanType: ScanType, target: Class[T],
+ options: ScanOptions = ScanOptions.scanOptions())(
+ implicit asyncCollection: AsyncCollection): Source[T, NotUsed] =
+ scan(scanType, options)
+ .map(_.contentAs(target))
+
+ /**
+ * reference to [[CouchbaseSource.scan]], [[CouchbaseSource.scanObject]],
deserialize to class with Generics
+ */
+ def scanType[T](scanType: ScanType, target: TypeRef[T],
+ options: ScanOptions = ScanOptions.scanOptions())(
+ implicit asyncCollection: AsyncCollection): Source[T, NotUsed] =
+ scan(scanType, options)
+ .map(_.contentAs(target))
+
+ /**
+ * N1QL query in a Scope.
+ *
+ * QueryResult contains List<Row>, every Row is a json like
+ * <p>
+ * <pre>
+ * {
+ * "collectionName": Document
+ * }
+ * </pre>
+ * </p>
+ */
+ def query(statement: String, options: QueryOptions =
QueryOptions.queryOptions())(
+ implicit scope: AsyncScope): Source[QueryResult, NotUsed] = {
+ Source
+ .completionStage(scope.query(statement, options))
+ }
+
+ /**
+ * N1QL query in a Scope.
+ * @param statement N1QL query sql
+ * @see [[CouchbaseSource.query]]
+ */
+ def queryJson(
+ statement: String,
+ options: QueryOptions = QueryOptions.queryOptions())(
+ implicit scope: AsyncScope): Source[JsonObject, NotUsed] = {
+ query(statement, options)
+ .flatMapConcat { res =>
+ Source.fromJavaStream(() =>
+ res.rowsAsObject().stream.flatMap[JsonObject] { json =>
+ json.getNames.stream().map(collection =>
json.getObject(collection))
+ })
+ }
+ }
+
+ /**
+ * Performs an Analytics query, AnalyticsResult contains List<Row>
+ * <p>
+ * AnalyticsResult.rowsAsObject returns the rows as a List[JsonObject]; every row is a
JSON object keyed by the collection name
+ * <pre>
+ * {
+ * "collectionName": Document
+ * }
+ * </pre>
+ * </p>
+ * <p>
+ * Warning: Couchbase Community Edition does not support analytics queries,
so this API is untested
+ * https://www.couchbase.com/products/editions/server/
+ * </p>
+ *
+ * @param statement Analytics query sql
+ */
+ def analyticsQuery(statement: String,
+ options: AnalyticsOptions = AnalyticsOptions.analyticsOptions())(
+ implicit scope: AsyncScope): Source[AnalyticsResult, NotUsed] =
+ Source
+ .completionStage(scope.analyticsQuery(statement, options))
+
+ /**
+ * <p>
+ * Warning: Couchbase Community Edition does not support analytics queries,
so this API is untested
+ * </p>
+ *
+ * Performs an Analytics query and converts each document row to a JsonObject <br>
+ * Unlike analyticsQuery, the emitted JsonObject is not keyed by the collection name,
it is the document itself<br>
+ * @see [[CouchbaseSource.analyticsQuery]]
+ */
+ def analyticsQueryJson(statement: String,
+ options: AnalyticsOptions = AnalyticsOptions.analyticsOptions())(
+ implicit scope: AsyncScope): Source[JsonObject, NotUsed] =
+ analyticsQuery(statement, options)
+ .flatMapConcat { res =>
+ Source.fromJavaStream(() =>
+ res.rowsAsObject().stream().flatMap[JsonObject] { json =>
+ json.getNames.stream().map(collection =>
json.getObject(collection))
+ })
+ }
+
+ /**
+ * Fetches all indexes from this collection with custom options.
+ * @see
[[com.couchbase.client.java.manager.query.AsyncCollectionQueryIndexManager#getAllIndexes]]
+ */
+ def queryAllIndex(options: GetAllQueryIndexesOptions =
GetAllQueryIndexesOptions.getAllQueryIndexesOptions)(
+ implicit asyncCollection: AsyncCollection): Source[QueryIndex, NotUsed] =
+ Source
+ .completionStage(asyncCollection.queryIndexes().getAllIndexes(options))
+ .flatMapConcat(list => Source.fromJavaStream(() => list.stream()))
+}
diff --git a/couchbase3/src/test/resources/application.conf
b/couchbase3/src/test/resources/application.conf
new file mode 100644
index 000000000..a1d8c93dd
--- /dev/null
+++ b/couchbase3/src/test/resources/application.conf
@@ -0,0 +1,7 @@
+# SPDX-License-Identifier: Apache-2.0
+
+pekko {
+ loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"]
+ logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter"
+ loglevel = "DEBUG"
+}
diff --git a/couchbase3/src/test/resources/logback-test.xml
b/couchbase3/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..8b7b58047
--- /dev/null
+++ b/couchbase3/src/test/resources/logback-test.xml
@@ -0,0 +1,31 @@
+<configuration>
+ <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+ <file>target/couchbase.log</file>
+ <append>false</append>
+ <encoder>
+ <pattern>%d{ISO8601} %-5level [%thread] [%logger{36}]
%msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} %-5level [%-20.20thread]
%-36.36logger{36} %msg%n%rEx</pattern>
+ </encoder>
+ </appender>
+
+ <appender name="CapturingAppender"
class="org.apache.pekko.stream.connectors.testkit.CapturingAppender"/>
+
+ <logger
name="org.apache.pekko.stream.connectors.testkit.CapturingAppenderDelegate">
+ <appender-ref ref="STDOUT"/>
+ </logger>
+
+ <logger name="org.apache.pekko" level="DEBUG"/>
+ <logger name="org.apache.pekko.http.impl" level="WARN"/>
+ <logger name="org.apache.pekko.stream.connectors" level="DEBUG"/>
+ <logger name="com.couchbase" level="INFO"/>
+
+ <root level="debug">
+ <appender-ref ref="CapturingAppender"/>
+ <appender-ref ref="FILE" />
+ </root>
+</configuration>
diff --git
a/couchbase3/src/test/scala/docs/scaladsl/CouchbaseTestFlowSpec.scala
b/couchbase3/src/test/scala/docs/scaladsl/CouchbaseTestFlowSpec.scala
new file mode 100644
index 000000000..a50d90a59
--- /dev/null
+++ b/couchbase3/src/test/scala/docs/scaladsl/CouchbaseTestFlowSpec.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package docs.scaladsl
+
+import com.couchbase.client.java.kv.{ DecrementOptions, IncrementOptions,
MutateInSpec, ReplaceOptions }
+import com.couchbase.client.java.AsyncCollection
+import org.apache.pekko.stream.connectors.couchbase3.{ CouchbaseTestSupport,
Document }
+import org.apache.pekko.stream.connectors.couchbase3.scaladsl.{ CouchbaseFlow,
CouchbaseSink, CouchbaseSource }
+import org.apache.pekko.stream.connectors.testkit.scaladsl.LogCapturing
+import org.apache.pekko.stream.scaladsl.{ Flow, Sink, Source }
+import
org.apache.pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
+import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach, Inspectors }
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+import java.time.Duration
+import java.util.Collections
+import scala.collection.immutable.Seq
+import scala.concurrent.Await
+import scala.concurrent.duration.DurationInt
+import scala.util.Random
+
+class CouchbaseTestFlowSpec extends AnyWordSpec
+ with CouchbaseTestSupport
+ with BeforeAndAfterAll
+ with BeforeAndAfterEach
+ with Matchers
+ with ScalaFutures
+ with Inspectors
+ with LogCapturing {
+
+ private implicit val collection: AsyncCollection = simpleContext.collection
+
+ override protected def beforeAll(): Unit = {
+ mockData(simpleContext)
+ }
+
+ override protected def afterAll(): Unit = {
+ clearData(simpleContext)
+ }
+
+ "insert-get-remove document" in assertAllStagesStopped {
+ val data = Seq[Document](document.copy(id = s"$docId-1"), document.copy(id
= s"$docId-2"))
+ // insert => get => remove: the documents added here are removed again to leave the collection clean
+ val insertFlow = CouchbaseFlow.insert[Document](_.id)
+ val getFlow = CouchbaseFlow.getObject(classOf[Document])
+ val removeFlow = CouchbaseFlow.remove[Document](_.id)
+ val insertFuture = Source(data)
+ .via(insertFlow)
+ .map(_.id)
+ .via(getFlow)
+ .via(removeFlow)
+ .runWith(Sink.seq)
+
+ insertFuture.futureValue shouldBe data
+ }
+
+ "upsert-get document" in assertAllStagesStopped {
+ val upsertDocument = document.copy(value = document.value + "-upsert")
+ val upsertFlow = CouchbaseFlow.upsert[Document](_.id)
+ val getFlow = CouchbaseFlow.getObject[Document](classOf[Document])
+ val upsertFuture = Source.single[Document](upsertDocument)
+ .via(upsertFlow)
+ .map(_.id)
+ .via(getFlow)
+ .runWith(Sink.head)
+ upsertFuture.futureValue shouldBe upsertDocument
+ }
+
+ "replace data" in assertAllStagesStopped {
+ val replaceDocument = document.copy(value = document.value + "-upsert")
+ // replace with cas, last remove
+ val replaceFlow = CouchbaseSource.get(replaceDocument.id)
+ .map(_.cas())
+ .map(ReplaceOptions.replaceOptions().cas(_))
+ .map { e =>
+ CouchbaseFlow.replace[Document](_.id, e)
+ }.runWith(Sink.head)
+
+ val future = Source.single(replaceDocument)
+ .via(Flow.futureFlow(replaceFlow))
+ .map(_.id)
+ .via(CouchbaseFlow.getObject(classOf[Document]))
+ .runWith(Sink.head)
+
+ future.futureValue shouldBe replaceDocument
+ }
+
+ "check exists" in assertAllStagesStopped {
+ val existsFlow = CouchbaseFlow.exists[Document](_.id)
+ val existFuture = Source.single(document)
+ .via(existsFlow)
+ .runWith(Sink.head)
+ existFuture.futureValue shouldBe true
+ }
+
+ "touch 1s, wait 2s and check exists" in assertAllStagesStopped {
+ val touchDocument = document.copy(id = document.id + "-touch")
+ // touch with a 1s expiry, sleep 2s, then exists must be false
+ val insertFuture = Source.single(touchDocument)
+ .runWith(CouchbaseSink.insert[Document](_.id))
+ Await.result(insertFuture, 10.seconds)
+ val touchFlow = CouchbaseFlow.touchDuration[Document](_.id,
Duration.ofSeconds(1))
+ val touchFuture = Source.single(touchDocument)
+ .via(touchFlow)
+ .runWith(Sink.ignore)
+ Await.result(touchFuture, 10.seconds)
+ // wait the doc was deleted by couchbase
+ Thread.sleep(2000)
+ val future =
Source.single(touchDocument.id).runWith(CouchbaseSink.exists[String](e => e))
+ future.futureValue shouldBe false
+ }
+
+ "mutateIn data: add field to JsonDoc" in assertAllStagesStopped {
+ // scala users can use convert. We support 2.12,so use
java.util.Collections
+ val insertOptions: java.util.List[MutateInSpec] =
+ Collections.singletonList(MutateInSpec.insert("mutate", "mutate"))
+ val mutateInFlow = CouchbaseFlow.mutateIn(insertOptions)
+ val mutateFuture = Source.single(jsonId)
+ .via(mutateInFlow)
+ .runWith(Sink.head)
+ Await.result(mutateFuture, 10.seconds)
+ val future = CouchbaseSource.getJson(jsonId).runWith(Sink.head)
+ future.futureValue.getString("mutate") shouldBe "mutate"
+ }
+
+ "increment and decrement on Number document" in assertAllStagesStopped {
+ val id = Random.alphanumeric.take(10).mkString // printable id, unlike Random.nextString
+ registerBeClear.add(id)
+ val value: Long = 0
+ val increment = 10
+ val insertNum = Source.single(value).runWith(CouchbaseSink.insert[Long](_
=> id))
+ Await.result(insertNum, 10.seconds)
+ val incrementFlow =
CouchbaseFlow.increment(IncrementOptions.incrementOptions().delta(increment))
+ val incrementFuture = Source.single(id)
+ .via(incrementFlow)
+ .runWith(Sink.head)
+
+ incrementFuture.futureValue.content() shouldBe (value + increment)
+ val decrement: Long = 5
+ val decrementFlow =
CouchbaseFlow.decrement(DecrementOptions.decrementOptions().delta(decrement))
+ val decrementFuture = Source.single(id)
+ .via(decrementFlow)
+ .runWith(Sink.head)
+
+ decrementFuture.futureValue.content() shouldBe (value + increment -
decrement)
+ }
+
+}
diff --git
a/couchbase3/src/test/scala/docs/scaladsl/CouchbaseTestSourceSpec.scala
b/couchbase3/src/test/scala/docs/scaladsl/CouchbaseTestSourceSpec.scala
new file mode 100644
index 000000000..7697a1540
--- /dev/null
+++ b/couchbase3/src/test/scala/docs/scaladsl/CouchbaseTestSourceSpec.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package docs.scaladsl
+
+import com.couchbase.client.java.{ AsyncCollection, AsyncScope }
+import com.couchbase.client.java.codec.TypeRef
+import com.couchbase.client.java.kv.ScanType
+import org.apache.pekko.stream.connectors.couchbase3.{ CouchbaseTestSupport,
Document, TypeDocument }
+import org.apache.pekko.stream.connectors.couchbase3.scaladsl.CouchbaseSource
+import org.apache.pekko.stream.connectors.testkit.scaladsl.LogCapturing
+import org.apache.pekko.stream.scaladsl.Sink
+import
org.apache.pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
+import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach, Inspectors }
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+/**
+ * Integration spec for the `CouchbaseSource` factories.
+ *
+ * The fixture documents and the primary index are created once in `beforeAll` (through
+ * `mockData`) and removed again in `afterAll` (through `clearData`). The Couchbase cluster
+ * configured in `CouchbaseTestSupport` has to be reachable for the spec to run.
+ */
+class CouchbaseTestSourceSpec extends AnyWordSpec
+    with CouchbaseTestSupport
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach
+    with Matchers
+    with ScalaFutures
+    with Inspectors
+    with LogCapturing {
+
+  // Picked up implicitly by the CouchbaseSource factories used below.
+  private implicit val collection: AsyncCollection = simpleContext.collection
+  private implicit val scope: AsyncScope = simpleContext.scope
+
+  override protected def beforeAll(): Unit = {
+    mockData(simpleContext)
+  }
+
+  override protected def afterAll(): Unit = {
+    import scala.concurrent.Await
+    import scala.concurrent.duration.DurationInt
+    // Terminate the ActorSystem created by CouchbaseTestSupport even when the data clean-up
+    // fails; otherwise it is never shut down and keeps running after the spec has finished.
+    try clearData(simpleContext)
+    finally Await.result(actorSystem.terminate(), 10.seconds)
+  }
+
+  "get document by jsonObject" in assertAllStagesStopped {
+    val future = CouchbaseSource.getJson(jsonId)
+      .runWith(Sink.head)
+    future.futureValue.getString("id") shouldBe jsonId
+  }
+
+  "get document by scala case class" in assertAllStagesStopped {
+    val future = CouchbaseSource.getObject(docId, classOf[Document])
+      .runWith(Sink.head)
+    future.futureValue shouldBe document
+  }
+
+  "get document class with type" in assertAllStagesStopped {
+    // A TypeRef is passed so that the type argument of TypeDocument is available for decoding.
+    val docType = new TypeRef[TypeDocument[String]] {}
+    val future = CouchbaseSource.getType(typeId, docType)
+      .runWith(Sink.head)
+    future.futureValue shouldBe typeDocument
+  }
+
+  "scan document" in assertAllStagesStopped {
+    // A sampling scan limited to one document must return one of the fixture documents.
+    val future = CouchbaseSource.scan(ScanType.samplingScan(1)).runWith(Sink.head)
+    idSet.contains(future.futureValue.id()) shouldBe true
+  }
+
+  "query document by sql++" in assertAllStagesStopped {
+    val sql = s"select * from $defaultCollection"
+    val queryFuture = CouchbaseSource.query(sql).runWith(Sink.head)
+    queryFuture.futureValue.rowsAsObject().size() shouldBe dataSet.size
+  }
+
+  "query json document by sql++" in assertAllStagesStopped {
+    val sql = s"select * from $defaultCollection"
+    val queryFuture = CouchbaseSource.queryJson(sql).runWith(Sink.seq)
+    val queryDocuments = queryFuture.futureValue
+    queryDocuments.size shouldBe dataSet.size
+    queryDocuments.foreach { e =>
+      idSet.contains(e.getString("id")) shouldBe true
+    }
+  }
+
+  "query all indexes" in assertAllStagesStopped {
+    // mockData, called from beforeAll, creates the primary index that is expected here.
+    val queryIndexFuture = CouchbaseSource.queryAllIndex().runWith(Sink.seq)
+    val indexes = queryIndexFuture.futureValue
+    indexes.foreach(_.primary() shouldBe true)
+    indexes.size shouldBe 1
+  }
+}
diff --git
a/couchbase3/src/test/scala/org/apache/pekko/stream/connectors/couchbase3/CouchbaseTestSupport.scala
b/couchbase3/src/test/scala/org/apache/pekko/stream/connectors/couchbase3/CouchbaseTestSupport.scala
new file mode 100644
index 000000000..c076efbd8
--- /dev/null
+++
b/couchbase3/src/test/scala/org/apache/pekko/stream/connectors/couchbase3/CouchbaseTestSupport.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.couchbase3
+
+import com.couchbase.client.core.io.CollectionIdentifier
+import com.couchbase.client.java._
+import com.couchbase.client.java.codec.JacksonJsonSerializer
+import com.couchbase.client.java.env.ClusterEnvironment
+import com.couchbase.client.java.json.JsonObject
+import com.fasterxml.jackson.databind.json.JsonMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.pekko.actor.ActorSystem
+import
org.apache.pekko.stream.connectors.couchbase3.CouchbaseTestSupport.bucketName
+
+import scala.collection.mutable
+
+/**
+ * Connection settings and lazily created cluster handles used by the Couchbase specs.
+ *
+ * The cluster environment is configured with a Jackson serializer that has the Scala module
+ * registered.
+ */
+object CouchbaseTestSupport {
+
+  // Address and credentials of the Couchbase server the specs connect to.
+  private val connectionString = "localhost"
+  private val username = "Administrator"
+  private val password = "password"
+  private lazy val bucketName = "pekko"
+
+  private val jacksonMapper = JsonMapper.builder().addModule(DefaultScalaModule).build()
+  private val serializer: JacksonJsonSerializer = JacksonJsonSerializer.create(jacksonMapper)
+  private val environment = ClusterEnvironment
+    .builder()
+    .jsonSerializer(serializer)
+    .build()
+
+  // The connection is only opened when `cluster` is first accessed.
+  lazy val cluster: Cluster = {
+    val options = ClusterOptions.clusterOptions(username, password).environment(environment)
+    Cluster.connect(connectionString, options)
+  }
+
+  // Asynchronous view of the same cluster connection.
+  lazy val asyncCluster: AsyncCluster = cluster.async()
+}
+
+/**
+ * Lazily resolved Couchbase handles for a single bucket.
+ *
+ * @param bucketName name of the bucket whose default scope and default collection are exposed
+ */
+class SpecContext(bucketName: String) {
+  import CouchbaseTestSupport._
+
+  // Asynchronous handles, all derived from the same async bucket.
+  lazy val bucket: AsyncBucket = asyncCluster.bucket(bucketName)
+  lazy val scope: AsyncScope = bucket.defaultScope()
+  lazy val collection: AsyncCollection = bucket.defaultCollection()
+
+  // Blocking handle on the default collection, used to insert and remove the test data.
+  lazy val mock: Collection = cluster.bucket(bucketName).defaultCollection()
+
+}
+
+/**
+ * Fixture mixed into the Couchbase specs.
+ *
+ * Provides an `ActorSystem`, a [[SpecContext]] for the test bucket, four fixture documents with
+ * their ids, and helpers to insert (`mockData`) and remove (`clearData`) the test data.
+ */
+trait CouchbaseTestSupport {
+  import scala.util.control.NonFatal
+
+  implicit val actorSystem: ActorSystem = ActorSystem()
+  val simpleContext = new SpecContext(bucketName)
+  val defaultScope = CollectionIdentifier.fromDefault(bucketName).scope().get()
+  val defaultCollection = CollectionIdentifier.fromDefault(bucketName).collection().get()
+
+  val jsonId: String = "pekko-couchbase-json"
+  val docId: String = "pekko-couchbase-doc"
+  val typeId: String = "pekko-couchbase-type"
+  val binaryId: String = "pekko-couchbase-binary"
+
+  val jsonObject: JsonObject = JsonObject.create().put("id", jsonId).put("value", jsonId)
+  val document = Document(docId, docId)
+  val typeDocument = TypeDocument[String](typeId, List(typeId))
+  val binaryDocument = BinaryDocument(binaryId, binaryId.getBytes)
+  // Ids and contents of the fixture documents; the specs use them for membership and size checks.
+  val idSet = Seq(jsonId, docId, typeId, binaryId)
+  val dataSet = Set(jsonObject, document, typeDocument, binaryDocument)
+
+  // Explicit id -> document pairing used for seeding. Zipping `idSet` with `dataSet` would rely
+  // on the iteration order of a Set, which is not guaranteed to follow insertion order.
+  private val mockDocuments: Seq[(String, AnyRef)] = Seq(
+    (jsonId, jsonObject),
+    (docId, document),
+    (typeId, typeDocument),
+    (binaryId, binaryDocument))
+
+  // Ids of documents created by individual tests; `clearData` removes them as well.
+  val registerBeClear = mutable.Set[String]()
+
+  /**
+   * Inserts the fixture documents into the default collection of `specContext` and creates the
+   * primary index on it.
+   *
+   * @param specContext context whose blocking `mock` collection receives the data
+   */
+  def mockData(specContext: SpecContext): Unit = {
+    mockDocuments.foreach { case (id, content) =>
+      specContext.mock.insert(id, content)
+    }
+    specContext.mock.queryIndexes().createPrimaryIndex()
+  }
+
+  /**
+   * Removes the fixture documents and every id registered in `registerBeClear`, then drops the
+   * primary index created by `mockData`.
+   *
+   * Document removal is best-effort: a failing removal (for example of a document that no longer
+   * exists) must not prevent the remaining clean-up. A failure to drop the index is propagated.
+   *
+   * @param specContext context whose blocking `mock` collection is cleaned
+   */
+  def clearData(specContext: SpecContext): Unit = {
+    (idSet ++ registerBeClear).foreach { id =>
+      try specContext.mock.remove(id)
+      catch {
+        case NonFatal(_) => // best-effort clean-up, nothing to do
+      }
+    }
+    specContext.mock.queryIndexes().dropPrimaryIndex()
+  }
+}
+
+/** Fixture document with a plain string payload. */
+case class Document(
+    id: String,
+    value: String)
+
+/** Fixture document with a generically typed list payload. */
+case class TypeDocument[T](
+    id: String,
+    value: List[T])
+
+/**
+ * Fixture document with a binary payload.
+ *
+ * Note: `value` is an `Array`, so the generated `equals` compares it by reference.
+ */
+case class BinaryDocument(
+    id: String,
+    value: Array[Byte])
diff --git a/docker-compose.yml b/docker-compose.yml
index 3bfc6873e..cc8c05ca8 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -25,46 +25,6 @@ services:
image: cassandra:4.0
ports:
- "9042:9042"
- couchbase:
- image: couchbase:community-5.1.1
- ports:
- - "8091-8094:8091-8094"
- - "11210:11210"
- couchbase_prep:
- image: couchbase:community-5.1.1
- links:
- - "couchbase"
- entrypoint: ""
- command: >
- bash -c "
- echo 'waiting until couchbase is up'
- until `curl --output /dev/null --silent --head --fail
http://couchbase:8091`; do
- printf '.'
- sleep 2
- done
- couchbase-cli cluster-init -c couchbase \
- --cluster-username Administrator --cluster-password password \
- --cluster-ramsize 300 \
- --cluster-index-ramsize 256 \
- --services data,index,query,fts
- couchbase-cli bucket-create -c couchbase \
- -u Administrator -p password \
- --bucket pekko \
- --bucket-type couchbase \
- --bucket-ramsize 100 \
- --bucket-replica 1 \
- --wait
- couchbase-cli bucket-create -c couchbase \
- -u Administrator -p password \
- --bucket pekkoquery \
- --bucket-type couchbase \
- --bucket-ramsize 100 \
- --bucket-replica 1 \
- --wait
- sleep 2 # just wait a tiny bit more after creating the bucket
- echo 'CREATE PRIMARY INDEX ON pekkoquery USING GSI;' | \
- cbq -c Administrator:password -e http://couchbase:8093
- "
elasticmq:
image: softwaremill/elasticmq-native:1.5.8
ports:
@@ -251,6 +211,53 @@ services:
- 9090:9090
- 12345:12345
command: standalone
+ couchbase:
+ image: couchbase:community-7.6.1
+ ports:
+ - "8091-8096:8091-8096"
+ - "11210-11211:11210-11211"
+ couchbase_prep:
+ image: couchbase:community-7.6.1
+ links:
+ - "couchbase"
+ entrypoint: ""
+ # TODO: the pekkoquery bucket is only needed while couchbase2 is supported; stop creating it once couchbase2 is dropped
+ command: >
+ bash -c
+ "echo 'waiting until couchbase is up'
+
+ until `curl --output /dev/null --silent --head --fail
http://couchbase:8091`; do
+ printf '.'
+ sleep 2
+ done
+
+ couchbase-cli cluster-init -c couchbase
+ --cluster-username Administrator
+ --cluster-password password
+ --cluster-ramsize 300
+ --cluster-index-ramsize 256
+ --cluster-fts-ramsize 256
+ --cluster-query-ramsize 256
+ --services data,index,query,fts
+
+ couchbase-cli bucket-create -c couchbase
+ -u Administrator -p password
+ --bucket pekko
+ --bucket-type couchbase
+ --bucket-ramsize 120
+ --bucket-replica 1
+ --wait
+
+ couchbase-cli bucket-create -c couchbase
+ -u Administrator -p password
+ --bucket pekkoquery
+ --bucket-type couchbase
+ --bucket-ramsize 120
+ --bucket-replica 1
+ --wait
+
+ sleep 2 # just wait a tiny bit more after creating the bucket
+ "
volumes:
kudu-tserver-data:
diff --git a/docs/src/main/paradox/couchbase.md
b/docs/src/main/paradox/couchbase.md
index fd505d0e1..2fd83bd3d 100644
--- a/docs/src/main/paradox/couchbase.md
+++ b/docs/src/main/paradox/couchbase.md
@@ -41,7 +41,7 @@ Apache Pekko Connectors Couchbase offers both @ref:[Apache
Pekko Streams APIs](#
* @apidoc[CouchbaseSession] offers a direct API for one-off operations
* @apidoc[CouchbaseSessionRegistry$] is an Apache Pekko extension to keep
track and share `CouchbaseSession`s within an `ActorSystem`
-* @apidoc[CouchbaseSource$], @apidoc[CouchbaseFlow$], and
@apidoc[CouchbaseSink$] offer factory methods to create Apache Pekko Stream
operators
+* @apidoc[couchbase.*.CouchbaseSource$], @apidoc[couchbase.*.CouchbaseFlow$],
and @apidoc[couchbase.*.CouchbaseSink$] offer factory methods to create Apache
Pekko Stream operators
## Configuration
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 654ad689f..22b335f15 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -60,6 +60,7 @@ object Dependencies {
val scalaTestMockitoVersion = "3.2.18.0" //
https://github.com/scalatest/scalatest/issues/2311
val CouchbaseVersion = "2.7.23"
+ val Couchbase3Version = "3.6.0"
val CouchbaseVersionForDocs = "2.7"
val GoogleAuthVersion = "1.23.0"
@@ -158,6 +159,14 @@ object Dependencies {
"com.fasterxml.jackson.core" % "jackson-databind" % JacksonVersion %
Test,
"com.fasterxml.jackson.module" %% "jackson-module-scala" %
JacksonVersion % Test))
+ // Library dependencies for the Couchbase3 settings: the Couchbase Java client at
+ // Couchbase3Version, pekko-discovery in Provided scope, and pekko-http plus the Jackson
+ // databind / Scala module in Test scope only.
+ val Couchbase3 = Seq(
+ libraryDependencies ++= Seq(
+ "com.couchbase.client" % "java-client" % Couchbase3Version,
+ "org.apache.pekko" %% "pekko-discovery" % PekkoVersion % Provided,
+ "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion % Test,
+ "com.fasterxml.jackson.core" % "jackson-databind" % JacksonVersion %
Test,
+ "com.fasterxml.jackson.module" %% "jackson-module-scala" %
JacksonVersion % Test))
+
val `Doc-examples` = Seq(
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-slf4j" % PekkoVersion,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]