This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch scala3
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git

commit 777e5b349fb3474e5133c8f6b7ffd3321cc5690f
Author: PJ Fanning <[email protected]>
AuthorDate: Sat Jun 10 11:05:22 2023 +0100

    Scala3 support for jms connector (#145)
    
    * try scala 3 (#123)
    
    * enable scala3 support for jms connector
    
    * refactor
    
    * scala 2.12 compile issue
    
    * enable elasticsearch
    
    Update RestBulkApi.scala
    
    sort imports
    
    revert merge issue
    
    Update CouchbaseFlow.scala
---
 .../elasticsearch/impl/RestBulkApi.scala           |  6 +--
 .../scaladsl/ElasticsearchConnectorBehaviour.scala |  4 +-
 .../scaladsl/OpensearchConnectorBehaviour.scala    |  4 +-
 .../googlecloud/pubsub/impl/PubSubApi.scala        | 34 ++++++-------
 .../googlecloud/storage/impl/Formats.scala         | 12 ++---
 .../storage/WithMaterializerGlobal.scala           |  2 +-
 .../stream/connectors/google/ResumableUpload.scala | 59 +++++++++++-----------
 .../connectors/google/auth/GoogleOAuth2Spec.scala  |  2 +-
 .../google/auth/OAuth2CredentialsSpec.scala        |  2 +-
 .../firebase/fcm/v1/impl/FcmSenderSpec.scala       |  2 +-
 .../connectors/jms/impl/GraphStageCompanion.scala  | 34 +++++++++++++
 .../connectors/jms/impl/JmsAckSourceStage.scala    |  2 +-
 .../stream/connectors/jms/impl/JmsConnector.scala  | 20 ++++----
 .../connectors/jms/impl/JmsConsumerStage.scala     |  2 +-
 .../connectors/jms/impl/JmsProducerStage.scala     | 18 +++++--
 .../connectors/jms/impl/JmsTxSourceStage.scala     |  2 +-
 .../connectors/jms/impl/SourceStageLogic.scala     | 23 ++++++---
 project/Dependencies.scala                         | 10 +++-
 18 files changed, 148 insertions(+), 90 deletions(-)

diff --git 
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApi.scala
 
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApi.scala
index 70e016de6..e9db5c111 100644
--- 
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApi.scala
+++ 
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApi.scala
@@ -46,8 +46,8 @@ private[impl] abstract class RestBulkApi[T, C] {
 
   def messageToJson(message: WriteMessage[T, C], messageSource: String): 
String = message.operation match {
     case Index | Create => "\n" + messageSource
-    case Upsert         => "\n" + JsObject("doc" -> messageSource.parseJson, 
"doc_as_upsert" -> JsTrue).toString
-    case Update         => "\n" + JsObject("doc" -> 
messageSource.parseJson).toString
+    case Upsert         => "\n" + JsObject("doc" -> messageSource.parseJson, 
"doc_as_upsert" -> JsTrue).compactPrint
+    case Update         => "\n" + JsObject("doc" -> 
messageSource.parseJson).compactPrint
     case Delete         => ""
     case Nop            => ""
   }
@@ -69,7 +69,7 @@ private[impl] abstract class RestBulkApi[T, C] {
           // good message
           val command = message.operation.command
           val res = itemsIter.next().asJsObject.fields(command).asJsObject
-          val error: Option[String] = res.fields.get("error").map(_.toString())
+          val error: Option[String] = 
res.fields.get("error").map(_.compactPrint)
           ret += new WriteResult(message, error)
         } else {
           // error?
diff --git 
a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala
 
b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala
index 33b8c8032..9f360a90d 100644
--- 
a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala
+++ 
b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala
@@ -200,7 +200,7 @@ trait ElasticsearchConnectorBehaviour {
           .via(
             ElasticsearchFlow.create(
               constructElasticsearchParams(indexName, "_doc", apiVersion),
-              baseWriteSettings.withRetryLogic(RetryAtFixedRate(5, 
100.millis))))
+              baseWriteSettings.withRetryLogic(RetryAtFixedRate(5, 
100.millis)))(RootJsObjectFormat))
           .runWith(Sink.seq)
 
         val start = System.currentTimeMillis()
@@ -286,7 +286,7 @@ trait ElasticsearchConnectorBehaviour {
           .via(
             ElasticsearchFlow.createWithContext(
               constructElasticsearchParams(indexName, "_doc", apiVersion),
-              baseWriteSettings.withRetryLogic(RetryAtFixedRate(5, 
100.millis))))
+              baseWriteSettings.withRetryLogic(RetryAtFixedRate(5, 
100.millis)))(RootJsObjectFormat))
           .runWith(Sink.seq)
 
         val start = System.currentTimeMillis()
diff --git 
a/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala 
b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala
index 77c513ee1..1c235685a 100644
--- 
a/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala
+++ 
b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala
@@ -200,7 +200,7 @@ trait OpensearchConnectorBehaviour {
           .via(
             ElasticsearchFlow.create(
               constructElasticsearchParams(indexName, "_doc", apiVersion),
-              baseWriteSettings.withRetryLogic(RetryAtFixedRate(5, 
100.millis))))
+              baseWriteSettings.withRetryLogic(RetryAtFixedRate(5, 
100.millis)))(RootJsObjectFormat))
           .runWith(Sink.seq)
 
         val start = System.currentTimeMillis()
@@ -286,7 +286,7 @@ trait OpensearchConnectorBehaviour {
           .via(
             ElasticsearchFlow.createWithContext(
               constructElasticsearchParams(indexName, "_doc", apiVersion),
-              baseWriteSettings.withRetryLogic(RetryAtFixedRate(5, 
100.millis))))
+              baseWriteSettings.withRetryLogic(RetryAtFixedRate(5, 
100.millis)))(RootJsObjectFormat))
           .runWith(Sink.seq)
 
         val start = System.currentTimeMillis()
