This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new 1af4f80ec support ftps implicit mode (#311)
1af4f80ec is described below
commit 1af4f80ec3f61584044408003d7d1096f62729d6
Author: PJ Fanning <[email protected]>
AuthorDate: Sun Jan 28 17:06:20 2024 +0100
support ftps implicit mode (#311)
* support ftps implicit mode
* Update model.scala
* refactor
* Delete reference.conf
* Create ftps-implicit.backwards.excludes
* move file to right place
* add missing commas in settings toString
* Update model.scala
---
.../ftps-implicit.backwards.excludes | 25 ++++++++++++++++++++++
.../stream/connectors/ftp/impl/FtpGraphStage.scala | 2 +-
.../connectors/ftp/impl/FtpGraphStageLogic.scala | 4 ++--
.../connectors/ftp/impl/FtpIOGraphStage.scala | 6 +++---
.../connectors/ftp/impl/FtpSourceFactory.scala | 20 ++++++++---------
.../apache/pekko/stream/connectors/ftp/model.scala | 15 +++++++++----
6 files changed, 52 insertions(+), 20 deletions(-)
diff --git
a/ftp/src/main/mima-filters/1.1.x.backwards.excludes/ftps-implicit.backwards.excludes
b/ftp/src/main/mima-filters/1.1.x.backwards.excludes/ftps-implicit.backwards.excludes
new file mode 100644
index 000000000..f8373c0ba
--- /dev/null
+++
b/ftp/src/main/mima-filters/1.1.x.backwards.excludes/ftps-implicit.backwards.excludes
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Changed signature of private FTP code to optionally allow implicit SSL/TLS in FTPSClient
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.ftp.FtpsSettings.this")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.stream.connectors.ftp.javadsl.Ftp.ftpClient")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.stream.connectors.ftp.javadsl.Ftps.ftpClient")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.stream.connectors.ftp.javadsl.SftpApi.ftpClient")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.stream.connectors.ftp.scaladsl.Ftp.ftpClient")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.stream.connectors.ftp.scaladsl.Ftps.ftpClient")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.stream.connectors.ftp.scaladsl.SftpApi.ftpClient")
diff --git
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStage.scala
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStage.scala
index 952c2a8de..b93ae08de 100644
---
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStage.scala
+++
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStage.scala
@@ -26,7 +26,7 @@ trait FtpGraphStage[FtpClient, S <: RemoteFileSettings, T]
extends GraphStage[So
def connectionSettings: S
- def ftpClient: () => FtpClient
+ def ftpClient: S => FtpClient
val shape: SourceShape[T] = SourceShape(Outlet[T](s"$name.out"))
diff --git
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStageLogic.scala
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStageLogic.scala
index d248dc0bf..b3dd39af8 100644
---
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStageLogic.scala
+++
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpGraphStageLogic.scala
@@ -31,9 +31,9 @@ private[ftp] abstract class FtpGraphStageLogic[T, FtpClient,
S <: RemoteFileSett
val shape: Shape,
val graphStageFtpLike: FtpLike[FtpClient, S],
val connectionSettings: S,
- val ftpClient: () => FtpClient) extends GraphStageLogic(shape) {
+ val ftpClient: S => FtpClient) extends GraphStageLogic(shape) {
- protected[this] implicit val client: FtpClient = ftpClient()
+ protected[this] implicit val client: FtpClient =
ftpClient(connectionSettings)
protected[this] var handler: Option[graphStageFtpLike.Handler] =
Option.empty[graphStageFtpLike.Handler]
protected[this] var failed = false
diff --git
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala
index 00cbc5e21..b26f17426 100644
---
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala
+++
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala
@@ -49,7 +49,7 @@ private[ftp] trait FtpIOGraphStage[FtpClient, S <:
RemoteFileSettings, Sh <: Sha
def connectionSettings: S
- implicit def ftpClient: () => FtpClient
+ implicit def ftpClient: S => FtpClient
val ftpLike: FtpLike[FtpClient, S]
@@ -282,7 +282,7 @@ private[ftp] trait FtpIOSinkStage[FtpClient, S <:
RemoteFileSettings]
private[ftp] trait FtpMoveSink[FtpClient, S <: RemoteFileSettings]
extends GraphStageWithMaterializedValue[SinkShape[FtpFile],
Future[IOResult]] {
val connectionSettings: S
- val ftpClient: () => FtpClient
+ val ftpClient: S => FtpClient
val ftpLike: FtpLike[FtpClient, S]
val destinationPath: FtpFile => String
val in: Inlet[FtpFile] = Inlet("FtpMvSink")
@@ -340,7 +340,7 @@ private[ftp] trait FtpMoveSink[FtpClient, S <:
RemoteFileSettings]
private[ftp] trait FtpRemoveSink[FtpClient, S <: RemoteFileSettings]
extends GraphStageWithMaterializedValue[SinkShape[FtpFile],
Future[IOResult]] {
val connectionSettings: S
- val ftpClient: () => FtpClient
+ val ftpClient: S => FtpClient
val ftpLike: FtpLike[FtpClient, S]
val in: Inlet[FtpFile] = Inlet("FtpRmSink")
diff --git
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpSourceFactory.scala
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpSourceFactory.scala
index 18a3094de..723201938 100644
---
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpSourceFactory.scala
+++
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpSourceFactory.scala
@@ -30,7 +30,7 @@ private[ftp] trait FtpSourceFactory[FtpClient, S <:
RemoteFileSettings] { self =
protected[this] final val DefaultChunkSize = 8192
- protected[this] def ftpClient: () => FtpClient
+ protected[this] def ftpClient: S => FtpClient
protected[this] def ftpBrowserSourceName: String
@@ -57,7 +57,7 @@ private[ftp] trait FtpSourceFactory[FtpClient, S <:
RemoteFileSettings] { self =
lazy val name: String = ftpBrowserSourceName
val basePath: String = _basePath
val connectionSettings: S = _connectionSettings
- val ftpClient: () => FtpClient = self.ftpClient
+ val ftpClient: S => FtpClient = self.ftpClient
val ftpLike: FtpLike[FtpClient, S] = _ftpLike
override val branchSelector: (FtpFile) => Boolean = _branchSelector
override val emitTraversedDirectories: Boolean =
_emitTraversedDirectories
@@ -74,7 +74,7 @@ private[ftp] trait FtpSourceFactory[FtpClient, S <:
RemoteFileSettings] { self =
override def connectionSettings: S = currentConnectionSettings
- override def ftpClient: () => FtpClient = self.ftpClient
+ override def ftpClient: S => FtpClient = self.ftpClient
override val directoryName: String = dirName
}
@@ -94,7 +94,7 @@ private[ftp] trait FtpSourceFactory[FtpClient, S <:
RemoteFileSettings] { self =
lazy val name: String = ftpIOSourceName
val path: String = _path
val connectionSettings: S = _connectionSettings
- val ftpClient: () => FtpClient = self.ftpClient
+ val ftpClient: S => FtpClient = self.ftpClient
val ftpLike: FtpLike[FtpClient, S] = _ftpLike
val chunkSize: Int = _chunkSize
override val offset: Long = _offset
@@ -108,7 +108,7 @@ private[ftp] trait FtpSourceFactory[FtpClient, S <:
RemoteFileSettings] { self =
lazy val name: String = ftpIOSinkName
val path: String = _path
val connectionSettings: S = _connectionSettings
- val ftpClient: () => FtpClient = self.ftpClient
+ val ftpClient: S => FtpClient = self.ftpClient
val ftpLike: FtpLike[FtpClient, S] = _ftpLike
val append: Boolean = _append
}
@@ -118,7 +118,7 @@ private[ftp] trait FtpSourceFactory[FtpClient, S <:
RemoteFileSettings] { self =
_connectionSettings: S)(implicit _ftpLike: FtpLike[FtpClient, S]) =
new FtpMoveSink[FtpClient, S] {
val connectionSettings: S = _connectionSettings
- val ftpClient: () => FtpClient = self.ftpClient
+ val ftpClient: S => FtpClient = self.ftpClient
val ftpLike: FtpLike[FtpClient, S] = _ftpLike
val destinationPath: FtpFile => String = _destinationPath
}
@@ -127,7 +127,7 @@ private[ftp] trait FtpSourceFactory[FtpClient, S <:
RemoteFileSettings] { self =
_connectionSettings: S)(implicit _ftpLike: FtpLike[FtpClient, S]) =
new FtpRemoveSink[FtpClient, S] {
val connectionSettings: S = _connectionSettings
- val ftpClient: () => FtpClient = self.ftpClient
+ val ftpClient: S => FtpClient = self.ftpClient
val ftpLike: FtpLike[FtpClient, S] = _ftpLike
}
@@ -147,7 +147,7 @@ private[ftp] trait FtpSource extends
FtpSourceFactory[FTPClient, FtpSettings] {
protected final val FtpDirectorySource = "FtpDirectorySource"
protected final val FtpIOSinkName = "FtpIOSink"
- protected val ftpClient: () => FTPClient = () => new FTPClient
+ protected val ftpClient: FtpSettings => FTPClient = _ => new FTPClient
protected val ftpBrowserSourceName: String = FtpBrowserSourceName
protected val ftpIOSourceName: String = FtpIOSourceName
protected val ftpIOSinkName: String = FtpIOSinkName
@@ -164,7 +164,7 @@ private[ftp] trait FtpsSource extends
FtpSourceFactory[FTPSClient, FtpsSettings]
protected final val FtpsDirectorySource = "FtpsDirectorySource"
protected final val FtpsIOSinkName = "FtpsIOSink"
- protected val ftpClient: () => FTPSClient = () => new FTPSClient
+ protected val ftpClient: FtpsSettings => FTPSClient = settings => new
FTPSClient(settings.useFtpsImplicit)
protected val ftpBrowserSourceName: String = FtpsBrowserSourceName
protected val ftpIOSourceName: String = FtpsIOSourceName
protected val ftpIOSinkName: String = FtpsIOSinkName
@@ -182,7 +182,7 @@ private[ftp] trait SftpSource extends
FtpSourceFactory[SSHClient, SftpSettings]
protected final val sFtpIOSinkName = "sFtpIOSink"
def sshClient(): SSHClient = new SSHClient()
- protected val ftpClient: () => SSHClient = () => sshClient()
+ protected val ftpClient: SftpSettings => SSHClient = _ => sshClient()
protected val ftpBrowserSourceName: String = sFtpBrowserSourceName
protected val ftpIOSourceName: String = sFtpIOSourceName
protected val ftpIOSinkName: String = sFtpIOSinkName
diff --git
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/model.scala
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/model.scala
index 0a05dd994..2cf990990 100644
--- a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/model.scala
+++ b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/model.scala
@@ -136,7 +136,7 @@ final class FtpSettings private (
s"credentials=$credentials," +
s"binary=$binary," +
s"passiveMode=$passiveMode," +
- s"autodetectUTF8=$autodetectUTF8" +
+ s"autodetectUTF8=$autodetectUTF8," +
s"configureConnection=$configureConnection," +
s"proxy=$proxy)"
}
@@ -185,6 +185,7 @@ final class FtpsSettings private (
val credentials: FtpCredentials,
val binary: Boolean,
val passiveMode: Boolean,
+ val useFtpsImplicit: Boolean,
val autodetectUTF8: Boolean,
val configureConnection: FTPSClient => Unit,
val proxy: Option[Proxy],
@@ -197,6 +198,8 @@ final class FtpsSettings private (
def withBinary(value: Boolean): FtpsSettings = if (binary == value) this
else copy(binary = value)
def withPassiveMode(value: Boolean): FtpsSettings =
if (passiveMode == value) this else copy(passiveMode = value)
+ def withFtpsImplicit(value: Boolean): FtpsSettings =
+ if (useFtpsImplicit == value) this else copy(useFtpsImplicit = value)
def withAutodetectUTF8(value: Boolean): FtpsSettings =
if (autodetectUTF8 == value) this else copy(autodetectUTF8 = value)
def withProxy(value: Proxy): FtpsSettings = copy(proxy = Some(value))
@@ -224,6 +227,7 @@ final class FtpsSettings private (
credentials: FtpCredentials = credentials,
binary: Boolean = binary,
passiveMode: Boolean = passiveMode,
+ useFtpsImplicit: Boolean = useFtpsImplicit,
autodetectUTF8: Boolean = autodetectUTF8,
configureConnection: FTPSClient => Unit = configureConnection,
proxy: Option[Proxy] = proxy,
@@ -234,6 +238,7 @@ final class FtpsSettings private (
credentials = credentials,
binary = binary,
passiveMode = passiveMode,
+ useFtpsImplicit = useFtpsImplicit,
autodetectUTF8 = autodetectUTF8,
configureConnection = configureConnection,
proxy = proxy,
@@ -247,10 +252,11 @@ final class FtpsSettings private (
s"credentials=$credentials," +
s"binary=$binary," +
s"passiveMode=$passiveMode," +
- s"autodetectUTF8=$autodetectUTF8" +
+ s"useFtpsImplicit=$useFtpsImplicit," +
+ s"autodetectUTF8=$autodetectUTF8," +
s"configureConnection=$configureConnection," +
- s"proxy=$proxy" +
- s"keyManager=$keyManager" +
+ s"proxy=$proxy," +
+ s"keyManager=$keyManager," +
s"trustManager=$trustManager)"
}
@@ -269,6 +275,7 @@ object FtpsSettings {
FtpCredentials.AnonFtpCredentials,
binary = false,
passiveMode = false,
+ useFtpsImplicit = false,
autodetectUTF8 = false,
configureConnection = _ => (),
proxy = None,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]