This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch scala3 in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git
commit 01331cc7635a348cde673becbebac3f9a9083316 Author: PJ Fanning <[email protected]> AuthorDate: Sun Jun 11 08:41:40 2023 +0100 support scala3 for amqp connector (#154) format --- .../connectors/amqp/AmqpConnectionProvider.scala | 54 ++++++++++++---------- project/Dependencies.scala | 1 - 2 files changed, 30 insertions(+), 25 deletions(-) diff --git a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectionProvider.scala b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectionProvider.scala index 2b0dc6704..c6a3ca303 100644 --- a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectionProvider.scala +++ b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectionProvider.scala @@ -375,12 +375,14 @@ final class AmqpCachedConnectionProvider private (val provider: AmqpConnectionPr def withAutomaticRelease(automaticRelease: Boolean): AmqpCachedConnectionProvider = copy(automaticRelease = automaticRelease) + override def get: Connection = getRecursive(provider) + @tailrec - override def get: Connection = state.get match { + private def getRecursive(amqpConnectionProvider: AmqpConnectionProvider): Connection = state.get match { case Empty => if (state.compareAndSet(Empty, Connecting)) { try { - val connection = provider.get + val connection = amqpConnectionProvider.get if (!state.compareAndSet(Connecting, Connected(connection, 1))) throw new ConcurrentModificationException( "Unexpected concurrent modification while creating the connection.") @@ -391,34 +393,38 @@ final class AmqpCachedConnectionProvider private (val provider: AmqpConnectionPr state.compareAndSet(Connecting, Empty) throw e } - } else get - case Connecting => get + } else getRecursive(amqpConnectionProvider) + case Connecting => getRecursive(amqpConnectionProvider) case c @ Connected(connection, clients) => if (state.compareAndSet(c, Connected(connection, clients + 1))) connection - else get - case Closing => get + else getRecursive(amqpConnectionProvider) + case Closing => getRecursive(amqpConnectionProvider) } + override def release(connection: Connection): Unit = releaseRecursive(provider, connection) + @tailrec - override def release(connection: Connection): Unit = state.get match { - case Empty => throw new IllegalStateException("There is no connection to release.") - case Connecting => release(connection) - case c @ Connected(cachedConnection, clients) => - if (cachedConnection != connection) - throw new IllegalArgumentException("Can't release a connection that's not owned by this provider") - - if (clients == 1 || !automaticRelease) { - if (state.compareAndSet(c, Closing)) { - provider.release(connection) - if (!state.compareAndSet(Closing, Empty)) - throw new ConcurrentModificationException( - "Unexpected concurrent modification while closing the connection.") + private def releaseRecursive(amqpConnectionProvider: AmqpConnectionProvider, connection: Connection): Unit = + state.get match { + case Empty => throw new IllegalStateException("There is no connection to release.") + case Connecting => releaseRecursive(amqpConnectionProvider, connection) + case c @ Connected(cachedConnection, clients) => + if (cachedConnection != connection) + throw new IllegalArgumentException("Can't release a connection that's not owned by this provider") + + if (clients == 1 || !automaticRelease) { + if (state.compareAndSet(c, Closing)) { + amqpConnectionProvider.release(connection) + if (!state.compareAndSet(Closing, Empty)) + throw new ConcurrentModificationException( + "Unexpected concurrent modification while closing the connection.") + } + } else { + if (!state.compareAndSet(c, Connected(cachedConnection, clients - 1))) + releaseRecursive(amqpConnectionProvider, connection) } - } else { - if (!state.compareAndSet(c, Connected(cachedConnection, clients - 1))) release(connection) - } - case Closing => release(connection) - } + case Closing => releaseRecursive(amqpConnectionProvider, connection) + } private def copy(automaticRelease: Boolean): AmqpCachedConnectionProvider = new AmqpCachedConnectionProvider(provider, automaticRelease) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 85f5d8177..90e79dd29 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -88,7 +88,6 @@ object Dependencies { "com.fasterxml.jackson.core" % "jackson-databind" % JacksonDatabindVersion) val Amqp = Seq( - crossScalaVersions -= Scala3, libraryDependencies ++= Seq( "com.rabbitmq" % "amqp-client" % "5.14.2") ++ Mockito) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
