This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch scala3 in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git
commit 777e5b349fb3474e5133c8f6b7ffd3321cc5690f Author: PJ Fanning <[email protected]> AuthorDate: Sat Jun 10 11:05:22 2023 +0100 Scala3 support for jms connector (#145) * try scala 3 (#123) * enable scala3 support for jms connector * refactor * scala 2.12 compile issue * enable elasticsearch Update RestBulkApi.scala sort imports revert merge issue Update CouchbaseFlow.scala --- .../elasticsearch/impl/RestBulkApi.scala | 6 +-- .../scaladsl/ElasticsearchConnectorBehaviour.scala | 4 +- .../scaladsl/OpensearchConnectorBehaviour.scala | 4 +- .../googlecloud/pubsub/impl/PubSubApi.scala | 34 ++++++------- .../googlecloud/storage/impl/Formats.scala | 12 ++--- .../storage/WithMaterializerGlobal.scala | 2 +- .../stream/connectors/google/ResumableUpload.scala | 59 +++++++++++----------- .../connectors/google/auth/GoogleOAuth2Spec.scala | 2 +- .../google/auth/OAuth2CredentialsSpec.scala | 2 +- .../firebase/fcm/v1/impl/FcmSenderSpec.scala | 2 +- .../connectors/jms/impl/GraphStageCompanion.scala | 34 +++++++++++++ .../connectors/jms/impl/JmsAckSourceStage.scala | 2 +- .../stream/connectors/jms/impl/JmsConnector.scala | 20 ++++---- .../connectors/jms/impl/JmsConsumerStage.scala | 2 +- .../connectors/jms/impl/JmsProducerStage.scala | 18 +++++-- .../connectors/jms/impl/JmsTxSourceStage.scala | 2 +- .../connectors/jms/impl/SourceStageLogic.scala | 23 ++++++--- project/Dependencies.scala | 10 +++- 18 files changed, 148 insertions(+), 90 deletions(-) diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApi.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApi.scala index 70e016de6..e9db5c111 100644 --- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApi.scala +++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApi.scala @@ -46,8 +46,8 @@ private[impl] abstract class RestBulkApi[T, C] { def messageToJson(message: WriteMessage[T, C], messageSource: String): String = message.operation match { case Index | Create => "\n" + messageSource - case Upsert => "\n" + JsObject("doc" -> messageSource.parseJson, "doc_as_upsert" -> JsTrue).toString - case Update => "\n" + JsObject("doc" -> messageSource.parseJson).toString + case Upsert => "\n" + JsObject("doc" -> messageSource.parseJson, "doc_as_upsert" -> JsTrue).compactPrint + case Update => "\n" + JsObject("doc" -> messageSource.parseJson).compactPrint case Delete => "" case Nop => "" } @@ -69,7 +69,7 @@ private[impl] abstract class RestBulkApi[T, C] { // good message val command = message.operation.command val res = itemsIter.next().asJsObject.fields(command).asJsObject - val error: Option[String] = res.fields.get("error").map(_.toString()) + val error: Option[String] = res.fields.get("error").map(_.compactPrint) ret += new WriteResult(message, error) } else { // error? diff --git a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala index 33b8c8032..9f360a90d 100644 --- a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala +++ b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala @@ -200,7 +200,7 @@ trait ElasticsearchConnectorBehaviour { .via( ElasticsearchFlow.create( constructElasticsearchParams(indexName, "_doc", apiVersion), - baseWriteSettings.withRetryLogic(RetryAtFixedRate(5, 100.millis)))) + baseWriteSettings.withRetryLogic(RetryAtFixedRate(5, 100.millis)))(RootJsObjectFormat)) .runWith(Sink.seq) val start = System.currentTimeMillis() @@ -286,7 +286,7 @@ trait ElasticsearchConnectorBehaviour { .via( ElasticsearchFlow.createWithContext( constructElasticsearchParams(indexName, "_doc", apiVersion), - baseWriteSettings.withRetryLogic(RetryAtFixedRate(5, 100.millis)))) + baseWriteSettings.withRetryLogic(RetryAtFixedRate(5, 100.millis)))(RootJsObjectFormat)) .runWith(Sink.seq) val start = System.currentTimeMillis() diff --git a/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala index 77c513ee1..1c235685a 100644 --- a/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala +++ b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala @@ -200,7 +200,7 @@ trait OpensearchConnectorBehaviour { .via( ElasticsearchFlow.create( constructElasticsearchParams(indexName, "_doc", apiVersion), - baseWriteSettings.withRetryLogic(RetryAtFixedRate(5, 100.millis)))) + baseWriteSettings.withRetryLogic(RetryAtFixedRate(5, 100.millis)))(RootJsObjectFormat)) .runWith(Sink.seq) val start = System.currentTimeMillis() @@ -286,7 +286,7 @@ trait OpensearchConnectorBehaviour { .via( ElasticsearchFlow.createWithContext( constructElasticsearchParams(indexName, "_doc", apiVersion), - baseWriteSettings.withRetryLogic(RetryAtFixedRate(5, 100.millis)))) + baseWriteSettings.withRetryLogic(RetryAtFixedRate(5, 100.millis)))(RootJsObjectFormat)) .runWith(Sink.seq) val start = System.currentTimeMillis() diff --git a/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApi.scala b/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApi.scala index 90dd33581..696406f89 100644 --- a/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApi.scala +++ b/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApi.scala @@ -69,7 +69,7 @@ private[pubsub] trait PubSubApi { def PubSubGoogleApisPort: Int def isEmulated: Boolean - private implicit val instantFormat: RootJsonFormat[Instant] = new RootJsonFormat[Instant] { + private implicit val instantFormat = new RootJsonFormat[Instant] { override def read(jsValue: JsValue): Instant = jsValue match { case JsString(time) => Instant.parse(time) case _ => deserializationError("Instant required as a string of RFC3339 UTC Zulu format.") @@ -77,7 +77,7 @@ private[pubsub] trait PubSubApi { override def write(instant: Instant): JsValue = JsString(instant.toString) } - private implicit val pubSubMessageFormat: RootJsonFormat[PubSubMessage] = + private implicit val pubSubMessageFormat = new RootJsonFormat[PubSubMessage] { override def read(json: JsValue): PubSubMessage = { val fields = json.asJsObject.fields @@ -98,7 +98,7 @@ private[pubsub] trait PubSubApi { ++ m.attributes.map(attributes => "attributes" -> attributes.toJson): _*) } - private implicit val publishMessageFormat: RootJsonFormat[PublishMessage] = new RootJsonFormat[PublishMessage] { + private implicit val publishMessageFormat = new RootJsonFormat[PublishMessage] { def read(json: JsValue): PublishMessage = { val data = json.asJsObject.fields("data").convertTo[String] val attributes = json.asJsObject.fields("attributes").convertTo[immutable.Map[String, String]] @@ -112,39 +112,37 @@ private[pubsub] trait PubSubApi { m.attributes.map(a => "attributes" -> a.toJson): _*) } - private implicit val pubSubRequestFormat: RootJsonFormat[PublishRequest] = new RootJsonFormat[PublishRequest] { + private implicit val pubSubRequestFormat = new RootJsonFormat[PublishRequest] { def read(json: JsValue): PublishRequest = PublishRequest(json.asJsObject.fields("messages").convertTo[immutable.Seq[PublishMessage]]) def write(pr: PublishRequest): JsValue = JsObject("messages" -> pr.messages.toJson) } - private implicit val gcePubSubResponseFormat: RootJsonFormat[PublishResponse] = new RootJsonFormat[PublishResponse] { + private implicit val gcePubSubResponseFormat = new RootJsonFormat[PublishResponse] { def read(json: JsValue): PublishResponse = PublishResponse(json.asJsObject.fields("messageIds").convertTo[immutable.Seq[String]]) def write(pr: PublishResponse): JsValue = JsObject("messageIds" -> pr.messageIds.toJson) } - private implicit val receivedMessageFormat: RootJsonFormat[ReceivedMessage] = new RootJsonFormat[ReceivedMessage] { + private implicit val receivedMessageFormat = new RootJsonFormat[ReceivedMessage] { def read(json: JsValue): ReceivedMessage = ReceivedMessage(json.asJsObject.fields("ackId").convertTo[String], json.asJsObject.fields("message").convertTo[PubSubMessage]) def write(rm: ReceivedMessage): JsValue = JsObject("ackId" -> rm.ackId.toJson, "message" -> rm.message.toJson) } - private implicit val pubSubPullResponseFormat: RootJsonFormat[PullResponse] = new RootJsonFormat[PullResponse] { + private implicit val pubSubPullResponseFormat = new RootJsonFormat[PullResponse] { def read(json: JsValue): PullResponse = PullResponse(json.asJsObject.fields.get("receivedMessages").map(_.convertTo[immutable.Seq[ReceivedMessage]])) def write(pr: PullResponse): JsValue = pr.receivedMessages.map(rm => JsObject("receivedMessages" -> rm.toJson)).getOrElse(JsObject.empty) } - private implicit val acknowledgeRequestFormat: RootJsonFormat[AcknowledgeRequest] = - new RootJsonFormat[AcknowledgeRequest] { - def read(json: JsValue): AcknowledgeRequest = - AcknowledgeRequest(json.asJsObject.fields("ackIds").convertTo[immutable.Seq[String]]: _*) - def write(ar: AcknowledgeRequest): JsValue = JsObject("ackIds" -> ar.ackIds.toJson) - } - private implicit val pullRequestFormat: RootJsonFormat[PullRequest] = - DefaultJsonProtocol.jsonFormat2(PullRequest.apply) + private implicit val acknowledgeRequestFormat = new RootJsonFormat[AcknowledgeRequest] { + def read(json: JsValue): AcknowledgeRequest = + AcknowledgeRequest(json.asJsObject.fields("ackIds").convertTo[immutable.Seq[String]]: _*) + def write(ar: AcknowledgeRequest): JsValue = JsObject("ackIds" -> ar.ackIds.toJson) + } + private implicit val pullRequestFormat = DefaultJsonProtocol.jsonFormat2(PullRequest.apply) private def scheme: String = if (isEmulated) "http" else "https" @@ -175,7 +173,7 @@ private[pubsub] trait PubSubApi { .mapMaterializedValue(_ => NotUsed) private implicit val pullResponseUnmarshaller: FromResponseUnmarshaller[PullResponse] = - Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) => + Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse => response.status match { case StatusCodes.Success(_) if response.entity.contentType == ContentTypes.`application/json` => Unmarshal(response.entity).to[PullResponse] @@ -213,7 +211,7 @@ private[pubsub] trait PubSubApi { .mapMaterializedValue(_ => NotUsed) private implicit val acknowledgeResponseUnmarshaller: FromResponseUnmarshaller[Done] = - Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) => + Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse => response.status match { case StatusCodes.Success(_) => response.discardEntityBytes().future @@ -263,7 +261,7 @@ private[pubsub] trait PubSubApi { publish(topic, parallelism, None) private implicit val publishResponseUnmarshaller: FromResponseUnmarshaller[PublishResponse] = - Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) => + Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse => response.status match { case StatusCodes.Success(_) if response.entity.contentType == ContentTypes.`application/json` => Unmarshal(response.entity).to[PublishResponse] diff --git a/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/Formats.scala b/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/Formats.scala index ee2da5dcc..5c20c3931 100644 --- a/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/Formats.scala +++ b/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/Formats.scala @@ -49,8 +49,7 @@ object Formats extends DefaultJsonProtocol { domain: String, projectTeam: ProjectTeam, etag: String) - private implicit val ObjectAccessControlsJsonFormat: RootJsonFormat[ObjectAccessControls] = - jsonFormat13(ObjectAccessControls.apply) + private implicit val ObjectAccessControlsJsonFormat = jsonFormat13(ObjectAccessControls) /** * Google API storage response object @@ -80,8 +79,7 @@ object Formats extends DefaultJsonProtocol { timeStorageClassUpdated: String, updated: String) - private implicit val storageObjectReadOnlyJson: RootJsonFormat[StorageObjectReadOnlyJson] = - jsonFormat18(StorageObjectReadOnlyJson.apply) + private implicit val storageObjectReadOnlyJson = jsonFormat18(StorageObjectReadOnlyJson) // private sub class of StorageObjectJson used to workaround 22 field jsonFormat issue private final case class StorageObjectWriteableJson( @@ -100,8 +98,7 @@ object Formats extends DefaultJsonProtocol { temporaryHold: Option[Boolean], acl: Option[List[ObjectAccessControls]]) - private implicit val storageObjectWritableJson: RootJsonFormat[StorageObjectWriteableJson] = - jsonFormat14(StorageObjectWriteableJson.apply) + private implicit val storageObjectWritableJson = jsonFormat14(StorageObjectWriteableJson) private implicit object StorageObjectJsonFormat extends RootJsonFormat[StorageObjectJson] { override def read(value: JsValue): StorageObjectJson = { @@ -178,8 +175,7 @@ object Formats extends DefaultJsonProtocol { } } - private implicit val bucketListResultJsonReads: RootJsonFormat[BucketListResultJson] = - jsonFormat4(BucketListResultJson.apply) + private implicit val bucketListResultJsonReads = jsonFormat4(BucketListResultJson.apply) implicit object RewriteResponseReads extends RootJsonReader[RewriteResponse] { override def read(json: JsValue): RewriteResponse = { diff --git a/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/WithMaterializerGlobal.scala b/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/WithMaterializerGlobal.scala index 787885b91..eb046720a 100644 --- a/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/WithMaterializerGlobal.scala +++ b/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/WithMaterializerGlobal.scala @@ -29,7 +29,7 @@ trait WithMaterializerGlobal with ScalaFutures with IntegrationPatience with Matchers { - implicit val actorSystem: ActorSystem = ActorSystem("test") + implicit val actorSystem = ActorSystem("test") implicit val ec: ExecutionContext = actorSystem.dispatcher override protected def afterAll(): Unit = { diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala index 969cef4c1..f50c921b9 100644 --- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala +++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala @@ -14,6 +14,7 @@ package org.apache.pekko.stream.connectors.google import org.apache.pekko +import pekko.actor.ActorSystem import pekko.NotUsed import pekko.annotation.InternalApi import pekko.http.scaladsl.model.HttpMethods.{ POST, PUT } @@ -55,7 +56,7 @@ private[connectors] object ResumableUpload { Sink .fromMaterializer { (mat, attr) => import mat.executionContext - implicit val materializer: Materializer = mat + implicit val materializer = mat implicit val settings: GoogleSettings = GoogleAttributes.resolveSettings(mat, attr) val uploadChunkSize = settings.requestSettings.uploadChunkSize @@ -95,24 +96,25 @@ private[connectors] object ResumableUpload { private def initiateSession(request: HttpRequest)(implicit mat: Materializer, settings: GoogleSettings): Future[Uri] = { + implicit val system: ActorSystem = mat.system import implicits._ - implicit val um: FromResponseUnmarshaller[Uri] = - Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) => - response.discardEntityBytes().future.map { _ => - response.header[Location].fold(throw InvalidResponseException(ErrorInfo("No Location header")))(_.uri) - } - }.withDefaultRetry + implicit val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse => + response.discardEntityBytes().future.map { _ => + response.header[Location].fold(throw InvalidResponseException(ErrorInfo("No Location header")))(_.uri) + } + }.withDefaultRetry - GoogleHttp(mat.system).singleAuthenticatedRequest[Uri](request) + GoogleHttp().singleAuthenticatedRequest[Uri](request) } private final case class DoNotRetry(ex: Throwable) extends Throwable(ex) with NoStackTrace private def uploadChunk[T: FromResponseUnmarshaller]( request: HttpRequest)(implicit mat: Materializer): Flow[Either[T, MaybeLast[Chunk]], Try[Option[T]], NotUsed] = { + implicit val system: ActorSystem = mat.system - val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) => + val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse => response.status match { case PermanentRedirect => response.discardEntityBytes().future.map(_ => None) @@ -125,8 +127,7 @@ private[connectors] object ResumableUpload { val uri = request.uri Flow[HttpRequest] .map((_, ())) - .via(GoogleHttp(mat.system).cachedHostConnectionPoolWithContext(uri.authority.host.address, uri.effectivePort)( - um)) + .via(GoogleHttp().cachedHostConnectionPoolWithContext(uri.authority.host.address, uri.effectivePort)(um)) .map(_._1.recoverWith { case DoNotRetry(ex) => Failure(ex) }) } @@ -146,30 +147,30 @@ private[connectors] object ResumableUpload { request: HttpRequest, chunk: Future[MaybeLast[Chunk]])( implicit mat: Materializer, settings: GoogleSettings): Future[Either[T, MaybeLast[Chunk]]] = { + implicit val system: ActorSystem = mat.system import implicits._ - implicit val um: FromResponseUnmarshaller[Either[T, Long]] = - Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) => - response.status match { - case OK | Created => Unmarshal(response).to[T].map(Left(_)) - case PermanentRedirect => - response.discardEntityBytes().future.map { _ => - Right( - response - .header[Range] - .flatMap(_.ranges.headOption) - .collect { - case Slice(_, last) => last + 1 - }.getOrElse(0L)) - } - case _ => throw InvalidResponseException(ErrorInfo(response.status.value, response.status.defaultMessage)) - } - }.withDefaultRetry + implicit val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse => + response.status match { + case OK | Created => Unmarshal(response).to[T].map(Left(_)) + case PermanentRedirect => + response.discardEntityBytes().future.map { _ => + Right( + response + .header[Range] + .flatMap(_.ranges.headOption) + .collect { + case Slice(_, last) => last + 1 + }.getOrElse(0L)) + } + case _ => throw InvalidResponseException(ErrorInfo(response.status.value, response.status.defaultMessage)) + } + }.withDefaultRetry import mat.executionContext chunk.flatMap { case maybeLast @ MaybeLast(Chunk(bytes, position)) => - GoogleHttp(mat.system) + GoogleHttp() .singleAuthenticatedRequest[Either[T, Long]](request.addHeader(statusRequestHeader)) .map { case Left(result) if maybeLast.isLast => Left(result) diff --git a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/GoogleOAuth2Spec.scala b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/GoogleOAuth2Spec.scala index 753e3004a..1bd56b790 100644 --- a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/GoogleOAuth2Spec.scala +++ b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/GoogleOAuth2Spec.scala @@ -46,7 +46,7 @@ class GoogleOAuth2Spec implicit val executionContext: ExecutionContext = system.dispatcher implicit val settings: GoogleSettings = GoogleSettings(system) - implicit val clock: Clock = Clock.systemUTC() + implicit val clock = Clock.systemUTC() lazy val privateKey = { val inputStream = getClass.getClassLoader.getResourceAsStream("private_pcks8.pem") diff --git a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/OAuth2CredentialsSpec.scala b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/OAuth2CredentialsSpec.scala index 446336dd9..7d7e2342a 100644 --- a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/OAuth2CredentialsSpec.scala +++ b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/OAuth2CredentialsSpec.scala @@ -45,7 +45,7 @@ class OAuth2CredentialsSpec import system.dispatcher implicit val settings: RequestSettings = GoogleSettings().requestSettings - implicit val clock: Clock = Clock.systemUTC() + implicit val clock = Clock.systemUTC() final object AccessTokenProvider { @volatile var accessTokenPromise: Promise[AccessToken] = Promise.failed(new RuntimeException) diff --git a/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala b/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala index ca6c18820..a2d4d4922 100644 --- a/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala +++ b/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala @@ -57,7 +57,7 @@ class FcmSenderSpec implicit val executionContext: ExecutionContext = system.dispatcher - implicit val conf: FcmSettings = FcmSettings() + implicit val conf = FcmSettings() implicit val settings: GoogleSettings = GoogleSettings().copy(projectId = "projectId") "FcmSender" should { diff --git a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/GraphStageCompanion.scala b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/GraphStageCompanion.scala new file mode 100644 index 000000000..e43a4d242 --- /dev/null +++ b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/GraphStageCompanion.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, derived from Akka. + */ + +package org.apache.pekko.stream.connectors.jms.impl + +import org.apache.pekko +import pekko.annotation.InternalApi +import pekko.stream.Materializer +import pekko.stream.connectors.jms.Destination + +import scala.concurrent.duration.FiniteDuration + +/** + * Exposes some protected methods from [[pekko.stream.stage.GraphStage]] + * that are not accessible when using Scala3 compiler. + */ +@InternalApi +private[impl] trait GraphStageCompanion { + def graphStageMaterializer: Materializer + + def graphStageDestination: Destination + + def scheduleOnceOnGraphStage(timerKey: Any, delay: FiniteDuration): Unit + + def isTimerActiveOnGraphStage(timerKey: Any): Boolean + + def cancelTimerOnGraphStage(timerKey: Any): Unit +} diff --git a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsAckSourceStage.scala b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsAckSourceStage.scala index 4665463dc..16c2a0da0 100644 --- a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsAckSourceStage.scala +++ b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsAckSourceStage.scala @@ -49,7 +49,7 @@ private[jms] final class JmsAckSourceStage(settings: JmsConsumerSettings, destin createDestination: jms.Session => javax.jms.Destination): JmsAckSession = { val session = connection.createSession(false, settings.acknowledgeMode.getOrElse(AcknowledgeMode.ClientAcknowledge).mode) - new JmsAckSession(connection, session, createDestination(session), destination, maxPendingAcks) + new JmsAckSession(connection, session, createDestination(session), graphStageDestination, maxPendingAcks) } protected def pushMessage(msg: AckEnvelope): Unit = push(out, msg) diff --git a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsConnector.scala b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsConnector.scala index 414462769..5fa898724 100644 --- a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsConnector.scala +++ b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsConnector.scala @@ -37,7 +37,7 @@ import scala.util.{ Failure, Success, Try } */ @InternalApi private[jms] trait JmsConnector[S <: JmsSession] { - this: TimerGraphStageLogic with StageLogging => + this: TimerGraphStageLogic with GraphStageCompanion with StageLogging => import JmsConnector._ @@ -71,12 +71,12 @@ private[jms] trait JmsConnector[S <: JmsSession] { Source .queue[InternalConnectionState](2, OverflowStrategy.dropHead) .toMat(BroadcastHub.sink(1))(Keep.both) - .run()(this.materializer) + .run()(graphStageMaterializer) connectionStateQueue = queue connectionStateSourcePromise.complete(Success(source)) // add subscription to purge queued connection status events after the configured timeout. - scheduleOnce(ConnectionStatusTimeout, jmsSettings.connectionStatusSubscriptionTimeout) + scheduleOnceOnGraphStage(ConnectionStatusTimeout, jmsSettings.connectionStatusSubscriptionTimeout) } protected def finishStop(): Unit = { @@ -91,7 +91,7 @@ private[jms] trait JmsConnector[S <: JmsSession] { closeSessions() val previous = updateStateWith(update) closeConnectionAsync(connection(previous)) - if (isTimerActive("connection-status-timeout")) drainConnectionState() + if (isTimerActiveOnGraphStage("connection-status-timeout")) drainConnectionState() connectionStateQueue.complete() } @@ -219,7 +219,7 @@ private[jms] trait JmsConnector[S <: JmsSession] { closeConnectionAsync(connection(status)) val delay = if (backoffMaxed) maxBackoff else waitTime(nextAttempt) val backoffNowMaxed = backoffMaxed || delay == maxBackoff - scheduleOnce(AttemptConnect(nextAttempt, backoffNowMaxed), delay) + scheduleOnceOnGraphStage(AttemptConnect(nextAttempt, backoffNowMaxed), delay) } } @@ -234,7 +234,7 @@ private[jms] trait JmsConnector[S <: JmsSession] { } private def drainConnectionState(): Unit = - Source.future(connectionStateSource).flatMapConcat(identity).runWith(Sink.ignore)(this.materializer) + Source.future(connectionStateSource).flatMapConcat(identity).runWith(Sink.ignore)(graphStageMaterializer) protected def executionContext(attributes: Attributes): ExecutionContext = { val dispatcherId = (attributes.get[ActorAttributes.Dispatcher](ActorAttributes.IODispatcher) match { @@ -244,11 +244,11 @@ private[jms] trait JmsConnector[S <: JmsSession] { }) match { case d @ ActorAttributes.IODispatcher => // this one is not a dispatcher id, but is a config path pointing to the dispatcher id - materializer.system.settings.config.getString(d.dispatcher) + graphStageMaterializer.system.settings.config.getString(d.dispatcher) case d => d.dispatcher } - materializer.system.dispatchers.lookup(dispatcherId) + graphStageMaterializer.system.dispatchers.lookup(dispatcherId) } protected def createSession(connection: jms.Connection, createDestination: jms.Session => jms.Destination): S @@ -324,7 +324,7 @@ private[jms] trait JmsConnector[S <: JmsSession] { private def cancelAckTimers(s: JmsSession): Unit = s match { case session: JmsAckSession => - cancelTimer(FlushAcknowledgementsTimerKey(session)) + cancelTimerOnGraphStage(FlushAcknowledgementsTimerKey(session)) case _ => () } @@ -341,7 +341,7 @@ private[jms] trait JmsConnector[S <: JmsSession] { } private def openConnection(attempt: Int, backoffMaxed: Boolean): Future[jms.Connection] = { - implicit val system: ActorSystem = materializer.system + implicit val system: ActorSystem = graphStageMaterializer.system val jmsConnection = openConnectionAttempt(startConnection) updateState(JmsConnectorInitializing(jmsConnection, attempt, backoffMaxed, 0)) jmsConnection.map { connection => diff --git a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsConsumerStage.scala b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsConsumerStage.scala index 929296655..211995b7c 100644 --- a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsConsumerStage.scala +++ b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsConsumerStage.scala @@ -52,7 +52,7 @@ private[jms] final class JmsConsumerStage(settings: JmsConsumerSettings, destina createDestination: jms.Session => javax.jms.Destination): JmsConsumerSession = { val session = connection.createSession(false, settings.acknowledgeMode.getOrElse(AcknowledgeMode.AutoAcknowledge).mode) - new JmsConsumerSession(connection, session, createDestination(session), destination) + new JmsConsumerSession(connection, session, createDestination(session), graphStageDestination) } protected def pushMessage(msg: jms.Message): Unit = { diff --git a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsProducerStage.scala b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsProducerStage.scala index 840f64c3a..b0bcb5daa 100644 --- a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsProducerStage.scala +++ b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsProducerStage.scala @@ -23,9 +23,10 @@ import pekko.stream.impl.Buffer import pekko.stream.scaladsl.Source import pekko.stream.stage._ import pekko.util.OptionVal -import javax.jms +import javax.jms import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration import scala.util.control.NoStackTrace import scala.util.{ Failure, Success, Try } @@ -34,7 +35,7 @@ import scala.util.{ Failure, Success, Try } */ @InternalApi private trait JmsProducerConnector extends JmsConnector[JmsProducerSession] { - this: TimerGraphStageLogic with StageLogging => + this: TimerGraphStageLogic with GraphStageCompanion with StageLogging => protected final def createSession(connection: jms.Connection, createDestination: jms.Session => jms.Destination): JmsProducerSession = { @@ -74,7 +75,18 @@ private[jms] final class JmsProducerStage[E <: JmsEnvelope[PassThrough], PassThr } private def producerLogic(inheritedAttributes: Attributes) = - new TimerGraphStageLogic(shape) with JmsProducerConnector with StageLogging { + new TimerGraphStageLogic(shape) with JmsProducerConnector with GraphStageCompanion with StageLogging { + + final override def graphStageMaterializer: Materializer = materializer + + final override def graphStageDestination: Destination = destination + + final override def scheduleOnceOnGraphStage(timerKey: Any, delay: FiniteDuration): Unit = + scheduleOnce(timerKey, delay) + + final override def isTimerActiveOnGraphStage(timerKey: Any): Boolean = isTimerActive(timerKey) + + final override def cancelTimerOnGraphStage(timerKey: Any): Unit = cancelTimer(timerKey) /* * NOTE: the following code is heavily inspired by org.apache.pekko.stream.impl.fusing.MapAsync diff --git a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsTxSourceStage.scala b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsTxSourceStage.scala index 18278e22a..f6db22736 100644 --- a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsTxSourceStage.scala +++ b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsTxSourceStage.scala @@ -46,7 +46,7 @@ private[jms] final class JmsTxSourceStage(settings: JmsConsumerSettings, destina protected def createSession(connection: jms.Connection, createDestination: jms.Session => javax.jms.Destination) = { val session = connection.createSession(true, settings.acknowledgeMode.getOrElse(AcknowledgeMode.SessionTransacted).mode) - new JmsConsumerSession(connection, session, createDestination(session), destination) + new JmsConsumerSession(connection, session, createDestination(session), graphStageDestination) } protected def pushMessage(msg: TxEnvelope): Unit = push(out, msg) diff --git a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/SourceStageLogic.scala b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/SourceStageLogic.scala index 7debd190f..ac8266280 100644 --- a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/SourceStageLogic.scala +++ b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/SourceStageLogic.scala @@ -13,28 +13,27 @@ package org.apache.pekko.stream.connectors.jms.impl -import java.util.concurrent.atomic.AtomicBoolean - import org.apache.pekko import pekko.annotation.InternalApi import pekko.stream.connectors.jms.impl.InternalConnectionState.JmsConnectorStopping import pekko.stream.connectors.jms.{ Destination, JmsConsumerSettings } import pekko.stream.scaladsl.Source import pekko.stream.stage.{ OutHandler, StageLogging, TimerGraphStageLogic } -import pekko.stream.{ Attributes, Outlet, SourceShape } +import pekko.stream.{ Attributes, Materializer, Outlet, SourceShape } import pekko.{ Done, NotUsed } +import java.util.concurrent.atomic.AtomicBoolean +import javax.jms import scala.collection.mutable import scala.util.{ Failure, Success } - -import javax.jms +import scala.concurrent.duration.FiniteDuration /** * Internal API. */ @InternalApi private trait JmsConsumerConnector extends JmsConnector[JmsConsumerSession] { - this: TimerGraphStageLogic with StageLogging => + this: TimerGraphStageLogic with GraphStageCompanion with StageLogging => override val startConnection = true @@ -54,8 +53,20 @@ private abstract class SourceStageLogic[T](shape: SourceShape[T], inheritedAttributes: Attributes) extends TimerGraphStageLogic(shape) with JmsConsumerConnector + with GraphStageCompanion with StageLogging { + final override def graphStageMaterializer: Materializer = materializer + + final override def graphStageDestination: Destination = destination + + final override def scheduleOnceOnGraphStage(timerKey: Any, delay: FiniteDuration): Unit = + scheduleOnce(timerKey, delay) + + final override def isTimerActiveOnGraphStage(timerKey: Any): Boolean = isTimerActive(timerKey) + + final override def cancelTimerOnGraphStage(timerKey: Any): Unit = cancelTimer(timerKey) + override protected def jmsSettings: JmsConsumerSettings = settings private val queue = mutable.Queue[T]() private val stopping = new AtomicBoolean(false) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 1a1f851c9..110560467 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -102,6 +102,7 @@ object Dependencies { ExclusionRule("software.amazon.awssdk", "netty-nio-client"))) ++ Mockito) val AzureStorageQueue = Seq( + crossScalaVersions -= Scala3, libraryDependencies ++= Seq( "com.microsoft.azure" % "azure-storage" % "8.0.0")) @@ -110,6 +111,7 @@ object Dependencies { val CassandraDriverVersionInDocs = "4.15" val Cassandra = Seq( + crossScalaVersions -= Scala3, libraryDependencies ++= Seq( ("com.datastax.oss" % "java-driver-core" % CassandraDriverVersion) .exclude("com.github.spotbugs", "spotbugs-annotations") @@ -118,6 +120,7 @@ object Dependencies { "org.apache.pekko" %% "pekko-discovery" % PekkoVersion % Provided)) val Couchbase = Seq( + crossScalaVersions -= Scala3, libraryDependencies ++= Seq( "com.couchbase.client" % "java-client" % CouchbaseVersion, // ApacheV2 "io.reactivex" % "rxjava-reactive-streams" % "1.2.1", // ApacheV2 @@ -145,7 +148,6 @@ object Dependencies { "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion)) val Elasticsearch = Seq( - crossScalaVersions -= Scala3, libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion, "org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion, @@ -186,6 +188,7 @@ object Dependencies { "org.apache.logging.log4j" % "log4j-to-slf4j" % "2.17.1" % Test) ++ JacksonDatabindDependencies) val GoogleCommon = Seq( + crossScalaVersions -= Scala3, libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion, "org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion, @@ -195,6 +198,7 @@ object Dependencies { ) ++ Mockito) val GoogleBigQuery = Seq( + crossScalaVersions -= Scala3, libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion, "org.apache.pekko" %% "pekko-http-jackson" % PekkoHttpVersion % Provided, @@ -220,6 +224,7 @@ object Dependencies { "org.apache.pekko" %% "pekko-discovery" % PekkoVersion) ++ Mockito) val GooglePubSub = Seq( + crossScalaVersions -= Scala3, libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion, "org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion, @@ -237,6 +242,7 @@ object Dependencies { "org.apache.pekko" %% "pekko-discovery" % PekkoVersion)) val GoogleFcm = Seq( + crossScalaVersions -= Scala3, libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion, "org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion) ++ Mockito) @@ -280,6 +286,7 @@ object Dependencies { "org.slf4j" % "log4j-over-slf4j" % log4jOverSlf4jVersion % Test)) val HuaweiPushKit = Seq( + crossScalaVersions -= Scala3, libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion, "org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion, @@ -298,7 +305,6 @@ object Dependencies { "org.mdedetrich" %% "pekko-http-circe" % "1.0.0")) val Jms = Seq( - crossScalaVersions -= Scala3, libraryDependencies ++= Seq( "javax.jms" % "jms" % "1.1" % Provided, "com.ibm.mq" % "com.ibm.mq.allclient" % "9.2.5.0" % Test, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
