This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch scala3 in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git
commit 5cda19e0e3c2f654c3e0a1ab8dd0ca639285132c Author: PJ Fanning <[email protected]> AuthorDate: Sun Jun 11 14:24:27 2023 +0100 Scala3 ftp support (#170) * support scala3 in ftp connector * Update BaseSpec.scala --- .../stream/connectors/ftp/impl/FtpBrowserGraphStage.scala | 6 +++--- .../ftp/impl/FtpDirectoryOperationsGraphStage.scala | 2 +- .../stream/connectors/ftp/impl/FtpGraphStageLogic.scala | 10 +++++----- .../pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala | 14 +++++++------- .../apache/pekko/stream/connectors/ftp/impl/FtpLike.scala | 13 ++++++++----- .../pekko/stream/connectors/ftp/impl/FtpOperations.scala | 2 +- .../pekko/stream/connectors/ftp/impl/FtpsOperations.scala | 2 +- .../pekko/stream/connectors/ftp/impl/SftpOperations.scala | 2 +- .../pekko/stream/connectors/ftp/javadsl/FtpApi.scala | 4 ++-- .../pekko/stream/connectors/ftp/scaladsl/FtpApi.scala | 4 ++-- project/Dependencies.scala | 1 - 11 files changed, 31 insertions(+), 29 deletions(-) diff --git a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpBrowserGraphStage.scala b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpBrowserGraphStage.scala index 769156909..589624aa4 100644 --- a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpBrowserGraphStage.scala +++ b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpBrowserGraphStage.scala @@ -31,7 +31,7 @@ private[ftp] trait FtpBrowserGraphStage[FtpClient, S <: RemoteFileSettings] def emitTraversedDirectories: Boolean = false - def createLogic(inheritedAttributes: Attributes) = { + def createLogic(inheritedAttributes: Attributes): FtpGraphStageLogic[FtpFile, FtpClient, S] = { val logic = new FtpGraphStageLogic[FtpFile, FtpClient, S](shape, ftpLike, connectionSettings, ftpClient) { private[this] var buffer: Seq[FtpFile] = Seq.empty[FtpFile] @@ -84,9 +84,9 @@ private[ftp] trait FtpBrowserGraphStage[FtpClient, S <: RemoteFileSettings] private[this] def getFilesFromPath(basePath: String) = if (basePath.isEmpty) - ftpLike.listFiles(handler.get) + graphStageFtpLike.listFiles(handler.get) else - ftpLike.listFiles(basePath, handler.get) + graphStageFtpLike.listFiles(basePath, handler.get) } // end of stage logic diff --git a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpDirectoryOperationsGraphStage.scala b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpDirectoryOperationsGraphStage.scala index c24b0d066..2e80bd58a 100644 --- a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpDirectoryOperationsGraphStage.scala +++ b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpDirectoryOperationsGraphStage.scala @@ -31,7 +31,7 @@ private[ftp] trait FtpDirectoryOperationsGraphStage[FtpClient, S <: RemoteFileSe out, new OutHandler { override def onPull(): Unit = { - push(out, ftpLike.mkdir(basePath, directoryName, handler.get)) + push(out, graphStageFtpLike.mkdir(basePath, directoryName, handler.get)) complete(out) } }) diff --git a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStageLogic.scala b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStageLogic.scala index 0090c3504..d248dc0bf 100644 --- a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStageLogic.scala +++ b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStageLogic.scala @@ -29,18 +29,18 @@ import scala.util.control.NonFatal @InternalApi private[ftp] abstract class FtpGraphStageLogic[T, FtpClient, S <: RemoteFileSettings]( val shape: Shape, - val ftpLike: FtpLike[FtpClient, S], + val graphStageFtpLike: FtpLike[FtpClient, S], val connectionSettings: S, val ftpClient: () => FtpClient) extends GraphStageLogic(shape) { - protected[this] implicit val client = ftpClient() - protected[this] var handler: Option[ftpLike.Handler] = Option.empty[ftpLike.Handler] + protected[this] implicit val client: FtpClient = ftpClient() + protected[this] var handler: Option[graphStageFtpLike.Handler] = Option.empty[graphStageFtpLike.Handler] protected[this] var failed = false override def preStart(): Unit = { super.preStart() try { - val tryConnect = ftpLike.connect(connectionSettings) + val tryConnect = graphStageFtpLike.connect(connectionSettings) if (tryConnect.isSuccess) { handler = tryConnect.toOption } else @@ -76,7 +76,7 @@ private[ftp] abstract class FtpGraphStageLogic[T, FtpClient, S <: RemoteFileSett protected[this] def doPreStart(): Unit protected[this] def disconnect(): Unit = - handler.foreach(ftpLike.disconnect) + handler.foreach(graphStageFtpLike.disconnect) protected[this] def matSuccess(): Boolean diff --git a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala index 75414ade0..00cbc5e21 100644 --- a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala +++ b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala @@ -106,7 +106,7 @@ private[ftp] trait FtpIOSourceStage[FtpClient, S <: RemoteFileSettings] isOpt.foreach { os => try { os.close() - ftpLike match { + graphStageFtpLike match { case cfo: CommonFtpOperations => if (!cfo.completePendingCommand(handler.get.asInstanceOf[cfo.Handler])) throw new IOException("File transfer failed.") @@ -128,13 +128,13 @@ private[ftp] trait FtpIOSourceStage[FtpClient, S <: RemoteFileSettings] } protected[this] def doPreStart(): Unit = - isOpt = ftpLike match { + isOpt = graphStageFtpLike match { case ur: UnconfirmedReads => withUnconfirmedReads(ur) case ro: RetrieveOffset => Some(ro.retrieveFileInputStream(path, handler.get.asInstanceOf[ro.Handler], offset).get) case _ => - Some(ftpLike.retrieveFileInputStream(path, handler.get).get) + Some(graphStageFtpLike.retrieveFileInputStream(path, handler.get).get) } private def withUnconfirmedReads( @@ -229,7 +229,7 @@ private[ftp] trait FtpIOSinkStage[FtpClient, S <: RemoteFileSettings] osOpt.foreach { os => try { os.close() - ftpLike match { + graphStageFtpLike match { case cfo: CommonFtpOperations => if (!cfo.completePendingCommand(handler.get.asInstanceOf[cfo.Handler])) throw new IOException("File transfer failed.") @@ -251,7 +251,7 @@ private[ftp] trait FtpIOSinkStage[FtpClient, S <: RemoteFileSettings] } protected[this] def doPreStart(): Unit = { - osOpt = Some(ftpLike.storeFileOutputStream(path, handler.get, append).get) + osOpt = Some(graphStageFtpLike.storeFileOutputStream(path, handler.get, append).get) pull(in) } @@ -301,7 +301,7 @@ private[ftp] trait FtpMoveSink[FtpClient, S <: RemoteFileSettings] override def onPush(): Unit = { try { val sourcePath = grab(in) - ftpLike.move(sourcePath.path, destinationPath(sourcePath), handler.get) + graphStageFtpLike.move(sourcePath.path, destinationPath(sourcePath), handler.get) numberOfMovedFiles = numberOfMovedFiles + 1 pull(in) } catch { @@ -356,7 +356,7 @@ private[ftp] trait FtpRemoveSink[FtpClient, S <: RemoteFileSettings] new InHandler { override def onPush(): Unit = { try { - ftpLike.remove(grab(in).path, handler.get) + graphStageFtpLike.remove(grab(in).path, handler.get) numberOfRemovedFiles = numberOfRemovedFiles + 1 pull(in) } catch { diff --git a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpLike.scala b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpLike.scala index d64b488fe..216d7a597 100644 --- a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpLike.scala +++ b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpLike.scala @@ -54,7 +54,7 @@ protected[ftp] trait FtpLike[FtpClient, S <: RemoteFileSettings] { * INTERNAL API */ @InternalApi -protected[ftp] trait RetrieveOffset { _: FtpLike[_, _] => +protected[ftp] trait RetrieveOffset { self: FtpLike[_, _] => def retrieveFileInputStream(name: String, handler: Handler, offset: Long): Try[InputStream] @@ -64,7 +64,7 @@ protected[ftp] trait RetrieveOffset { _: FtpLike[_, _] => * INTERNAL API */ @InternalApi -protected[ftp] trait UnconfirmedReads { _: FtpLike[_, _] => +protected[ftp] trait UnconfirmedReads { self: FtpLike[_, _] => def retrieveFileInputStream(name: String, handler: Handler, offset: Long, maxUnconfirmedReads: Int): Try[InputStream] @@ -76,8 +76,11 @@ protected[ftp] trait UnconfirmedReads { _: FtpLike[_, _] => @InternalApi object FtpLike { // type class instances - implicit val ftpLikeInstance = new FtpLike[FTPClient, FtpSettings] with RetrieveOffset with FtpOperations - implicit val ftpsLikeInstance = new FtpLike[FTPSClient, FtpsSettings] with RetrieveOffset with FtpsOperations - implicit val sFtpLikeInstance = + implicit val ftpLikeInstance: FtpLike[FTPClient, FtpSettings] with RetrieveOffset with FtpOperations = + new FtpLike[FTPClient, FtpSettings] with RetrieveOffset with FtpOperations + implicit val ftpsLikeInstance: FtpLike[FTPSClient, FtpsSettings] with RetrieveOffset with FtpsOperations = + new FtpLike[FTPSClient, FtpsSettings] with RetrieveOffset with FtpsOperations + implicit val sFtpLikeInstance + : FtpLike[SSHClient, SftpSettings] with RetrieveOffset with SftpOperations with UnconfirmedReads = new FtpLike[SSHClient, SftpSettings] with RetrieveOffset with SftpOperations with UnconfirmedReads } diff --git a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpOperations.scala b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpOperations.scala index e4c46e45d..de0990fd0 100644 --- a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpOperations.scala +++ b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpOperations.scala @@ -23,7 +23,7 @@ import scala.util.Try * INTERNAL API */ @InternalApi -private[ftp] trait FtpOperations extends CommonFtpOperations { _: FtpLike[FTPClient, FtpSettings] => +private[ftp] trait FtpOperations extends CommonFtpOperations { self: FtpLike[FTPClient, FtpSettings] => def connect(connectionSettings: FtpSettings)(implicit ftpClient: FTPClient): Try[Handler] = Try { connectionSettings.proxy.foreach(ftpClient.setProxy) diff --git a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpsOperations.scala b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpsOperations.scala index 7c672f745..4fad15ca3 100644 --- a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpsOperations.scala +++ b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpsOperations.scala @@ -25,7 +25,7 @@ import scala.util.Try */ @InternalApi private[ftp] trait FtpsOperations extends CommonFtpOperations { - _: FtpLike[FTPSClient, FtpsSettings] => + self: FtpLike[FTPSClient, FtpsSettings] => def connect(connectionSettings: FtpsSettings)(implicit ftpClient: FTPSClient): Try[Handler] = Try { diff --git a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/SftpOperations.scala b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/SftpOperations.scala index 59045f385..04cbd77bd 100644 --- a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/SftpOperations.scala +++ b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/SftpOperations.scala @@ -37,7 +37,7 @@ import scala.util.{ Failure, Try } * INTERNAL API */ @InternalApi -private[ftp] trait SftpOperations { _: FtpLike[SSHClient, SftpSettings] => +private[ftp] trait SftpOperations { self: FtpLike[SSHClient, SftpSettings] => type Handler = SFTPClient diff --git a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/javadsl/FtpApi.scala b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/javadsl/FtpApi.scala index 9d6c299b1..3644a1f4d 100644 --- a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/javadsl/FtpApi.scala +++ b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/javadsl/FtpApi.scala @@ -30,7 +30,7 @@ import net.schmizz.sshj.SSHClient import org.apache.commons.net.ftp.{ FTPClient, FTPSClient } @DoNotInherit -sealed trait FtpApi[FtpClient, S <: RemoteFileSettings] { _: FtpSourceFactory[FtpClient, S] => +sealed trait FtpApi[FtpClient, S <: RemoteFileSettings] { self: FtpSourceFactory[FtpClient, S] => /** * Java API: creates a [[pekko.stream.javadsl.Source Source]] of [[FtpFile]]s from the remote user `root` directory. @@ -573,6 +573,6 @@ object Sftp extends SftpApi { */ def create(customSshClient: SSHClient): SftpApi = new SftpApi { - override val sshClient: SSHClient = customSshClient + override def sshClient(): SSHClient = customSshClient } } diff --git a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/scaladsl/FtpApi.scala b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/scaladsl/FtpApi.scala index 4bc7da6e4..0e574bffe 100644 --- a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/scaladsl/FtpApi.scala +++ b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/scaladsl/FtpApi.scala @@ -28,7 +28,7 @@ import org.apache.commons.net.ftp.{ FTPClient, FTPSClient } import scala.concurrent.Future @DoNotInherit -sealed trait FtpApi[FtpClient, S <: RemoteFileSettings] { _: FtpSourceFactory[FtpClient, S] => +sealed trait FtpApi[FtpClient, S <: RemoteFileSettings] { self: FtpSourceFactory[FtpClient, S] => /** * Scala API: creates a [[pekko.stream.scaladsl.Source Source]] of [[FtpFile]]s from the remote user `root` directory. @@ -398,6 +398,6 @@ object Sftp extends SftpApi { */ def apply(customSshClient: SSHClient): SftpApi = new SftpApi { - override val sshClient: SSHClient = customSshClient + override def sshClient(): SSHClient = customSshClient } } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ca8286650..a2af1249d 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -169,7 +169,6 @@ object Dependencies { )) val Ftp = Seq( - crossScalaVersions -= Scala3, libraryDependencies ++= Seq( "commons-net" % "commons-net" % "3.8.0", "com.hierynomus" % "sshj" % "0.33.0")) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