diff --git 
a/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApi.scala
 
b/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApi.scala
index 90dd33581..696406f89 100644
--- 
a/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApi.scala
+++ 
b/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApi.scala
@@ -69,7 +69,7 @@ private[pubsub] trait PubSubApi {
   def PubSubGoogleApisPort: Int
   def isEmulated: Boolean
 
-  private implicit val instantFormat: RootJsonFormat[Instant] = new 
RootJsonFormat[Instant] {
+  private implicit val instantFormat = new RootJsonFormat[Instant] {
     override def read(jsValue: JsValue): Instant = jsValue match {
       case JsString(time) => Instant.parse(time)
       case _              => deserializationError("Instant required as a 
string of RFC3339 UTC Zulu format.")
@@ -77,7 +77,7 @@ private[pubsub] trait PubSubApi {
     override def write(instant: Instant): JsValue = JsString(instant.toString)
   }
 
-  private implicit val pubSubMessageFormat: RootJsonFormat[PubSubMessage] =
+  private implicit val pubSubMessageFormat =
     new RootJsonFormat[PubSubMessage] {
       override def read(json: JsValue): PubSubMessage = {
         val fields = json.asJsObject.fields
@@ -98,7 +98,7 @@ private[pubsub] trait PubSubApi {
           ++ m.attributes.map(attributes => "attributes" -> 
attributes.toJson): _*)
     }
 
-  private implicit val publishMessageFormat: RootJsonFormat[PublishMessage] = 
new RootJsonFormat[PublishMessage] {
+  private implicit val publishMessageFormat = new 
RootJsonFormat[PublishMessage] {
     def read(json: JsValue): PublishMessage = {
       val data = json.asJsObject.fields("data").convertTo[String]
       val attributes = 
json.asJsObject.fields("attributes").convertTo[immutable.Map[String, String]]
@@ -112,39 +112,37 @@ private[pubsub] trait PubSubApi {
         m.attributes.map(a => "attributes" -> a.toJson): _*)
   }
 
-  private implicit val pubSubRequestFormat: RootJsonFormat[PublishRequest] = 
new RootJsonFormat[PublishRequest] {
+  private implicit val pubSubRequestFormat = new 
RootJsonFormat[PublishRequest] {
     def read(json: JsValue): PublishRequest =
       
PublishRequest(json.asJsObject.fields("messages").convertTo[immutable.Seq[PublishMessage]])
     def write(pr: PublishRequest): JsValue = JsObject("messages" -> 
pr.messages.toJson)
   }
-  private implicit val gcePubSubResponseFormat: 
RootJsonFormat[PublishResponse] = new RootJsonFormat[PublishResponse] {
+  private implicit val gcePubSubResponseFormat = new 
RootJsonFormat[PublishResponse] {
     def read(json: JsValue): PublishResponse =
       
PublishResponse(json.asJsObject.fields("messageIds").convertTo[immutable.Seq[String]])
     def write(pr: PublishResponse): JsValue = JsObject("messageIds" -> 
pr.messageIds.toJson)
   }
 
-  private implicit val receivedMessageFormat: RootJsonFormat[ReceivedMessage] 
= new RootJsonFormat[ReceivedMessage] {
+  private implicit val receivedMessageFormat = new 
RootJsonFormat[ReceivedMessage] {
     def read(json: JsValue): ReceivedMessage =
       ReceivedMessage(json.asJsObject.fields("ackId").convertTo[String],
         json.asJsObject.fields("message").convertTo[PubSubMessage])
     def write(rm: ReceivedMessage): JsValue =
       JsObject("ackId" -> rm.ackId.toJson, "message" -> rm.message.toJson)
   }
-  private implicit val pubSubPullResponseFormat: RootJsonFormat[PullResponse] 
= new RootJsonFormat[PullResponse] {
+  private implicit val pubSubPullResponseFormat = new 
RootJsonFormat[PullResponse] {
     def read(json: JsValue): PullResponse =
       
PullResponse(json.asJsObject.fields.get("receivedMessages").map(_.convertTo[immutable.Seq[ReceivedMessage]]))
     def write(pr: PullResponse): JsValue =
       pr.receivedMessages.map(rm => JsObject("receivedMessages" -> 
rm.toJson)).getOrElse(JsObject.empty)
   }
 
-  private implicit val acknowledgeRequestFormat: 
RootJsonFormat[AcknowledgeRequest] =
-    new RootJsonFormat[AcknowledgeRequest] {
-      def read(json: JsValue): AcknowledgeRequest =
-        
AcknowledgeRequest(json.asJsObject.fields("ackIds").convertTo[immutable.Seq[String]]:
 _*)
-      def write(ar: AcknowledgeRequest): JsValue = JsObject("ackIds" -> 
ar.ackIds.toJson)
-    }
-  private implicit val pullRequestFormat: RootJsonFormat[PullRequest] =
-    DefaultJsonProtocol.jsonFormat2(PullRequest.apply)
+  private implicit val acknowledgeRequestFormat = new 
RootJsonFormat[AcknowledgeRequest] {
+    def read(json: JsValue): AcknowledgeRequest =
+      
AcknowledgeRequest(json.asJsObject.fields("ackIds").convertTo[immutable.Seq[String]]:
 _*)
+    def write(ar: AcknowledgeRequest): JsValue = JsObject("ackIds" -> 
ar.ackIds.toJson)
+  }
+  private implicit val pullRequestFormat = 
DefaultJsonProtocol.jsonFormat2(PullRequest.apply)
 
   private def scheme: String = if (isEmulated) "http" else "https"
 
@@ -175,7 +173,7 @@ private[pubsub] trait PubSubApi {
       .mapMaterializedValue(_ => NotUsed)
 
   private implicit val pullResponseUnmarshaller: 
FromResponseUnmarshaller[PullResponse] =
-    Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: 
HttpResponse) =>
+    Unmarshaller.withMaterializer { implicit ec => implicit mat => response: 
HttpResponse =>
       response.status match {
         case StatusCodes.Success(_) if response.entity.contentType == 
ContentTypes.`application/json` =>
           Unmarshal(response.entity).to[PullResponse]
@@ -213,7 +211,7 @@ private[pubsub] trait PubSubApi {
       .mapMaterializedValue(_ => NotUsed)
 
   private implicit val acknowledgeResponseUnmarshaller: 
FromResponseUnmarshaller[Done] =
-    Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: 
HttpResponse) =>
+    Unmarshaller.withMaterializer { implicit ec => implicit mat => response: 
HttpResponse =>
       response.status match {
         case StatusCodes.Success(_) =>
           response.discardEntityBytes().future
@@ -263,7 +261,7 @@ private[pubsub] trait PubSubApi {
     publish(topic, parallelism, None)
 
   private implicit val publishResponseUnmarshaller: 
FromResponseUnmarshaller[PublishResponse] =
-    Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: 
HttpResponse) =>
+    Unmarshaller.withMaterializer { implicit ec => implicit mat => response: 
HttpResponse =>
       response.status match {
         case StatusCodes.Success(_) if response.entity.contentType == 
ContentTypes.`application/json` =>
           Unmarshal(response.entity).to[PublishResponse]
diff --git 
a/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/Formats.scala
 
b/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/Formats.scala
index ee2da5dcc..5c20c3931 100644
--- 
a/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/Formats.scala
+++ 
b/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/Formats.scala
@@ -49,8 +49,7 @@ object Formats extends DefaultJsonProtocol {
       domain: String,
       projectTeam: ProjectTeam,
       etag: String)
-  private implicit val ObjectAccessControlsJsonFormat: 
RootJsonFormat[ObjectAccessControls] =
-    jsonFormat13(ObjectAccessControls.apply)
+  private implicit val ObjectAccessControlsJsonFormat = 
jsonFormat13(ObjectAccessControls)
 
   /**
    * Google API storage response object
@@ -80,8 +79,7 @@ object Formats extends DefaultJsonProtocol {
       timeStorageClassUpdated: String,
       updated: String)
 
-  private implicit val storageObjectReadOnlyJson: 
RootJsonFormat[StorageObjectReadOnlyJson] =
-    jsonFormat18(StorageObjectReadOnlyJson.apply)
+  private implicit val storageObjectReadOnlyJson = 
jsonFormat18(StorageObjectReadOnlyJson)
 
   // private sub class of StorageObjectJson used to workaround 22 field 
jsonFormat issue
   private final case class StorageObjectWriteableJson(
@@ -100,8 +98,7 @@ object Formats extends DefaultJsonProtocol {
       temporaryHold: Option[Boolean],
       acl: Option[List[ObjectAccessControls]])
 
-  private implicit val storageObjectWritableJson: 
RootJsonFormat[StorageObjectWriteableJson] =
-    jsonFormat14(StorageObjectWriteableJson.apply)
+  private implicit val storageObjectWritableJson = 
jsonFormat14(StorageObjectWriteableJson)
 
   private implicit object StorageObjectJsonFormat extends 
RootJsonFormat[StorageObjectJson] {
     override def read(value: JsValue): StorageObjectJson = {
@@ -178,8 +175,7 @@ object Formats extends DefaultJsonProtocol {
     }
   }
 
-  private implicit val bucketListResultJsonReads: 
RootJsonFormat[BucketListResultJson] =
-    jsonFormat4(BucketListResultJson.apply)
+  private implicit val bucketListResultJsonReads = 
jsonFormat4(BucketListResultJson.apply)
 
   implicit object RewriteResponseReads extends RootJsonReader[RewriteResponse] 
{
     override def read(json: JsValue): RewriteResponse = {
diff --git 
a/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/WithMaterializerGlobal.scala
 
b/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/WithMaterializerGlobal.scala
index 787885b91..eb046720a 100644
--- 
a/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/WithMaterializerGlobal.scala
+++ 
b/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/WithMaterializerGlobal.scala
@@ -29,7 +29,7 @@ trait WithMaterializerGlobal
     with ScalaFutures
     with IntegrationPatience
     with Matchers {
-  implicit val actorSystem: ActorSystem = ActorSystem("test")
+  implicit val actorSystem = ActorSystem("test")
   implicit val ec: ExecutionContext = actorSystem.dispatcher
 
   override protected def afterAll(): Unit = {
diff --git 
a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala
 
b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala
index 969cef4c1..f50c921b9 100644
--- 
a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala
+++ 
b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala
@@ -14,6 +14,7 @@
 package org.apache.pekko.stream.connectors.google
 
 import org.apache.pekko
+import pekko.actor.ActorSystem
 import pekko.NotUsed
 import pekko.annotation.InternalApi
 import pekko.http.scaladsl.model.HttpMethods.{ POST, PUT }
@@ -55,7 +56,7 @@ private[connectors] object ResumableUpload {
     Sink
       .fromMaterializer { (mat, attr) =>
         import mat.executionContext
-        implicit val materializer: Materializer = mat
+        implicit val materializer = mat
         implicit val settings: GoogleSettings = 
GoogleAttributes.resolveSettings(mat, attr)
         val uploadChunkSize = settings.requestSettings.uploadChunkSize
 
@@ -95,24 +96,25 @@ private[connectors] object ResumableUpload {
 
   private def initiateSession(request: HttpRequest)(implicit mat: Materializer,
       settings: GoogleSettings): Future[Uri] = {
+    implicit val system: ActorSystem = mat.system
     import implicits._
 
-    implicit val um: FromResponseUnmarshaller[Uri] =
-      Unmarshaller.withMaterializer { implicit ec => implicit mat => 
(response: HttpResponse) =>
-        response.discardEntityBytes().future.map { _ =>
-          response.header[Location].fold(throw 
InvalidResponseException(ErrorInfo("No Location header")))(_.uri)
-        }
-      }.withDefaultRetry
+    implicit val um = Unmarshaller.withMaterializer { implicit ec => implicit 
mat => response: HttpResponse =>
+      response.discardEntityBytes().future.map { _ =>
+        response.header[Location].fold(throw 
InvalidResponseException(ErrorInfo("No Location header")))(_.uri)
+      }
+    }.withDefaultRetry
 
-    GoogleHttp(mat.system).singleAuthenticatedRequest[Uri](request)
+    GoogleHttp().singleAuthenticatedRequest[Uri](request)
   }
 
   private final case class DoNotRetry(ex: Throwable) extends Throwable(ex) 
with NoStackTrace
 
   private def uploadChunk[T: FromResponseUnmarshaller](
       request: HttpRequest)(implicit mat: Materializer): Flow[Either[T, 
MaybeLast[Chunk]], Try[Option[T]], NotUsed] = {
+    implicit val system: ActorSystem = mat.system
 
-    val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => 
(response: HttpResponse) =>
+    val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => 
response: HttpResponse =>
       response.status match {
         case PermanentRedirect =>
           response.discardEntityBytes().future.map(_ => None)
@@ -125,8 +127,7 @@ private[connectors] object ResumableUpload {
       val uri = request.uri
       Flow[HttpRequest]
         .map((_, ()))
-        
.via(GoogleHttp(mat.system).cachedHostConnectionPoolWithContext(uri.authority.host.address,
 uri.effectivePort)(
-          um))
+        
.via(GoogleHttp().cachedHostConnectionPoolWithContext(uri.authority.host.address,
 uri.effectivePort)(um))
         .map(_._1.recoverWith { case DoNotRetry(ex) => Failure(ex) })
     }
 
@@ -146,30 +147,30 @@ private[connectors] object ResumableUpload {
       request: HttpRequest,
       chunk: Future[MaybeLast[Chunk]])(
       implicit mat: Materializer, settings: GoogleSettings): Future[Either[T, 
MaybeLast[Chunk]]] = {
+    implicit val system: ActorSystem = mat.system
     import implicits._
 
-    implicit val um: FromResponseUnmarshaller[Either[T, Long]] =
-      Unmarshaller.withMaterializer { implicit ec => implicit mat => 
(response: HttpResponse) =>
-        response.status match {
-          case OK | Created => Unmarshal(response).to[T].map(Left(_))
-          case PermanentRedirect =>
-            response.discardEntityBytes().future.map { _ =>
-              Right(
-                response
-                  .header[Range]
-                  .flatMap(_.ranges.headOption)
-                  .collect {
-                    case Slice(_, last) => last + 1
-                  }.getOrElse(0L))
-            }
-          case _ => throw 
InvalidResponseException(ErrorInfo(response.status.value, 
response.status.defaultMessage))
-        }
-      }.withDefaultRetry
+    implicit val um = Unmarshaller.withMaterializer { implicit ec => implicit 
mat => response: HttpResponse =>
+      response.status match {
+        case OK | Created => Unmarshal(response).to[T].map(Left(_))
+        case PermanentRedirect =>
+          response.discardEntityBytes().future.map { _ =>
+            Right(
+              response
+                .header[Range]
+                .flatMap(_.ranges.headOption)
+                .collect {
+                  case Slice(_, last) => last + 1
+                }.getOrElse(0L))
+          }
+        case _ => throw 
InvalidResponseException(ErrorInfo(response.status.value, 
response.status.defaultMessage))
+      }
+    }.withDefaultRetry
 
     import mat.executionContext
     chunk.flatMap {
       case maybeLast @ MaybeLast(Chunk(bytes, position)) =>
-        GoogleHttp(mat.system)
+        GoogleHttp()
           .singleAuthenticatedRequest[Either[T, 
Long]](request.addHeader(statusRequestHeader))
           .map {
             case Left(result) if maybeLast.isLast => Left(result)
diff --git 
a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/GoogleOAuth2Spec.scala
 
b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/GoogleOAuth2Spec.scala
index 753e3004a..1bd56b790 100644
--- 
a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/GoogleOAuth2Spec.scala
+++ 
b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/GoogleOAuth2Spec.scala
@@ -46,7 +46,7 @@ class GoogleOAuth2Spec
 
   implicit val executionContext: ExecutionContext = system.dispatcher
   implicit val settings: GoogleSettings = GoogleSettings(system)
-  implicit val clock: Clock = Clock.systemUTC()
+  implicit val clock = Clock.systemUTC()
 
   lazy val privateKey = {
     val inputStream = 
getClass.getClassLoader.getResourceAsStream("private_pcks8.pem")
diff --git 
a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/OAuth2CredentialsSpec.scala
 
b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/OAuth2CredentialsSpec.scala
index 446336dd9..7d7e2342a 100644
--- 
a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/OAuth2CredentialsSpec.scala
+++ 
b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/OAuth2CredentialsSpec.scala
@@ -45,7 +45,7 @@ class OAuth2CredentialsSpec
   import system.dispatcher
 
   implicit val settings: RequestSettings = GoogleSettings().requestSettings
-  implicit val clock: Clock = Clock.systemUTC()
+  implicit val clock = Clock.systemUTC()
 
   final object AccessTokenProvider {
     @volatile var accessTokenPromise: Promise[AccessToken] = 
Promise.failed(new RuntimeException)
diff --git 
a/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala
 
b/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala
index ca6c18820..a2d4d4922 100644
--- 
a/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala
+++ 
b/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala
@@ -57,7 +57,7 @@ class FcmSenderSpec
 
   implicit val executionContext: ExecutionContext = system.dispatcher
 
-  implicit val conf: FcmSettings = FcmSettings()
+  implicit val conf = FcmSettings()
   implicit val settings: GoogleSettings = GoogleSettings().copy(projectId = 
"projectId")
 
   "FcmSender" should {
diff --git 
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/GraphStageCompanion.scala
 
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/GraphStageCompanion.scala
new file mode 100644
index 000000000..e43a4d242
--- /dev/null
+++ 
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/GraphStageCompanion.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, derived from Akka.
+ */
+
+package org.apache.pekko.stream.connectors.jms.impl
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.Materializer
+import pekko.stream.connectors.jms.Destination
+
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * Exposes protected members of [[pekko.stream.stage.TimerGraphStageLogic]] (materializer, timers)
+ * and the stage's destination that are not otherwise accessible when using the Scala 3 compiler.
+ */
+@InternalApi
+private[impl] trait GraphStageCompanion {
+  def graphStageMaterializer: Materializer
+
+  def graphStageDestination: Destination
+
+  def scheduleOnceOnGraphStage(timerKey: Any, delay: FiniteDuration): Unit
+
+  def isTimerActiveOnGraphStage(timerKey: Any): Boolean
+
+  def cancelTimerOnGraphStage(timerKey: Any): Unit
+}
diff --git 
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsAckSourceStage.scala
 
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsAckSourceStage.scala
index 4665463dc..16c2a0da0 100644
--- 
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsAckSourceStage.scala
+++ 
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsAckSourceStage.scala
@@ -49,7 +49,7 @@ private[jms] final class JmsAckSourceStage(settings: 
JmsConsumerSettings, destin
         createDestination: jms.Session => javax.jms.Destination): 
JmsAckSession = {
       val session =
         connection.createSession(false, 
settings.acknowledgeMode.getOrElse(AcknowledgeMode.ClientAcknowledge).mode)
-      new JmsAckSession(connection, session, createDestination(session), 
destination, maxPendingAcks)
+      new JmsAckSession(connection, session, createDestination(session), 
graphStageDestination, maxPendingAcks)
     }
 
     protected def pushMessage(msg: AckEnvelope): Unit = push(out, msg)
diff --git 
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsConnector.scala
 
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsConnector.scala
index 414462769..5fa898724 100644
--- 
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsConnector.scala
+++ 
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsConnector.scala
@@ -37,7 +37,7 @@ import scala.util.{ Failure, Success, Try }
  */
 @InternalApi
 private[jms] trait JmsConnector[S <: JmsSession] {
-  this: TimerGraphStageLogic with StageLogging =>
+  this: TimerGraphStageLogic with GraphStageCompanion with StageLogging =>
 
   import JmsConnector._
 
@@ -71,12 +71,12 @@ private[jms] trait JmsConnector[S <: JmsSession] {
       Source
         .queue[InternalConnectionState](2, OverflowStrategy.dropHead)
         .toMat(BroadcastHub.sink(1))(Keep.both)
-        .run()(this.materializer)
+        .run()(graphStageMaterializer)
     connectionStateQueue = queue
     connectionStateSourcePromise.complete(Success(source))
 
     // add subscription to purge queued connection status events after the 
configured timeout.
-    scheduleOnce(ConnectionStatusTimeout, 
jmsSettings.connectionStatusSubscriptionTimeout)
+    scheduleOnceOnGraphStage(ConnectionStatusTimeout, 
jmsSettings.connectionStatusSubscriptionTimeout)
   }
 
   protected def finishStop(): Unit = {
@@ -91,7 +91,7 @@ private[jms] trait JmsConnector[S <: JmsSession] {
     closeSessions()
     val previous = updateStateWith(update)
     closeConnectionAsync(connection(previous))
-    if (isTimerActive("connection-status-timeout")) drainConnectionState()
+    if (isTimerActiveOnGraphStage("connection-status-timeout")) 
drainConnectionState()
     connectionStateQueue.complete()
   }
 
@@ -219,7 +219,7 @@ private[jms] trait JmsConnector[S <: JmsSession] {
       closeConnectionAsync(connection(status))
       val delay = if (backoffMaxed) maxBackoff else waitTime(nextAttempt)
       val backoffNowMaxed = backoffMaxed || delay == maxBackoff
-      scheduleOnce(AttemptConnect(nextAttempt, backoffNowMaxed), delay)
+      scheduleOnceOnGraphStage(AttemptConnect(nextAttempt, backoffNowMaxed), 
delay)
     }
   }
 
@@ -234,7 +234,7 @@ private[jms] trait JmsConnector[S <: JmsSession] {
   }
 
   private def drainConnectionState(): Unit =
-    
Source.future(connectionStateSource).flatMapConcat(identity).runWith(Sink.ignore)(this.materializer)
+    
Source.future(connectionStateSource).flatMapConcat(identity).runWith(Sink.ignore)(graphStageMaterializer)
 
   protected def executionContext(attributes: Attributes): ExecutionContext = {
     val dispatcherId = 
(attributes.get[ActorAttributes.Dispatcher](ActorAttributes.IODispatcher) match 
{
@@ -244,11 +244,11 @@ private[jms] trait JmsConnector[S <: JmsSession] {
     }) match {
       case d @ ActorAttributes.IODispatcher =>
         // this one is not a dispatcher id, but is a config path pointing to 
the dispatcher id
-        materializer.system.settings.config.getString(d.dispatcher)
+        graphStageMaterializer.system.settings.config.getString(d.dispatcher)
       case d => d.dispatcher
     }
 
-    materializer.system.dispatchers.lookup(dispatcherId)
+    graphStageMaterializer.system.dispatchers.lookup(dispatcherId)
   }
 
   protected def createSession(connection: jms.Connection, createDestination: 
jms.Session => jms.Destination): S
@@ -324,7 +324,7 @@ private[jms] trait JmsConnector[S <: JmsSession] {
 
   private def cancelAckTimers(s: JmsSession): Unit = s match {
     case session: JmsAckSession =>
-      cancelTimer(FlushAcknowledgementsTimerKey(session))
+      cancelTimerOnGraphStage(FlushAcknowledgementsTimerKey(session))
     case _ => ()
   }
 
@@ -341,7 +341,7 @@ private[jms] trait JmsConnector[S <: JmsSession] {
   }
 
   private def openConnection(attempt: Int, backoffMaxed: Boolean): 
Future[jms.Connection] = {
-    implicit val system: ActorSystem = materializer.system
+    implicit val system: ActorSystem = graphStageMaterializer.system
     val jmsConnection = openConnectionAttempt(startConnection)
     updateState(JmsConnectorInitializing(jmsConnection, attempt, backoffMaxed, 
0))
     jmsConnection.map { connection =>
diff --git 
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsConsumerStage.scala
 
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsConsumerStage.scala
index 929296655..211995b7c 100644
--- 
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsConsumerStage.scala
+++ 
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsConsumerStage.scala
@@ -52,7 +52,7 @@ private[jms] final class JmsConsumerStage(settings: 
JmsConsumerSettings, destina
         createDestination: jms.Session => javax.jms.Destination): 
JmsConsumerSession = {
       val session =
         connection.createSession(false, 
settings.acknowledgeMode.getOrElse(AcknowledgeMode.AutoAcknowledge).mode)
-      new JmsConsumerSession(connection, session, createDestination(session), 
destination)
+      new JmsConsumerSession(connection, session, createDestination(session), 
graphStageDestination)
     }
 
     protected def pushMessage(msg: jms.Message): Unit = {
diff --git 
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsProducerStage.scala
 
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsProducerStage.scala
index 840f64c3a..b0bcb5daa 100644
--- 
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsProducerStage.scala
+++ 
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsProducerStage.scala
@@ -23,9 +23,10 @@ import pekko.stream.impl.Buffer
 import pekko.stream.scaladsl.Source
 import pekko.stream.stage._
 import pekko.util.OptionVal
-import javax.jms
 
+import javax.jms
 import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
 import scala.util.control.NoStackTrace
 import scala.util.{ Failure, Success, Try }
 
@@ -34,7 +35,7 @@ import scala.util.{ Failure, Success, Try }
  */
 @InternalApi
 private trait JmsProducerConnector extends JmsConnector[JmsProducerSession] {
-  this: TimerGraphStageLogic with StageLogging =>
+  this: TimerGraphStageLogic with GraphStageCompanion with StageLogging =>
 
   protected final def createSession(connection: jms.Connection,
       createDestination: jms.Session => jms.Destination): JmsProducerSession = 
{
@@ -74,7 +75,18 @@ private[jms] final class JmsProducerStage[E <: 
JmsEnvelope[PassThrough], PassThr
   }
 
   private def producerLogic(inheritedAttributes: Attributes) =
-    new TimerGraphStageLogic(shape) with JmsProducerConnector with 
StageLogging {
+    new TimerGraphStageLogic(shape) with JmsProducerConnector with 
GraphStageCompanion with StageLogging {
+
+      final override def graphStageMaterializer: Materializer = materializer
+
+      final override def graphStageDestination: Destination = destination
+
+      final override def scheduleOnceOnGraphStage(timerKey: Any, delay: 
FiniteDuration): Unit =
+        scheduleOnce(timerKey, delay)
+
+      final override def isTimerActiveOnGraphStage(timerKey: Any): Boolean = 
isTimerActive(timerKey)
+
+      final override def cancelTimerOnGraphStage(timerKey: Any): Unit = 
cancelTimer(timerKey)
 
       /*
        * NOTE: the following code is heavily inspired by 
org.apache.pekko.stream.impl.fusing.MapAsync
diff --git 
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsTxSourceStage.scala
 
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsTxSourceStage.scala
index 18278e22a..f6db22736 100644
--- 
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsTxSourceStage.scala
+++ 
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsTxSourceStage.scala
@@ -46,7 +46,7 @@ private[jms] final class JmsTxSourceStage(settings: 
JmsConsumerSettings, destina
     protected def createSession(connection: jms.Connection, createDestination: 
jms.Session => javax.jms.Destination) = {
       val session =
         connection.createSession(true, 
settings.acknowledgeMode.getOrElse(AcknowledgeMode.SessionTransacted).mode)
-      new JmsConsumerSession(connection, session, createDestination(session), 
destination)
+      new JmsConsumerSession(connection, session, createDestination(session), 
graphStageDestination)
     }
 
     protected def pushMessage(msg: TxEnvelope): Unit = push(out, msg)
diff --git 
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/SourceStageLogic.scala
 
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/SourceStageLogic.scala
index 7debd190f..ac8266280 100644
--- 
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/SourceStageLogic.scala
+++ 
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/SourceStageLogic.scala
@@ -13,28 +13,27 @@
 
 package org.apache.pekko.stream.connectors.jms.impl
 
-import java.util.concurrent.atomic.AtomicBoolean
-
 import org.apache.pekko
 import pekko.annotation.InternalApi
 import 
pekko.stream.connectors.jms.impl.InternalConnectionState.JmsConnectorStopping
 import pekko.stream.connectors.jms.{ Destination, JmsConsumerSettings }
 import pekko.stream.scaladsl.Source
 import pekko.stream.stage.{ OutHandler, StageLogging, TimerGraphStageLogic }
-import pekko.stream.{ Attributes, Outlet, SourceShape }
+import pekko.stream.{ Attributes, Materializer, Outlet, SourceShape }
 import pekko.{ Done, NotUsed }
 
+import java.util.concurrent.atomic.AtomicBoolean
+import javax.jms
 import scala.collection.mutable
 import scala.util.{ Failure, Success }
-
-import javax.jms
+import scala.concurrent.duration.FiniteDuration
 
 /**
  * Internal API.
  */
 @InternalApi
 private trait JmsConsumerConnector extends JmsConnector[JmsConsumerSession] {
-  this: TimerGraphStageLogic with StageLogging =>
+  this: TimerGraphStageLogic with GraphStageCompanion with StageLogging =>
 
   override val startConnection = true
 
@@ -54,8 +53,20 @@ private abstract class SourceStageLogic[T](shape: 
SourceShape[T],
     inheritedAttributes: Attributes)
     extends TimerGraphStageLogic(shape)
     with JmsConsumerConnector
+    with GraphStageCompanion
     with StageLogging {
 
+  final override def graphStageMaterializer: Materializer = materializer
+
+  final override def graphStageDestination: Destination = destination
+
+  final override def scheduleOnceOnGraphStage(timerKey: Any, delay: 
FiniteDuration): Unit =
+    scheduleOnce(timerKey, delay)
+
+  final override def isTimerActiveOnGraphStage(timerKey: Any): Boolean = 
isTimerActive(timerKey)
+
+  final override def cancelTimerOnGraphStage(timerKey: Any): Unit = 
cancelTimer(timerKey)
+
   override protected def jmsSettings: JmsConsumerSettings = settings
   private val queue = mutable.Queue[T]()
   private val stopping = new AtomicBoolean(false)
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 1a1f851c9..110560467 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -102,6 +102,7 @@ object Dependencies {
         ExclusionRule("software.amazon.awssdk", "netty-nio-client"))) ++ 
Mockito)
 
   val AzureStorageQueue = Seq(
+    crossScalaVersions -= Scala3,
     libraryDependencies ++= Seq(
       "com.microsoft.azure" % "azure-storage" % "8.0.0"))
 
@@ -110,6 +111,7 @@ object Dependencies {
   val CassandraDriverVersionInDocs = "4.15"
 
   val Cassandra = Seq(
+    crossScalaVersions -= Scala3,
     libraryDependencies ++= Seq(
       ("com.datastax.oss" % "java-driver-core" % CassandraDriverVersion)
         .exclude("com.github.spotbugs", "spotbugs-annotations")
@@ -118,6 +120,7 @@ object Dependencies {
       "org.apache.pekko" %% "pekko-discovery" % PekkoVersion % Provided))
 
   val Couchbase = Seq(
+    crossScalaVersions -= Scala3,
     libraryDependencies ++= Seq(
       "com.couchbase.client" % "java-client" % CouchbaseVersion, // ApacheV2
       "io.reactivex" % "rxjava-reactive-streams" % "1.2.1", // ApacheV2
@@ -145,7 +148,6 @@ object Dependencies {
       "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion))
 
   val Elasticsearch = Seq(
-    crossScalaVersions -= Scala3,
     libraryDependencies ++= Seq(
       "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
       "org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion,
@@ -186,6 +188,7 @@ object Dependencies {
         "org.apache.logging.log4j" % "log4j-to-slf4j" % "2.17.1" % Test) ++ 
JacksonDatabindDependencies)
 
   val GoogleCommon = Seq(
+    crossScalaVersions -= Scala3,
     libraryDependencies ++= Seq(
       "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
       "org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion,
@@ -195,6 +198,7 @@ object Dependencies {
     ) ++ Mockito)
 
   val GoogleBigQuery = Seq(
+    crossScalaVersions -= Scala3,
     libraryDependencies ++= Seq(
       "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
       "org.apache.pekko" %% "pekko-http-jackson" % PekkoHttpVersion % Provided,
@@ -220,6 +224,7 @@ object Dependencies {
       "org.apache.pekko" %% "pekko-discovery" % PekkoVersion) ++ Mockito)
 
   val GooglePubSub = Seq(
+    crossScalaVersions -= Scala3,
     libraryDependencies ++= Seq(
       "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
       "org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion,
@@ -237,6 +242,7 @@ object Dependencies {
       "org.apache.pekko" %% "pekko-discovery" % PekkoVersion))
 
   val GoogleFcm = Seq(
+    crossScalaVersions -= Scala3,
     libraryDependencies ++= Seq(
       "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
       "org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion) ++ 
Mockito)
@@ -280,6 +286,7 @@ object Dependencies {
       "org.slf4j" % "log4j-over-slf4j" % log4jOverSlf4jVersion % Test))
 
   val HuaweiPushKit = Seq(
+    crossScalaVersions -= Scala3,
     libraryDependencies ++= Seq(
       "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
       "org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion,
@@ -298,7 +305,6 @@ object Dependencies {
       "org.mdedetrich" %% "pekko-http-circe" % "1.0.0"))
 
   val Jms = Seq(
-    crossScalaVersions -= Scala3,
     libraryDependencies ++= Seq(
       "javax.jms" % "jms" % "1.1" % Provided,
       "com.ibm.mq" % "com.ibm.mq.allclient" % "9.2.5.0" % Test,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to