This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch scala3 in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git
commit f6990e95d9fbefb0f9aa59c4d9ec80199cf61f6b Author: PJ Fanning <[email protected]> AuthorDate: Fri Jun 9 12:48:11 2023 +0100 scala 3 support for json streaming (#146) * scala 3 support for json streaming Update JsonStreamReader.scala make buffer a val Update JsonStreamReader.scala Update JsonStreamReader.scala use Iterable.single Delete MyByteString.scala compile issue in scala 2.12 Update JsonStreamReader.scala * Update Dependencies.scala * use separate scala2/3 code * Update QueueHelper.scala --- build.sbt | 11 ++++++++++- .../stream/connectors/json/impl/QueueHelper.scala | 19 +++++++++++++++++++ .../stream/connectors/json/impl/QueueHelper.scala | 21 +++++++++++++++++++++ .../connectors/json/impl/JsonStreamReader.scala | 6 ++++-- project/Dependencies.scala | 1 - 5 files changed, 54 insertions(+), 4 deletions(-) diff --git a/build.sbt b/build.sbt index a6b815c93..deeaf3022 100644 --- a/build.sbt +++ b/build.sbt @@ -277,7 +277,16 @@ lazy val ironmq = pekkoConnectorProject( lazy val jms = pekkoConnectorProject("jms", "jms", Dependencies.Jms) -lazy val jsonStreaming = pekkoConnectorProject("json-streaming", "json.streaming", Dependencies.JsonStreaming) +val scalaReleaseSeparateSource: Def.SettingsDefinition = Compile / unmanagedSourceDirectories ++= { + if (scalaVersion.value.startsWith("2")) { + Seq((LocalRootProject / baseDirectory).value / "src" / "main" / "scala-2") + } else { + Seq((LocalRootProject / baseDirectory).value / "src" / "main" / "scala-3") + } +} + +lazy val jsonStreaming = pekkoConnectorProject("json-streaming", "json.streaming", + Dependencies.JsonStreaming ++ scalaReleaseSeparateSource) lazy val kinesis = pekkoConnectorProject("kinesis", "aws.kinesis", Dependencies.Kinesis) diff --git a/json-streaming/src/main/scala-2/org/apache/pekko/stream/connectors/json/impl/QueueHelper.scala b/json-streaming/src/main/scala-2/org/apache/pekko/stream/connectors/json/impl/QueueHelper.scala new file mode 100644 index 000000000..5972a1eca --- /dev/null +++ b/json-streaming/src/main/scala-2/org/apache/pekko/stream/connectors/json/impl/QueueHelper.scala @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, derived from Akka. + */ + +package org.apache.pekko.stream.connectors.json.impl + +import org.apache.pekko.util.ByteString + +import scala.collection.immutable.Queue + +private[impl] object QueueHelper { + @inline final def enqueue(queue: Queue[ByteString], byteString: ByteString): Queue[ByteString] = + queue.enqueue(byteString) +} diff --git a/json-streaming/src/main/scala-3/org/apache/pekko/stream/connectors/json/impl/QueueHelper.scala b/json-streaming/src/main/scala-3/org/apache/pekko/stream/connectors/json/impl/QueueHelper.scala new file mode 100644 index 000000000..27b1f7f5e --- /dev/null +++ b/json-streaming/src/main/scala-3/org/apache/pekko/stream/connectors/json/impl/QueueHelper.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, derived from Akka. + */ + +package org.apache.pekko.stream.connectors.json.impl + +import org.apache.pekko.util.ByteString + +import scala.collection.immutable.Queue + +private[impl] object QueueHelper { + inline final def enqueue(queue: Queue[ByteString], byteString: ByteString): Queue[ByteString] = { + // see https://github.com/lampepfl/dotty/issues/17946 + queue.enqueueAll(Iterable.single(byteString)) + } +} diff --git a/json-streaming/src/main/scala/org/apache/pekko/stream/connectors/json/impl/JsonStreamReader.scala b/json-streaming/src/main/scala/org/apache/pekko/stream/connectors/json/impl/JsonStreamReader.scala index d93310d80..990b68729 100644 --- a/json-streaming/src/main/scala/org/apache/pekko/stream/connectors/json/impl/JsonStreamReader.scala +++ b/json-streaming/src/main/scala/org/apache/pekko/stream/connectors/json/impl/JsonStreamReader.scala @@ -48,8 +48,10 @@ private[pekko] final class JsonStreamReader(path: JsonPath) extends GraphStage[F private val config = surfer.configBuilder .bind(path, new JsonPathListener { - override def onValue(value: Any, context: ParsingContext): Unit = - buffer = buffer.enqueue(ByteString(value.toString)) + override def onValue(value: Any, context: ParsingContext): Unit = { + // see https://github.com/lampepfl/dotty/issues/17946 + buffer = QueueHelper.enqueue(buffer, ByteString(value.toString)) + } }) .build private val parser = surfer.createNonBlockingParser(config) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 6158ce97b..1a1f851c9 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -310,7 +310,6 @@ object Dependencies { "https://repository.jboss.org/nexus/content/groups/public")) +: externalResolvers.value) val JsonStreaming = Seq( - crossScalaVersions -= Scala3, libraryDependencies ++= Seq( "com.github.jsurfer" % "jsurfer-jackson" % "1.6.0") ++ JacksonDatabindDependencies) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
