This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new 1af4f80ec support ftps implicit mode (#311)
1af4f80ec is described below

commit 1af4f80ec3f61584044408003d7d1096f62729d6
Author: PJ Fanning <[email protected]>
AuthorDate: Sun Jan 28 17:06:20 2024 +0100

    support ftps implicit mode (#311)
    
    * support ftps implicit mode
    
    * Update model.scala
    
    * refactor
    
    * Delete reference.conf
    
    * Create ftps-implicit.backwards.excludes
    
    * move file to right place
    
    * add missing commas in settings toString
    
    * Update model.scala
---
 .../ftps-implicit.backwards.excludes               | 25 ++++++++++++++++++++++
 .../stream/connectors/ftp/impl/FtpGraphStage.scala |  2 +-
 .../connectors/ftp/impl/FtpGraphStageLogic.scala   |  4 ++--
 .../connectors/ftp/impl/FtpIOGraphStage.scala      |  6 +++---
 .../connectors/ftp/impl/FtpSourceFactory.scala     | 20 ++++++++---------
 .../apache/pekko/stream/connectors/ftp/model.scala | 15 +++++++++----
 6 files changed, 52 insertions(+), 20 deletions(-)

diff --git a/ftp/src/main/mima-filters/1.1.x.backwards.excludes/ftps-implicit.backwards.excludes b/ftp/src/main/mima-filters/1.1.x.backwards.excludes/ftps-implicit.backwards.excludes
new file mode 100644
index 000000000..f8373c0ba
--- /dev/null
+++ b/ftp/src/main/mima-filters/1.1.x.backwards.excludes/ftps-implicit.backwards.excludes
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Changed signature of private FTP code to optionally allow implicit SSL/TLS in FTPSClient
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.ftp.FtpsSettings.this")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.stream.connectors.ftp.javadsl.Ftp.ftpClient")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.stream.connectors.ftp.javadsl.Ftps.ftpClient")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.stream.connectors.ftp.javadsl.SftpApi.ftpClient")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.stream.connectors.ftp.scaladsl.Ftp.ftpClient")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.stream.connectors.ftp.scaladsl.Ftps.ftpClient")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.stream.connectors.ftp.scaladsl.SftpApi.ftpClient")
diff --git a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStage.scala b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStage.scala
index 952c2a8de..b93ae08de 100644
--- a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStage.scala
+++ b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStage.scala
@@ -26,7 +26,7 @@ trait FtpGraphStage[FtpClient, S <: RemoteFileSettings, T] extends GraphStage[So
 
   def connectionSettings: S
 
-  def ftpClient: () => FtpClient
+  def ftpClient: S => FtpClient
 
   val shape: SourceShape[T] = SourceShape(Outlet[T](s"$name.out"))
 
diff --git a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStageLogic.scala b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStageLogic.scala
index d248dc0bf..b3dd39af8 100644
--- a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStageLogic.scala
+++ b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStageLogic.scala
@@ -31,9 +31,9 @@ private[ftp] abstract class FtpGraphStageLogic[T, FtpClient, S <: RemoteFileSett
     val shape: Shape,
     val graphStageFtpLike: FtpLike[FtpClient, S],
     val connectionSettings: S,
-    val ftpClient: () => FtpClient) extends GraphStageLogic(shape) {
+    val ftpClient: S => FtpClient) extends GraphStageLogic(shape) {
 
-  protected[this] implicit val client: FtpClient = ftpClient()
+  protected[this] implicit val client: FtpClient = ftpClient(connectionSettings)
   protected[this] var handler: Option[graphStageFtpLike.Handler] = Option.empty[graphStageFtpLike.Handler]
   protected[this] var failed = false
 
diff --git a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala
index 00cbc5e21..b26f17426 100644
--- a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala
+++ b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala
@@ -49,7 +49,7 @@ private[ftp] trait FtpIOGraphStage[FtpClient, S <: RemoteFileSettings, Sh <: Sha
 
   def connectionSettings: S
 
-  implicit def ftpClient: () => FtpClient
+  implicit def ftpClient: S => FtpClient
 
   val ftpLike: FtpLike[FtpClient, S]
 
@@ -282,7 +282,7 @@ private[ftp] trait FtpIOSinkStage[FtpClient, S <: RemoteFileSettings]
 private[ftp] trait FtpMoveSink[FtpClient, S <: RemoteFileSettings]
     extends GraphStageWithMaterializedValue[SinkShape[FtpFile], Future[IOResult]] {
   val connectionSettings: S
-  val ftpClient: () => FtpClient
+  val ftpClient: S => FtpClient
   val ftpLike: FtpLike[FtpClient, S]
   val destinationPath: FtpFile => String
   val in: Inlet[FtpFile] = Inlet("FtpMvSink")
@@ -340,7 +340,7 @@ private[ftp] trait FtpMoveSink[FtpClient, S <: RemoteFileSettings]
 private[ftp] trait FtpRemoveSink[FtpClient, S <: RemoteFileSettings]
     extends GraphStageWithMaterializedValue[SinkShape[FtpFile], Future[IOResult]] {
   val connectionSettings: S
-  val ftpClient: () => FtpClient
+  val ftpClient: S => FtpClient
   val ftpLike: FtpLike[FtpClient, S]
   val in: Inlet[FtpFile] = Inlet("FtpRmSink")
 
diff --git a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpSourceFactory.scala b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpSourceFactory.scala
index 18a3094de..723201938 100644
--- a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpSourceFactory.scala
+++ b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpSourceFactory.scala
@@ -30,7 +30,7 @@ private[ftp] trait FtpSourceFactory[FtpClient, S <: RemoteFileSettings] { self =
 
   protected[this] final val DefaultChunkSize = 8192
 
-  protected[this] def ftpClient: () => FtpClient
+  protected[this] def ftpClient: S => FtpClient
 
   protected[this] def ftpBrowserSourceName: String
 
@@ -57,7 +57,7 @@ private[ftp] trait FtpSourceFactory[FtpClient, S <: RemoteFileSettings] { self =
       lazy val name: String = ftpBrowserSourceName
       val basePath: String = _basePath
       val connectionSettings: S = _connectionSettings
-      val ftpClient: () => FtpClient = self.ftpClient
+      val ftpClient: S => FtpClient = self.ftpClient
       val ftpLike: FtpLike[FtpClient, S] = _ftpLike
       override val branchSelector: (FtpFile) => Boolean = _branchSelector
       override val emitTraversedDirectories: Boolean = _emitTraversedDirectories
@@ -74,7 +74,7 @@ private[ftp] trait FtpSourceFactory[FtpClient, S <: RemoteFileSettings] { self =
 
       override def connectionSettings: S = currentConnectionSettings
 
-      override def ftpClient: () => FtpClient = self.ftpClient
+      override def ftpClient: S => FtpClient = self.ftpClient
 
       override val directoryName: String = dirName
     }
@@ -94,7 +94,7 @@ private[ftp] trait FtpSourceFactory[FtpClient, S <: RemoteFileSettings] { self =
       lazy val name: String = ftpIOSourceName
       val path: String = _path
       val connectionSettings: S = _connectionSettings
-      val ftpClient: () => FtpClient = self.ftpClient
+      val ftpClient: S => FtpClient = self.ftpClient
       val ftpLike: FtpLike[FtpClient, S] = _ftpLike
       val chunkSize: Int = _chunkSize
       override val offset: Long = _offset
@@ -108,7 +108,7 @@ private[ftp] trait FtpSourceFactory[FtpClient, S <: RemoteFileSettings] { self =
       lazy val name: String = ftpIOSinkName
       val path: String = _path
       val connectionSettings: S = _connectionSettings
-      val ftpClient: () => FtpClient = self.ftpClient
+      val ftpClient: S => FtpClient = self.ftpClient
       val ftpLike: FtpLike[FtpClient, S] = _ftpLike
       val append: Boolean = _append
     }
@@ -118,7 +118,7 @@ private[ftp] trait FtpSourceFactory[FtpClient, S <: RemoteFileSettings] { self =
       _connectionSettings: S)(implicit _ftpLike: FtpLike[FtpClient, S]) =
     new FtpMoveSink[FtpClient, S] {
       val connectionSettings: S = _connectionSettings
-      val ftpClient: () => FtpClient = self.ftpClient
+      val ftpClient: S => FtpClient = self.ftpClient
       val ftpLike: FtpLike[FtpClient, S] = _ftpLike
       val destinationPath: FtpFile => String = _destinationPath
     }
@@ -127,7 +127,7 @@ private[ftp] trait FtpSourceFactory[FtpClient, S <: RemoteFileSettings] { self =
       _connectionSettings: S)(implicit _ftpLike: FtpLike[FtpClient, S]) =
     new FtpRemoveSink[FtpClient, S] {
       val connectionSettings: S = _connectionSettings
-      val ftpClient: () => FtpClient = self.ftpClient
+      val ftpClient: S => FtpClient = self.ftpClient
       val ftpLike: FtpLike[FtpClient, S] = _ftpLike
     }
 
@@ -147,7 +147,7 @@ private[ftp] trait FtpSource extends FtpSourceFactory[FTPClient, FtpSettings] {
   protected final val FtpDirectorySource = "FtpDirectorySource"
   protected final val FtpIOSinkName = "FtpIOSink"
 
-  protected val ftpClient: () => FTPClient = () => new FTPClient
+  protected val ftpClient: FtpSettings => FTPClient = _ => new FTPClient
   protected val ftpBrowserSourceName: String = FtpBrowserSourceName
   protected val ftpIOSourceName: String = FtpIOSourceName
   protected val ftpIOSinkName: String = FtpIOSinkName
@@ -164,7 +164,7 @@ private[ftp] trait FtpsSource extends FtpSourceFactory[FTPSClient, FtpsSettings]
   protected final val FtpsDirectorySource = "FtpsDirectorySource"
   protected final val FtpsIOSinkName = "FtpsIOSink"
 
-  protected val ftpClient: () => FTPSClient = () => new FTPSClient
+  protected val ftpClient: FtpsSettings => FTPSClient = settings => new FTPSClient(settings.useFtpsImplicit)
   protected val ftpBrowserSourceName: String = FtpsBrowserSourceName
   protected val ftpIOSourceName: String = FtpsIOSourceName
   protected val ftpIOSinkName: String = FtpsIOSinkName
@@ -182,7 +182,7 @@ private[ftp] trait SftpSource extends FtpSourceFactory[SSHClient, SftpSettings]
   protected final val sFtpIOSinkName = "sFtpIOSink"
 
   def sshClient(): SSHClient = new SSHClient()
-  protected val ftpClient: () => SSHClient = () => sshClient()
+  protected val ftpClient: SftpSettings => SSHClient = _ => sshClient()
   protected val ftpBrowserSourceName: String = sFtpBrowserSourceName
   protected val ftpIOSourceName: String = sFtpIOSourceName
   protected val ftpIOSinkName: String = sFtpIOSinkName
diff --git a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/model.scala b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/model.scala
index 0a05dd994..2cf990990 100644
--- a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/model.scala
+++ b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/model.scala
@@ -136,7 +136,7 @@ final class FtpSettings private (
     s"credentials=$credentials," +
     s"binary=$binary," +
     s"passiveMode=$passiveMode," +
-    s"autodetectUTF8=$autodetectUTF8" +
+    s"autodetectUTF8=$autodetectUTF8," +
     s"configureConnection=$configureConnection," +
     s"proxy=$proxy)"
 }
@@ -185,6 +185,7 @@ final class FtpsSettings private (
     val credentials: FtpCredentials,
     val binary: Boolean,
     val passiveMode: Boolean,
+    val useFtpsImplicit: Boolean,
     val autodetectUTF8: Boolean,
     val configureConnection: FTPSClient => Unit,
     val proxy: Option[Proxy],
@@ -197,6 +198,8 @@ final class FtpsSettings private (
   def withBinary(value: Boolean): FtpsSettings = if (binary == value) this else copy(binary = value)
   def withPassiveMode(value: Boolean): FtpsSettings =
     if (passiveMode == value) this else copy(passiveMode = value)
+  def withFtpsImplicit(value: Boolean): FtpsSettings =
+    if (useFtpsImplicit == value) this else copy(useFtpsImplicit = value)
   def withAutodetectUTF8(value: Boolean): FtpsSettings =
     if (autodetectUTF8 == value) this else copy(autodetectUTF8 = value)
   def withProxy(value: Proxy): FtpsSettings = copy(proxy = Some(value))
@@ -224,6 +227,7 @@ final class FtpsSettings private (
       credentials: FtpCredentials = credentials,
       binary: Boolean = binary,
       passiveMode: Boolean = passiveMode,
+      useFtpsImplicit: Boolean = useFtpsImplicit,
       autodetectUTF8: Boolean = autodetectUTF8,
       configureConnection: FTPSClient => Unit = configureConnection,
       proxy: Option[Proxy] = proxy,
@@ -234,6 +238,7 @@ final class FtpsSettings private (
     credentials = credentials,
     binary = binary,
     passiveMode = passiveMode,
+    useFtpsImplicit = useFtpsImplicit,
     autodetectUTF8 = autodetectUTF8,
     configureConnection = configureConnection,
     proxy = proxy,
@@ -247,10 +252,11 @@ final class FtpsSettings private (
     s"credentials=$credentials," +
     s"binary=$binary," +
     s"passiveMode=$passiveMode," +
-    s"autodetectUTF8=$autodetectUTF8" +
+    s"useFtpsImplicit=$useFtpsImplicit," +
+    s"autodetectUTF8=$autodetectUTF8," +
     s"configureConnection=$configureConnection," +
-    s"proxy=$proxy" +
-    s"keyManager=$keyManager" +
+    s"proxy=$proxy," +
+    s"keyManager=$keyManager," +
     s"trustManager=$trustManager)"
 }
 
@@ -269,6 +275,7 @@ object FtpsSettings {
     FtpCredentials.AnonFtpCredentials,
     binary = false,
     passiveMode = false,
+    useFtpsImplicit = false,
     autodetectUTF8 = false,
     configureConnection = _ => (),
     proxy = None,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to