This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch scala3
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git

commit 650c13bfb19c58613305c7ff22ad4ab624c3a1f0
Author: PJ Fanning <[email protected]>
AuthorDate: Tue May 30 20:55:16 2023 +0100

    Scala3 couchbase support (#128)
    
    * initial work on scala3 support for couchbase
    
    * Update model.scala
    
    * Update CouchbaseFlow.scala
    
    * use jackson in test code instead of play-json
    
    * Update Dependencies.scala
---
 .../apache/pekko/stream/connectors/couchbase/model.scala |  9 +++++----
 .../connectors/couchbase/scaladsl/CouchbaseFlow.scala    | 16 +++++++++-------
 .../connectors/couchbase/testing/CouchbaseSupport.scala  | 15 +++++++++++----
 project/Dependencies.scala                               | 13 +++++++------
 4 files changed, 32 insertions(+), 21 deletions(-)

diff --git a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/model.scala b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/model.scala
index fdbd08ff5..b40e87582 100644
--- a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/model.scala
+++ b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/model.scala
@@ -270,7 +270,8 @@ sealed trait CouchbaseWriteResult[T <: Document[_]] {
 /**
  * Emitted for a successful Couchbase write operation.
  */
-final case class CouchbaseWriteSuccess[T <: Document[_]] private (override val doc: T) extends CouchbaseWriteResult[T] {
+final case class CouchbaseWriteSuccess[T <: Document[_]] private[couchbase] (
+    override val doc: T) extends CouchbaseWriteResult[T] {
   val isSuccess: Boolean = true
   val isFailure: Boolean = false
 }
@@ -278,7 +279,7 @@ final case class CouchbaseWriteSuccess[T <: Document[_]] private (override val d
 /**
  * Emitted for a failed Couchbase write operation.
  */
-final case class CouchbaseWriteFailure[T <: Document[_]] private (override val doc: T, failure: Throwable)
+final case class CouchbaseWriteFailure[T <: Document[_]] private[couchbase] (override val doc: T, failure: Throwable)
     extends CouchbaseWriteResult[T] {
   val isSuccess: Boolean = false
   val isFailure: Boolean = true
@@ -296,7 +297,7 @@ sealed trait CouchbaseDeleteResult {
 /**
  * Emitted for a successful Couchbase write operation.
  */
-final case class CouchbaseDeleteSuccess private (override val id: String) extends CouchbaseDeleteResult {
+final case class CouchbaseDeleteSuccess private[couchbase] (override val id: String) extends CouchbaseDeleteResult {
   val isSuccess: Boolean = true
   val isFailure: Boolean = false
 }
@@ -304,7 +305,7 @@ final case class CouchbaseDeleteSuccess private (override val id: String) extend
 /**
  * Emitted for a failed Couchbase write operation.
  */
-final case class CouchbaseDeleteFailure private (override val id: String, failure: Throwable)
+final case class CouchbaseDeleteFailure private[couchbase] (override val id: String, failure: Throwable)
     extends CouchbaseDeleteResult {
   val isSuccess: Boolean = false
   val isFailure: Boolean = true
diff --git a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala
index 6a35d171a..ad35b1cee 100644
--- a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala
+++ b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala
@@ -29,7 +29,7 @@ import pekko.stream.connectors.couchbase.{
 import pekko.stream.scaladsl.Flow
 import com.couchbase.client.java.document.{ Document, JsonDocument }
 
-import scala.concurrent.ExecutionContext
+import scala.concurrent.{ ExecutionContext, Future }
 
 /**
  * Scala API: Factory methods for Couchbase flows.
@@ -100,8 +100,8 @@ object CouchbaseFlow {
    */
   def upsertDocWithResult[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
       writeSettings: CouchbaseWriteSettings,
-      bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] =
-    Flow
+      bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] = {
+    val flow: Flow[T, CouchbaseWriteResult[T], Future[NotUsed]] = Flow
       .fromMaterializer { (materializer, _) =>
         val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
         Flow[T]
@@ -115,7 +115,8 @@ object CouchbaseFlow {
               }
           })
       }
-      .mapMaterializedValue(_ => NotUsed)
+    flow.mapMaterializedValue(_ => NotUsed)
+  }
 
   /**
    * Create a flow to replace a Couchbase [[com.couchbase.client.java.document.JsonDocument JsonDocument]].
@@ -153,8 +154,8 @@ object CouchbaseFlow {
    */
   def replaceDocWithResult[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
       writeSettings: CouchbaseWriteSettings,
-      bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] =
-    Flow
+      bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] = {
+    val flow: Flow[T, CouchbaseWriteResult[T], Future[NotUsed]] = Flow
       .fromMaterializer { (materializer, _) =>
         val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
         Flow[T]
@@ -168,7 +169,8 @@ object CouchbaseFlow {
               }
           })
       }
-      .mapMaterializedValue(_ => NotUsed)
+    flow.mapMaterializedValue(_ => NotUsed)
+  }
 
   /**
    * Create a flow to delete documents from Couchbase by `id`. Emits the same `id`.
diff --git a/couchbase/src/test/scala/org/apache/pekko/stream/connectors/couchbase/testing/CouchbaseSupport.scala b/couchbase/src/test/scala/org/apache/pekko/stream/connectors/couchbase/testing/CouchbaseSupport.scala
index e517dfe39..808413f72 100644
--- a/couchbase/src/test/scala/org/apache/pekko/stream/connectors/couchbase/testing/CouchbaseSupport.scala
+++ b/couchbase/src/test/scala/org/apache/pekko/stream/connectors/couchbase/testing/CouchbaseSupport.scala
@@ -25,8 +25,9 @@ import com.couchbase.client.deps.io.netty.util.CharsetUtil
 import com.couchbase.client.java.ReplicateTo
 import com.couchbase.client.java.document.json.JsonObject
 import com.couchbase.client.java.document.{ BinaryDocument, JsonDocument, RawJsonDocument, StringDocument }
+import com.fasterxml.jackson.databind.json.JsonMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.slf4j.LoggerFactory
-import play.api.libs.json.Json
 
 import scala.collection.immutable.Seq
 import scala.concurrent.ExecutionContext.Implicits.global
@@ -35,6 +36,12 @@ import scala.concurrent.{ Await, Future }
 
 case class TestObject(id: String, value: String)
 
+private[couchbase] object CouchbaseSupport {
+  val jacksonMapper = JsonMapper.builder()
+    .addModule(DefaultScalaModule)
+    .build()
+}
+
 trait CouchbaseSupport {
 
   private val log = LoggerFactory.getLogger(classOf[CouchbaseSupport])
@@ -64,7 +71,7 @@ trait CouchbaseSupport {
   }
 
   def toRawJsonDocument(testObject: TestObject): RawJsonDocument = {
-    val json = Json.toJson(testObject)(Json.writes[TestObject]).toString()
+    val json = CouchbaseSupport.jacksonMapper.writeValueAsString(testObject)
     RawJsonDocument.create(testObject.id, json)
   }
 
@@ -72,12 +79,12 @@ trait CouchbaseSupport {
     JsonDocument.create(testObject.id, JsonObject.create().put("id", testObject.id).put("value", testObject.value))
 
   def toStringDocument(testObject: TestObject): StringDocument = {
-    val json = Json.toJson(testObject)(Json.writes[TestObject]).toString()
+    val json = CouchbaseSupport.jacksonMapper.writeValueAsString(testObject)
     StringDocument.create(testObject.id, json)
   }
 
   def toBinaryDocument(testObject: TestObject): BinaryDocument = {
-    val json = Json.toJson(testObject)(Json.writes[TestObject]).toString()
+    val json = CouchbaseSupport.jacksonMapper.writeValueAsString(testObject)
     val toWrite = Unpooled.copiedBuffer(json, CharsetUtil.UTF_8)
     BinaryDocument.create(testObject.id, toWrite)
   }
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 458601ffa..f711adcd3 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -120,13 +120,14 @@ object Dependencies {
       "org.apache.pekko" %% "pekko-discovery" % PekkoVersion % Provided))
 
   val Couchbase = Seq(
-    crossScalaVersions -= Scala3,
     libraryDependencies ++= Seq(
-      "com.couchbase.client" % "java-client" % CouchbaseVersion,
-      "io.reactivex" % "rxjava-reactive-streams" % "1.2.1",
-      "org.apache.pekko" %% "pekko-discovery" % PekkoVersion % Provided,
-      "com.typesafe.play" %% "play-json" % "2.9.2" % Test,
-      "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion % Test))
+      "com.couchbase.client" % "java-client" % CouchbaseVersion, // ApacheV2
+      "io.reactivex" % "rxjava-reactive-streams" % "1.2.1", // ApacheV2
+      "org.apache.pekko" %% "pekko-discovery" % PekkoVersion % Provided, // Apache V2
+      "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion % Test, // Apache V2
+      "com.fasterxml.jackson.core" % "jackson-databind" % JacksonDatabindVersion % Test, // Apache V2
+      "com.fasterxml.jackson.module" %% "jackson-module-scala" % JacksonDatabindVersion % Test // Apache V2
+    ))
 
   val `Doc-examples` = Seq(
     libraryDependencies ++= Seq(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to