This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch scala3
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git

commit 01331cc7635a348cde673becbebac3f9a9083316
Author: PJ Fanning <[email protected]>
AuthorDate: Sun Jun 11 08:41:40 2023 +0100

    support scala3 for amqp connector (#154)
    
    format
---
 .../connectors/amqp/AmqpConnectionProvider.scala   | 54 ++++++++++++----------
 project/Dependencies.scala                         |  1 -
 2 files changed, 30 insertions(+), 25 deletions(-)

diff --git 
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectionProvider.scala
 
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectionProvider.scala
index 2b0dc6704..c6a3ca303 100644
--- 
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectionProvider.scala
+++ 
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectionProvider.scala
@@ -375,12 +375,14 @@ final class AmqpCachedConnectionProvider private (val 
provider: AmqpConnectionPr
   def withAutomaticRelease(automaticRelease: Boolean): 
AmqpCachedConnectionProvider =
     copy(automaticRelease = automaticRelease)
 
+  override def get: Connection = getRecursive(provider)
+
   @tailrec
-  override def get: Connection = state.get match {
+  private def getRecursive(amqpConnectionProvider: AmqpConnectionProvider): 
Connection = state.get match {
     case Empty =>
       if (state.compareAndSet(Empty, Connecting)) {
         try {
-          val connection = provider.get
+          val connection = amqpConnectionProvider.get
           if (!state.compareAndSet(Connecting, Connected(connection, 1)))
             throw new ConcurrentModificationException(
               "Unexpected concurrent modification while creating the 
connection.")
@@ -391,34 +393,38 @@ final class AmqpCachedConnectionProvider private (val 
provider: AmqpConnectionPr
             state.compareAndSet(Connecting, Empty)
             throw e
         }
-      } else get
-    case Connecting => get
+      } else getRecursive(amqpConnectionProvider)
+    case Connecting => getRecursive(amqpConnectionProvider)
     case c @ Connected(connection, clients) =>
       if (state.compareAndSet(c, Connected(connection, clients + 1))) 
connection
-      else get
-    case Closing => get
+      else getRecursive(amqpConnectionProvider)
+    case Closing => getRecursive(amqpConnectionProvider)
   }
 
+  override def release(connection: Connection): Unit = 
releaseRecursive(provider, connection)
+
   @tailrec
-  override def release(connection: Connection): Unit = state.get match {
-    case Empty      => throw new IllegalStateException("There is no connection 
to release.")
-    case Connecting => release(connection)
-    case c @ Connected(cachedConnection, clients) =>
-      if (cachedConnection != connection)
-        throw new IllegalArgumentException("Can't release a connection that's 
not owned by this provider")
-
-      if (clients == 1 || !automaticRelease) {
-        if (state.compareAndSet(c, Closing)) {
-          provider.release(connection)
-          if (!state.compareAndSet(Closing, Empty))
-            throw new ConcurrentModificationException(
-              "Unexpected concurrent modification while closing the 
connection.")
+  private def releaseRecursive(amqpConnectionProvider: AmqpConnectionProvider, 
connection: Connection): Unit =
+    state.get match {
+      case Empty      => throw new IllegalStateException("There is no 
connection to release.")
+      case Connecting => releaseRecursive(amqpConnectionProvider, connection)
+      case c @ Connected(cachedConnection, clients) =>
+        if (cachedConnection != connection)
+          throw new IllegalArgumentException("Can't release a connection 
that's not owned by this provider")
+
+        if (clients == 1 || !automaticRelease) {
+          if (state.compareAndSet(c, Closing)) {
+            amqpConnectionProvider.release(connection)
+            if (!state.compareAndSet(Closing, Empty))
+              throw new ConcurrentModificationException(
+                "Unexpected concurrent modification while closing the 
connection.")
+          }
+        } else {
+          if (!state.compareAndSet(c, Connected(cachedConnection, clients - 
1)))
+            releaseRecursive(amqpConnectionProvider, connection)
         }
-      } else {
-        if (!state.compareAndSet(c, Connected(cachedConnection, clients - 1))) 
release(connection)
-      }
-    case Closing => release(connection)
-  }
+      case Closing => releaseRecursive(amqpConnectionProvider, connection)
+    }
 
   private def copy(automaticRelease: Boolean): AmqpCachedConnectionProvider =
     new AmqpCachedConnectionProvider(provider, automaticRelease)
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 85f5d8177..90e79dd29 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -88,7 +88,6 @@ object Dependencies {
     "com.fasterxml.jackson.core" % "jackson-databind" % JacksonDatabindVersion)
 
   val Amqp = Seq(
-    crossScalaVersions -= Scala3,
     libraryDependencies ++= Seq(
       "com.rabbitmq" % "amqp-client" % "5.14.2") ++ Mockito)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to