This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch scala3
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git

commit 703553888c71957273c6bb29894eba2087a9ed72
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Jun 12 13:56:25 2023 +0100

    support scala3 in avroparquet (#158)
    
    * support scala3 in avroparquet
    
    add some scala3 tests
    
    Update AvroParquetSinkSpec.scala
    
    Update avroparquet.md
    
    Update AbstractAvroParquet.scala
    
    Update AbstractAvroParquet.scala
    
    * refactor test code
    
    * Update AbstractAvroParquetBase.scala
    
    * fix doc links
    
    * use Common.isScala3
    
    * Update Dependencies.scala
    
    * Update build.sbt
---
 .../docs/scaladsl/AbstractAvroParquet.scala        | 33 +++++++++++++++++
 .../docs/scaladsl/AbstractAvroParquet.scala        | 41 ++++++++++++++++++++++
 ...Parquet.scala => AbstractAvroParquetBase.scala} | 26 ++++----------
 .../scala/docs/scaladsl/AvroParquetFlowSpec.scala  | 10 +++---
 .../scala/docs/scaladsl/AvroParquetSinkSpec.scala  | 12 +++----
 .../docs/scaladsl/AvroParquetSourceSpec.scala      |  8 ++---
 build.sbt                                          | 11 +-----
 docs/src/main/paradox/avroparquet.md               |  4 +--
 project/Common.scala                               |  2 +-
 project/Dependencies.scala                         | 15 ++++----
 10 files changed, 108 insertions(+), 54 deletions(-)

diff --git a/avroparquet/src/test/scala-2/docs/scaladsl/AbstractAvroParquet.scala b/avroparquet/src/test/scala-2/docs/scaladsl/AbstractAvroParquet.scala
new file mode 100644
index 000000000..3d2da02e0
--- /dev/null
+++ b/avroparquet/src/test/scala-2/docs/scaladsl/AbstractAvroParquet.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, derived from Akka.
+ */
+
+/*
+ * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.scaladsl
+
+import com.sksamuel.avro4s.RecordFormat
+import org.apache.pekko.testkit.TestKit
+import org.scalatest.{ BeforeAndAfterAll, Suite }
+
+import java.io.File
+import scala.reflect.io.Directory
+
+trait AbstractAvroParquet extends BeforeAndAfterAll with AbstractAvroParquetBase {
+  this: Suite with TestKit =>
+
+  val format: RecordFormat[Document] = RecordFormat[Document]
+
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+    val directory = new Directory(new File(folder))
+    directory.deleteRecursively()
+  }
+}
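
For context: on Scala 2, avro4s 4.x derives both the Avro schema and the record codecs at compile time via macros, which is why `RecordFormat[Document]` needs no arguments here. A minimal round-trip sketch against the avro4s 4.x API (the object name is illustrative):

    import com.sksamuel.avro4s.RecordFormat

    case class Document(id: String, body: String)

    object RoundTrip extends App {
      // Scala 2 / avro4s 4.x: schema and codecs are macro-derived from the case class.
      val format: RecordFormat[Document] = RecordFormat[Document]

      val doc = Document("id-1", "hello")
      val record = format.to(doc)         // case class -> Avro GenericRecord
      assert(format.from(record) == doc)  // GenericRecord -> case class
    }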
diff --git a/avroparquet/src/test/scala-3/docs/scaladsl/AbstractAvroParquet.scala b/avroparquet/src/test/scala-3/docs/scaladsl/AbstractAvroParquet.scala
new file mode 100644
index 000000000..bd91822d6
--- /dev/null
+++ b/avroparquet/src/test/scala-3/docs/scaladsl/AbstractAvroParquet.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, derived from Akka.
+ */
+
+/*
+ * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.scaladsl
+
+import com.sksamuel.avro4s._
+import org.apache.pekko.testkit.TestKit
+import org.scalatest.{ BeforeAndAfterAll, Suite }
+
+import java.io.File
+
+trait AbstractAvroParquet extends BeforeAndAfterAll with AbstractAvroParquetBase {
+  this: Suite with TestKit =>
+
+  implicit val toRecordDocument: ToRecord[Document] = ToRecord[Document](schema)
+  implicit val fromRecordDocument: FromRecord[Document] = FromRecord[Document](schema)
+  val format: RecordFormat[Document] = RecordFormat[Document](schema)
+
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+    deleteRecursively(new File(folder))
+  }
+
+  private def deleteRecursively(f: File): Boolean = {
+    if (f.isDirectory) f.listFiles match {
+      case null =>
+      case xs   => xs.foreach(deleteRecursively)
+    }
+    f.delete()
+  }
+}
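
Two things are worth noting about the Scala 3 variant. First, avro4s 5.x (the first release line with Scala 3 artifacts) is handed the parsed `schema` explicitly when building `ToRecord`, `FromRecord` and `RecordFormat`, instead of relying on the Scala 2 macro derivation. Second, the hand-rolled `deleteRecursively` replaces `scala.reflect.io.Directory`, which ships with the Scala 2 scala-reflect library and is not on a default Scala 3 classpath. An equivalent cleanup could also be written against `java.nio.file`, which behaves identically on both Scala versions; a sketch, not what the commit does:

    import java.nio.file.{ Files, Path }
    import java.util.Comparator

    def deleteTree(root: Path): Unit =
      if (Files.exists(root)) {
        // Reverse order sorts children before their parents, so every
        // directory is already empty by the time it is deleted.
        Files.walk(root)
          .sorted(Comparator.reverseOrder[Path]())
          .forEach(p => Files.delete(p))
      }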
diff --git a/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquet.scala b/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquetBase.scala
similarity index 88%
rename from avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquet.scala
rename to avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquetBase.scala
index 1369544d0..97a4052f8 100644
--- a/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquet.scala
+++ b/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquetBase.scala
@@ -13,26 +13,18 @@
 
 package docs.scaladsl
 
-import java.io.File
-
-import org.apache.pekko.testkit.TestKit
-import com.sksamuel.avro4s.RecordFormat
 import org.apache.avro.Schema
 import org.apache.avro.generic.{ GenericRecord, GenericRecordBuilder }
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.avro.{ AvroParquetReader, AvroParquetWriter, AvroReadSupport }
-import org.apache.parquet.hadoop.{ ParquetReader, ParquetWriter }
 import org.apache.parquet.hadoop.util.HadoopInputFile
+import org.apache.parquet.hadoop.{ ParquetReader, ParquetWriter }
 import org.scalacheck.Gen
-import org.scalatest.{ BeforeAndAfterAll, Suite }
 
-import scala.reflect.io.Directory
 import scala.util.Random
 
-trait AbstractAvroParquet extends BeforeAndAfterAll {
-  this: Suite with TestKit =>
-
+trait AbstractAvroParquetBase {
   case class Document(id: String, body: String)
 
   val schema: Schema = new Schema.Parser().parse(
@@ -42,13 +34,13 @@ trait AbstractAvroParquet extends BeforeAndAfterAll {
     Gen.oneOf(Seq(Document(id = Gen.alphaStr.sample.get, body = Gen.alphaLowerStr.sample.get)))
   val genDocuments: Int => Gen[List[Document]] = n => Gen.listOfN(n, genDocument)
 
-  val format: RecordFormat[Document] = RecordFormat[Document]
-
   val folder: String = "./" + Random.alphanumeric.take(8).mkString("")
 
   val genFinalFile: Gen[String] = for {
     fileName <- Gen.alphaLowerStr
-  } yield { folder + "/" + fileName + ".parquet" }
+  } yield {
+    folder + "/" + fileName + ".parquet"
+  }
 
   val genFile: Gen[String] = Gen.oneOf(Seq(Gen.alphaLowerStr.sample.get + ".parquet"))
 
@@ -110,8 +102,8 @@ trait AbstractAvroParquet extends BeforeAndAfterAll {
     import org.apache.avro.generic.GenericRecord
     import org.apache.hadoop.fs.Path
     import org.apache.parquet.avro.AvroParquetReader
-    import org.apache.parquet.hadoop.util.HadoopInputFile
     import org.apache.parquet.hadoop.ParquetReader
+    import org.apache.parquet.hadoop.util.HadoopInputFile
 
     val file: String = "./sample/path/test.parquet"
     val writer: ParquetWriter[GenericRecord] =
@@ -124,10 +116,4 @@ trait AbstractAvroParquet extends BeforeAndAfterAll {
     if (writer != null && reader != null) { // forces val usage
     }
   }
-
-  override def afterAll(): Unit = {
-    TestKit.shutdownActorSystem(system)
-    val directory = new Directory(new File(folder))
-    directory.deleteRecursively()
-  }
 }
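
After this rename the test sources are split three ways: the version-independent pieces (the `Document` case class, schema, ScalaCheck generators and parquet reader/writer helpers) stay in the base trait, and each Scala epoch supplies its own `AbstractAvroParquet` on top of it:

    avroparquet/src/test/
      scala/    AbstractAvroParquetBase.scala   shared schema, generators, helpers
      scala-2/  AbstractAvroParquet.scala       avro4s 4.x macro derivation
      scala-3/  AbstractAvroParquet.scala       avro4s 5.x with explicit schema

sbt compiles whichever epoch directory matches the Scala version in use; recent sbt releases (1.5 and later, if memory serves) add `scala-2` and `scala-3` to `unmanagedSourceDirectories` out of the box.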
diff --git a/avroparquet/src/test/scala/docs/scaladsl/AvroParquetFlowSpec.scala b/avroparquet/src/test/scala/docs/scaladsl/AvroParquetFlowSpec.scala
index 626b15400..c52de6acc 100644
--- a/avroparquet/src/test/scala/docs/scaladsl/AvroParquetFlowSpec.scala
+++ b/avroparquet/src/test/scala/docs/scaladsl/AvroParquetFlowSpec.scala
@@ -13,6 +13,9 @@
 
 package docs.scaladsl
 
+import com.sksamuel.avro4s.Record
+import org.apache.avro.generic.GenericRecord
+import org.apache.parquet.hadoop.ParquetWriter
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.actor.ActorSystem
@@ -20,13 +23,10 @@ import pekko.stream.connectors.avroparquet.scaladsl.AvroParquetFlow
 import pekko.stream.scaladsl.{ Flow, Sink, Source }
 import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
 import pekko.testkit.TestKit
-import com.sksamuel.avro4s.Record
-import org.apache.avro.generic.GenericRecord
-import org.scalatest.matchers.should.Matchers
-import org.scalatest.wordspec.AnyWordSpecLike
-import org.apache.parquet.hadoop.ParquetWriter
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpecLike
 
 class AvroParquetFlowSpec
     extends TestKit(ActorSystem("FlowSpec"))
diff --git a/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSinkSpec.scala b/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSinkSpec.scala
index b92170a37..18b414e94 100644
--- a/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSinkSpec.scala
+++ b/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSinkSpec.scala
@@ -13,18 +13,18 @@
 
 package docs.scaladsl
 
+import com.sksamuel.avro4s.Record
+import org.apache.avro.generic.GenericRecord
+import org.apache.parquet.hadoop.ParquetWriter
 import org.apache.pekko
-import pekko.{ Done, NotUsed }
 import pekko.actor.ActorSystem
 import pekko.stream.connectors.avroparquet.scaladsl.AvroParquetSink
 import pekko.stream.scaladsl.Source
 import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
 import pekko.testkit.TestKit
-import com.sksamuel.avro4s.{ Record, RecordFormat }
-import org.scalatest.concurrent.ScalaFutures
-import org.apache.avro.generic.GenericRecord
-import org.apache.parquet.hadoop.ParquetWriter
+import pekko.{ Done, NotUsed }
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AnyWordSpecLike
 
@@ -63,7 +63,7 @@ class AvroParquetSinkSpec
       val documents: List[Document] = genDocuments(n).sample.get
       val writer: ParquetWriter[Record] = parquetWriter[Record](file, conf, schema)
       // #init-sink
-      val records: List[Record] = documents.map(RecordFormat[Document].to(_))
+      val records: List[Record] = documents.map(format.to(_))
       val source: Source[Record, NotUsed] = Source(records)
       val result: Future[Done] = source
         .runWith(AvroParquetSink(writer))
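
The one-line change above is what ties this spec to the per-version traits: `format` now comes from whichever `AbstractAvroParquet` is mixed in, rather than re-deriving a `RecordFormat[Document]` inline, which only works unassisted under the Scala 2 macro derivation.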
diff --git a/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSourceSpec.scala b/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSourceSpec.scala
index ea2300d6b..2fa88db1e 100644
--- a/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSourceSpec.scala
+++ b/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSourceSpec.scala
@@ -13,6 +13,9 @@
 
 package docs.scaladsl
 
+import com.sksamuel.avro4s.Record
+import org.apache.avro.generic.GenericRecord
+import org.apache.parquet.hadoop.ParquetReader
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.actor.ActorSystem
@@ -21,11 +24,8 @@ import pekko.stream.scaladsl.{ Keep, Source }
 import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
 import pekko.stream.testkit.scaladsl.TestSink
 import pekko.testkit.TestKit
-import com.sksamuel.avro4s.Record
-import org.scalatest.concurrent.ScalaFutures
-import org.apache.avro.generic.GenericRecord
-import org.apache.parquet.hadoop.ParquetReader
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AnyWordSpecLike
 
diff --git a/build.sbt b/build.sbt
index deeaf3022..a6b815c93 100644
--- a/build.sbt
+++ b/build.sbt
@@ -277,16 +277,7 @@ lazy val ironmq = pekkoConnectorProject(
 
 lazy val jms = pekkoConnectorProject("jms", "jms", Dependencies.Jms)
 
-val scalaReleaseSeparateSource: Def.SettingsDefinition = Compile / unmanagedSourceDirectories ++= {
-  if (scalaVersion.value.startsWith("2")) {
-    Seq((LocalRootProject / baseDirectory).value / "src" / "main" / "scala-2")
-  } else {
-    Seq((LocalRootProject / baseDirectory).value / "src" / "main" / "scala-3")
-  }
-}
-
-lazy val jsonStreaming = pekkoConnectorProject("json-streaming", "json.streaming",
-  Dependencies.JsonStreaming ++ scalaReleaseSeparateSource)
+lazy val jsonStreaming = pekkoConnectorProject("json-streaming", "json.streaming", Dependencies.JsonStreaming)
 
 lazy val kinesis = pekkoConnectorProject("kinesis", "aws.kinesis", Dependencies.Kinesis)
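
The deleted `scalaReleaseSeparateSource` setting duplicated behaviour sbt now provides on its own: per-epoch source directories are picked up automatically, roughly as if every module carried something like the following (illustrative only, not the actual sbt internals):

    Compile / unmanagedSourceDirectories ++= {
      val epoch = if (scalaBinaryVersion.value == "3") "scala-3" else "scala-2"
      Seq((Compile / sourceDirectory).value / epoch)
    }

The removed setting also appears to resolve against `LocalRootProject / baseDirectory`, i.e. the repository root rather than the module's own source tree, which is another reason to prefer the built-in lookup.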
 
diff --git a/docs/src/main/paradox/avroparquet.md b/docs/src/main/paradox/avroparquet.md
index 270aad232..752eca4ee 100644
--- a/docs/src/main/paradox/avroparquet.md
+++ b/docs/src/main/paradox/avroparquet.md
@@ -29,7 +29,7 @@ Sometimes it might be useful to use a Parquet file as stream Source. For this we
 instance which will produce records as subtypes of `GenericRecord`, the Avro record's abstract representation.
  
 Scala
-: @@snip (/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquet.scala) { #prepare-source #init-reader }
+: @@snip (/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquetBase.scala) { #prepare-source #init-reader }
 
 Java
 : @@snip (/avroparquet/src/test/java/docs/javadsl/Examples.java) { #init-reader }
@@ -49,7 +49,7 @@ On the other hand, you can use `AvroParquetWriter` as the Apache Pekko Streams S
 In that case, its initialisation would require an instance of `org.apache.parquet.hadoop.ParquetWriter`. It will also expect any subtype of `GenericRecord` to be passed.
  
 Scala
-: @@snip (/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquet.scala) { #prepare-sink }
+: @@snip (/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquetBase.scala) { #prepare-sink }
 
 Java
 : @@snip (/avroparquet/src/test/java/docs/javadsl/AvroParquetSinkTest.java) { #init-writer }
diff --git a/project/Common.scala b/project/Common.scala
index 2f020dff0..adaa1f49b 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -51,7 +51,7 @@ object Common extends AutoPlugin {
     "com.google.api:com.google.cloud:com.google.iam:com.google.logging:" +
     "com.google.longrunning:com.google.protobuf:com.google.rpc:com.google.type"
 
-  override lazy val projectSettings = Dependencies.Common ++ Seq(
+  override lazy val projectSettings = Dependencies.CommonSettings ++ Seq(
     projectInfoVersion := (if (isSnapshot.value) "snapshot" else version.value),
     crossVersion := CrossVersion.binary,
     crossScalaVersions := Dependencies.ScalaVersions,
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 88c3dab13..0708a1c6f 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -56,7 +56,7 @@ object Dependencies {
   val log4jOverSlf4jVersion = "1.7.36"
   val jclOverSlf4jVersion = "1.7.36"
 
-  val Common = Seq(
+  val CommonSettings = Seq(
     // These libraries are added to all modules via the `Common` AutoPlugin
     libraryDependencies ++= Seq(
       "org.apache.pekko" %% "pekko-stream" % PekkoVersion))
@@ -154,13 +154,16 @@ object Dependencies {
     libraryDependencies ++= Seq(
       "com.google.jimfs" % "jimfs" % "1.2" % Test))
 
+  val avro4sVersion: Def.Initialize[String] = Def.setting {
+    if (Common.isScala3.value) "5.0.4" else "4.1.1"
+  }
+
   val AvroParquet = Seq(
-    crossScalaVersions -= Scala3,
     libraryDependencies ++= Seq(
-      "org.apache.parquet" % "parquet-avro" % "1.10.1",
-      ("org.apache.hadoop" % "hadoop-client" % "3.2.1" % 
Test).exclude("log4j", "log4j"),
-      ("org.apache.hadoop" % "hadoop-common" % "3.2.1" % 
Test).exclude("log4j", "log4j"),
-      "com.sksamuel.avro4s" %% "avro4s-core" % "4.1.1" % Test,
+      "org.apache.parquet" % "parquet-avro" % "1.10.1", // Apache2
+      ("org.apache.hadoop" % "hadoop-client" % "3.2.1" % 
Test).exclude("log4j", "log4j"), // Apache2
+      ("org.apache.hadoop" % "hadoop-common" % "3.2.1" % 
Test).exclude("log4j", "log4j"), // Apache2
+      "com.sksamuel.avro4s" %% "avro4s-core" % avro4sVersion.value % Test,
       "org.scalacheck" %% "scalacheck" % scalaCheckVersion % Test,
       "org.specs2" %% "specs2-core" % "4.20.0" % Test, // MIT like: 
https://github.com/etorreborre/specs2/blob/master/LICENSE.txt
       "org.slf4j" % "log4j-over-slf4j" % log4jOverSlf4jVersion % Test // MIT 
like: http://www.slf4j.org/license.html
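
The diff references `Common.isScala3` without showing its definition; a plausible sketch of such a setting (the real one lives in project/Common.scala, which this commit only touches for the CommonSettings rename) would be:

    import sbt._
    import sbt.Keys._

    // Hypothetical: a setting that is true exactly when the module
    // is being built for Scala 3.
    val isScala3: Def.Initialize[Boolean] = Def.setting {
      scalaBinaryVersion.value == "3"
    }

Because `avro4sVersion` is itself a `Def.Initialize[String]`, evaluating `avro4sVersion.value` inside `libraryDependencies ++=` resolves per build, so Scala 2 builds keep avro4s 4.1.1 while Scala 3 builds get 5.0.4.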


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
