This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch scala3
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git

commit 5cda19e0e3c2f654c3e0a1ab8dd0ca639285132c
Author: PJ Fanning <[email protected]>
AuthorDate: Sun Jun 11 14:24:27 2023 +0100

    Scala3 ftp support (#170)
    
    * support scala3 in ftp connector
    
    * Update BaseSpec.scala
---
 .../stream/connectors/ftp/impl/FtpBrowserGraphStage.scala  |  6 +++---
 .../ftp/impl/FtpDirectoryOperationsGraphStage.scala        |  2 +-
 .../stream/connectors/ftp/impl/FtpGraphStageLogic.scala    | 10 +++++-----
 .../pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala | 14 +++++++-------
 .../apache/pekko/stream/connectors/ftp/impl/FtpLike.scala  | 13 ++++++++-----
 .../pekko/stream/connectors/ftp/impl/FtpOperations.scala   |  2 +-
 .../pekko/stream/connectors/ftp/impl/FtpsOperations.scala  |  2 +-
 .../pekko/stream/connectors/ftp/impl/SftpOperations.scala  |  2 +-
 .../pekko/stream/connectors/ftp/javadsl/FtpApi.scala       |  4 ++--
 .../pekko/stream/connectors/ftp/scaladsl/FtpApi.scala      |  4 ++--
 project/Dependencies.scala                                 |  1 -
 11 files changed, 31 insertions(+), 29 deletions(-)

diff --git 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpBrowserGraphStage.scala
 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpBrowserGraphStage.scala
index 769156909..589624aa4 100644
--- 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpBrowserGraphStage.scala
+++ 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpBrowserGraphStage.scala
@@ -31,7 +31,7 @@ private[ftp] trait FtpBrowserGraphStage[FtpClient, S <: 
RemoteFileSettings]
 
   def emitTraversedDirectories: Boolean = false
 
-  def createLogic(inheritedAttributes: Attributes) = {
+  def createLogic(inheritedAttributes: Attributes): 
FtpGraphStageLogic[FtpFile, FtpClient, S] = {
     val logic = new FtpGraphStageLogic[FtpFile, FtpClient, S](shape, ftpLike, 
connectionSettings, ftpClient) {
 
       private[this] var buffer: Seq[FtpFile] = Seq.empty[FtpFile]
@@ -84,9 +84,9 @@ private[ftp] trait FtpBrowserGraphStage[FtpClient, S <: 
RemoteFileSettings]
 
       private[this] def getFilesFromPath(basePath: String) =
         if (basePath.isEmpty)
-          ftpLike.listFiles(handler.get)
+          graphStageFtpLike.listFiles(handler.get)
         else
-          ftpLike.listFiles(basePath, handler.get)
+          graphStageFtpLike.listFiles(basePath, handler.get)
 
     } // end of stage logic
 
diff --git 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpDirectoryOperationsGraphStage.scala
 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpDirectoryOperationsGraphStage.scala
index c24b0d066..2e80bd58a 100644
--- 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpDirectoryOperationsGraphStage.scala
+++ 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpDirectoryOperationsGraphStage.scala
@@ -31,7 +31,7 @@ private[ftp] trait 
FtpDirectoryOperationsGraphStage[FtpClient, S <: RemoteFileSe
         out,
         new OutHandler {
           override def onPull(): Unit = {
-            push(out, ftpLike.mkdir(basePath, directoryName, handler.get))
+            push(out, graphStageFtpLike.mkdir(basePath, directoryName, 
handler.get))
             complete(out)
           }
         })
diff --git 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStageLogic.scala
 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStageLogic.scala
index 0090c3504..d248dc0bf 100644
--- 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStageLogic.scala
+++ 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStageLogic.scala
@@ -29,18 +29,18 @@ import scala.util.control.NonFatal
 @InternalApi
 private[ftp] abstract class FtpGraphStageLogic[T, FtpClient, S <: 
RemoteFileSettings](
     val shape: Shape,
-    val ftpLike: FtpLike[FtpClient, S],
+    val graphStageFtpLike: FtpLike[FtpClient, S],
     val connectionSettings: S,
     val ftpClient: () => FtpClient) extends GraphStageLogic(shape) {
 
-  protected[this] implicit val client = ftpClient()
-  protected[this] var handler: Option[ftpLike.Handler] = 
Option.empty[ftpLike.Handler]
+  protected[this] implicit val client: FtpClient = ftpClient()
+  protected[this] var handler: Option[graphStageFtpLike.Handler] = 
Option.empty[graphStageFtpLike.Handler]
   protected[this] var failed = false
 
   override def preStart(): Unit = {
     super.preStart()
     try {
-      val tryConnect = ftpLike.connect(connectionSettings)
+      val tryConnect = graphStageFtpLike.connect(connectionSettings)
       if (tryConnect.isSuccess) {
         handler = tryConnect.toOption
       } else
@@ -76,7 +76,7 @@ private[ftp] abstract class FtpGraphStageLogic[T, FtpClient, 
S <: RemoteFileSett
   protected[this] def doPreStart(): Unit
 
   protected[this] def disconnect(): Unit =
-    handler.foreach(ftpLike.disconnect)
+    handler.foreach(graphStageFtpLike.disconnect)
 
   protected[this] def matSuccess(): Boolean
 
diff --git 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala
 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala
index 75414ade0..00cbc5e21 100644
--- 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala
+++ 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala
@@ -106,7 +106,7 @@ private[ftp] trait FtpIOSourceStage[FtpClient, S <: 
RemoteFileSettings]
           isOpt.foreach { os =>
             try {
               os.close()
-              ftpLike match {
+              graphStageFtpLike match {
                 case cfo: CommonFtpOperations =>
                   if 
(!cfo.completePendingCommand(handler.get.asInstanceOf[cfo.Handler]))
                     throw new IOException("File transfer failed.")
@@ -128,13 +128,13 @@ private[ftp] trait FtpIOSourceStage[FtpClient, S <: 
RemoteFileSettings]
         }
 
       protected[this] def doPreStart(): Unit =
-        isOpt = ftpLike match {
+        isOpt = graphStageFtpLike match {
           case ur: UnconfirmedReads =>
             withUnconfirmedReads(ur)
           case ro: RetrieveOffset =>
             Some(ro.retrieveFileInputStream(path, 
handler.get.asInstanceOf[ro.Handler], offset).get)
           case _ =>
-            Some(ftpLike.retrieveFileInputStream(path, handler.get).get)
+            Some(graphStageFtpLike.retrieveFileInputStream(path, 
handler.get).get)
         }
 
       private def withUnconfirmedReads(
@@ -229,7 +229,7 @@ private[ftp] trait FtpIOSinkStage[FtpClient, S <: 
RemoteFileSettings]
           osOpt.foreach { os =>
             try {
               os.close()
-              ftpLike match {
+              graphStageFtpLike match {
                 case cfo: CommonFtpOperations =>
                   if 
(!cfo.completePendingCommand(handler.get.asInstanceOf[cfo.Handler]))
                     throw new IOException("File transfer failed.")
@@ -251,7 +251,7 @@ private[ftp] trait FtpIOSinkStage[FtpClient, S <: 
RemoteFileSettings]
         }
 
       protected[this] def doPreStart(): Unit = {
-        osOpt = Some(ftpLike.storeFileOutputStream(path, handler.get, 
append).get)
+        osOpt = Some(graphStageFtpLike.storeFileOutputStream(path, 
handler.get, append).get)
         pull(in)
       }
 
@@ -301,7 +301,7 @@ private[ftp] trait FtpMoveSink[FtpClient, S <: 
RemoteFileSettings]
             override def onPush(): Unit = {
               try {
                 val sourcePath = grab(in)
-                ftpLike.move(sourcePath.path, destinationPath(sourcePath), 
handler.get)
+                graphStageFtpLike.move(sourcePath.path, 
destinationPath(sourcePath), handler.get)
                 numberOfMovedFiles = numberOfMovedFiles + 1
                 pull(in)
               } catch {
@@ -356,7 +356,7 @@ private[ftp] trait FtpRemoveSink[FtpClient, S <: 
RemoteFileSettings]
           new InHandler {
             override def onPush(): Unit = {
               try {
-                ftpLike.remove(grab(in).path, handler.get)
+                graphStageFtpLike.remove(grab(in).path, handler.get)
                 numberOfRemovedFiles = numberOfRemovedFiles + 1
                 pull(in)
               } catch {
diff --git 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpLike.scala 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpLike.scala
index d64b488fe..216d7a597 100644
--- 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpLike.scala
+++ 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpLike.scala
@@ -54,7 +54,7 @@ protected[ftp] trait FtpLike[FtpClient, S <: 
RemoteFileSettings] {
  * INTERNAL API
  */
 @InternalApi
-protected[ftp] trait RetrieveOffset { _: FtpLike[_, _] =>
+protected[ftp] trait RetrieveOffset { self: FtpLike[_, _] =>
 
   def retrieveFileInputStream(name: String, handler: Handler, offset: Long): 
Try[InputStream]
 
@@ -64,7 +64,7 @@ protected[ftp] trait RetrieveOffset { _: FtpLike[_, _] =>
  * INTERNAL API
  */
 @InternalApi
-protected[ftp] trait UnconfirmedReads { _: FtpLike[_, _] =>
+protected[ftp] trait UnconfirmedReads { self: FtpLike[_, _] =>
 
   def retrieveFileInputStream(name: String, handler: Handler, offset: Long, 
maxUnconfirmedReads: Int): Try[InputStream]
 
@@ -76,8 +76,11 @@ protected[ftp] trait UnconfirmedReads { _: FtpLike[_, _] =>
 @InternalApi
 object FtpLike {
   // type class instances
-  implicit val ftpLikeInstance = new FtpLike[FTPClient, FtpSettings] with 
RetrieveOffset with FtpOperations
-  implicit val ftpsLikeInstance = new FtpLike[FTPSClient, FtpsSettings] with 
RetrieveOffset with FtpsOperations
-  implicit val sFtpLikeInstance =
+  implicit val ftpLikeInstance: FtpLike[FTPClient, FtpSettings] with 
RetrieveOffset with FtpOperations =
+    new FtpLike[FTPClient, FtpSettings] with RetrieveOffset with FtpOperations
+  implicit val ftpsLikeInstance: FtpLike[FTPSClient, FtpsSettings] with 
RetrieveOffset with FtpsOperations =
+    new FtpLike[FTPSClient, FtpsSettings] with RetrieveOffset with 
FtpsOperations
+  implicit val sFtpLikeInstance
+      : FtpLike[SSHClient, SftpSettings] with RetrieveOffset with 
SftpOperations with UnconfirmedReads =
     new FtpLike[SSHClient, SftpSettings] with RetrieveOffset with 
SftpOperations with UnconfirmedReads
 }
diff --git 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpOperations.scala
 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpOperations.scala
index e4c46e45d..de0990fd0 100644
--- 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpOperations.scala
+++ 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpOperations.scala
@@ -23,7 +23,7 @@ import scala.util.Try
  * INTERNAL API
  */
 @InternalApi
-private[ftp] trait FtpOperations extends CommonFtpOperations { _: 
FtpLike[FTPClient, FtpSettings] =>
+private[ftp] trait FtpOperations extends CommonFtpOperations { self: 
FtpLike[FTPClient, FtpSettings] =>
 
   def connect(connectionSettings: FtpSettings)(implicit ftpClient: FTPClient): 
Try[Handler] = Try {
     connectionSettings.proxy.foreach(ftpClient.setProxy)
diff --git 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpsOperations.scala
 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpsOperations.scala
index 7c672f745..4fad15ca3 100644
--- 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpsOperations.scala
+++ 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpsOperations.scala
@@ -25,7 +25,7 @@ import scala.util.Try
  */
 @InternalApi
 private[ftp] trait FtpsOperations extends CommonFtpOperations {
-  _: FtpLike[FTPSClient, FtpsSettings] =>
+  self: FtpLike[FTPSClient, FtpsSettings] =>
 
   def connect(connectionSettings: FtpsSettings)(implicit ftpClient: 
FTPSClient): Try[Handler] =
     Try {
diff --git 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/SftpOperations.scala
 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/SftpOperations.scala
index 59045f385..04cbd77bd 100644
--- 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/SftpOperations.scala
+++ 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/SftpOperations.scala
@@ -37,7 +37,7 @@ import scala.util.{ Failure, Try }
  * INTERNAL API
  */
 @InternalApi
-private[ftp] trait SftpOperations { _: FtpLike[SSHClient, SftpSettings] =>
+private[ftp] trait SftpOperations { self: FtpLike[SSHClient, SftpSettings] =>
 
   type Handler = SFTPClient
 
diff --git 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/javadsl/FtpApi.scala
 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/javadsl/FtpApi.scala
index 9d6c299b1..3644a1f4d 100644
--- 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/javadsl/FtpApi.scala
+++ 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/javadsl/FtpApi.scala
@@ -30,7 +30,7 @@ import net.schmizz.sshj.SSHClient
 import org.apache.commons.net.ftp.{ FTPClient, FTPSClient }
 
 @DoNotInherit
-sealed trait FtpApi[FtpClient, S <: RemoteFileSettings] { _: 
FtpSourceFactory[FtpClient, S] =>
+sealed trait FtpApi[FtpClient, S <: RemoteFileSettings] { self: 
FtpSourceFactory[FtpClient, S] =>
 
   /**
    * Java API: creates a [[pekko.stream.javadsl.Source Source]] of 
[[FtpFile]]s from the remote user `root` directory.
@@ -573,6 +573,6 @@ object Sftp extends SftpApi {
    */
   def create(customSshClient: SSHClient): SftpApi =
     new SftpApi {
-      override val sshClient: SSHClient = customSshClient
+      override def sshClient(): SSHClient = customSshClient
     }
 }
diff --git 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/scaladsl/FtpApi.scala
 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/scaladsl/FtpApi.scala
index 4bc7da6e4..0e574bffe 100644
--- 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/scaladsl/FtpApi.scala
+++ 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/scaladsl/FtpApi.scala
@@ -28,7 +28,7 @@ import org.apache.commons.net.ftp.{ FTPClient, FTPSClient }
 import scala.concurrent.Future
 
 @DoNotInherit
-sealed trait FtpApi[FtpClient, S <: RemoteFileSettings] { _: 
FtpSourceFactory[FtpClient, S] =>
+sealed trait FtpApi[FtpClient, S <: RemoteFileSettings] { self: 
FtpSourceFactory[FtpClient, S] =>
 
   /**
    * Scala API: creates a [[pekko.stream.scaladsl.Source Source]] of 
[[FtpFile]]s from the remote user `root` directory.
@@ -398,6 +398,6 @@ object Sftp extends SftpApi {
    */
   def apply(customSshClient: SSHClient): SftpApi =
     new SftpApi {
-      override val sshClient: SSHClient = customSshClient
+      override def sshClient(): SSHClient = customSshClient
     }
 }
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index ca8286650..a2af1249d 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -169,7 +169,6 @@ object Dependencies {
     ))
 
   val Ftp = Seq(
-    crossScalaVersions -= Scala3,
     libraryDependencies ++= Seq(
       "commons-net" % "commons-net" % "3.8.0",
       "com.hierynomus" % "sshj" % "0.33.0"))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to