This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-pekko-connectors-samples.git
The following commit(s) were added to refs/heads/main by this push:
new 613b1a6 [rotate-logs-to-ftp] Migrate akka dependencies to pekko (#21)
613b1a6 is described below
commit 613b1a6414bc711f568189b15a6b095b4c734fb8
Author: Laglangyue <[email protected]>
AuthorDate: Mon Dec 18 00:06:57 2023 +0800
[rotate-logs-to-ftp] Migrate akka dependencies to pekko (#21)
* [rotate-logs-to-ftp] Migrate akka dependencies to pekko
* Update application.conf
---------
Co-authored-by: laglang <[email protected]>
Co-authored-by: PJ Fanning <[email protected]>
---
.../build.sbt | 2 +-
.../project/Dependencies.scala | 21 +-
.../java/playground/filesystem/FileSystemMock.java | 136 +++---
.../java/playground/filesystem/JimfsFactory.java | 82 ++--
.../playground/filesystem/impl/JimfsFtpFile.java | 530 ++++++++++-----------
.../java/playground/filesystem/impl/JimfsView.java | 376 ++++++++-------
.../filesystem/impl/NameEqualsPathFilter.java | 5 +-
.../src/main/java/samples/javadsl/Main.java | 222 +++++----
.../src/main/resources/application.conf | 6 +-
.../src/main/resources/logback.xml | 6 +-
.../main/scala/playground/SftpServerEmbedded.scala | 2 +-
.../src/main/scala/samples/scaladsl/Main.scala | 22 +-
12 files changed, 701 insertions(+), 709 deletions(-)
diff --git a/pekko-connectors-sample-rotate-logs-to-ftp/build.sbt
b/pekko-connectors-sample-rotate-logs-to-ftp/build.sbt
index 47329e0..6148cd2 100644
--- a/pekko-connectors-sample-rotate-logs-to-ftp/build.sbt
+++ b/pekko-connectors-sample-rotate-logs-to-ftp/build.sbt
@@ -1,6 +1,6 @@
organization := "org.apache.pekko"
name := "pekko-connectors-sample-rotate-logs-to-ftp"
-version := "1.0.0"
+version := "1.0.1"
scalaVersion := Dependencies.scalaVer
libraryDependencies ++= Dependencies.dependencies
javacOptions += "-Xlint:unchecked"
diff --git
a/pekko-connectors-sample-rotate-logs-to-ftp/project/Dependencies.scala
b/pekko-connectors-sample-rotate-logs-to-ftp/project/Dependencies.scala
index 2829787..8bd8882 100644
--- a/pekko-connectors-sample-rotate-logs-to-ftp/project/Dependencies.scala
+++ b/pekko-connectors-sample-rotate-logs-to-ftp/project/Dependencies.scala
@@ -3,18 +3,18 @@ import sbt._
object Dependencies {
val scalaVer = "2.13.12"
// #deps
- val PekkoVersion = "2.6.19"
- val PekkoConnectorsVersion = "4.0.0"
+ val PekkoVersion = "1.0.2"
+ val PekkoConnectorsVersion = "1.0.1"
// #deps
val dependencies = List(
- // #deps
- "com.typesafe.akka" %% "akka-stream" % PekkoVersion,
- "com.typesafe.akka" %% "akka-actor-typed" % PekkoVersion,
- "com.typesafe.akka" %% "akka-actor" % PekkoVersion,
- "com.lightbend.akka" %% "akka-stream-alpakka-file" %
PekkoConnectorsVersion,
- "com.lightbend.akka" %% "akka-stream-alpakka-ftp" % PekkoConnectorsVersion,
+ // #deps
+ "org.apache.pekko" %% "pekko-stream" % PekkoVersion,
+ "org.apache.pekko" %% "pekko-actor-typed" % PekkoVersion,
+ "org.apache.pekko" %% "pekko-actor" % PekkoVersion,
+ "org.apache.pekko" %% "pekko-connectors-file" % PekkoConnectorsVersion,
+ "org.apache.pekko" %% "pekko-connectors-ftp" % PekkoConnectorsVersion,
// #deps
// Playground file system and FTP server
// https://mina.apache.org/ftpserver-project/downloads.html
@@ -23,7 +23,6 @@ object Dependencies {
"org.apache.sshd" % "sshd-sftp" % "2.5.1", // ApacheV2
"com.google.jimfs" % "jimfs" % "1.1", // ApacheV2
// Logging
- "com.typesafe.akka" %% "akka-slf4j" % PekkoVersion,
- "ch.qos.logback" % "logback-classic" % "1.2.13"
- )
+ "org.apache.pekko" %% "pekko-slf4j" % PekkoVersion,
+ "ch.qos.logback" % "logback-classic" % "1.2.13")
}
diff --git
a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/FileSystemMock.java
b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/FileSystemMock.java
index b0a59df..4d3690a 100644
---
a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/FileSystemMock.java
+++
b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/FileSystemMock.java
@@ -13,85 +13,85 @@ import java.nio.file.attribute.BasicFileAttributes;
public class FileSystemMock {
- public final FileSystem fileSystem;
+ public final FileSystem fileSystem;
- private String loremIpsum =
- "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Praesent
auctor imperdiet " +
- "velit, eu dapibus nisl dapibus vitae. Sed quam lacus, fringilla
posuere ligula at, " +
- "aliquet laoreet nulla. Aliquam id fermentum justo. Aliquam et massa
consequat, " +
- "pellentesque dolor nec, gravida libero. Phasellus elit eros, finibus
eget " +
- "sollicitudin ac, consectetur sed ante. Etiam ornare lacus blandit
nisi gravida " +
- "accumsan. Sed in lorem arcu. Vivamus et eleifend ligula. Maecenas ut
commodo ante. " +
- "Suspendisse sit amet placerat arcu, porttitor sagittis velit. Quisque
gravida mi a " +
- "porttitor ornare. Cras lorem nisl, sollicitudin vitae odio at,
vehicula maximus " +
- "mauris. Sed ac purus ac turpis pellentesque cursus ac eget est.
Pellentesque " +
- "habitant morbi tristique senectus et netus et malesuada fames ac
turpis egestas.";
+ private String loremIpsum =
+ "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Praesent
auctor imperdiet " +
+ "velit, eu dapibus nisl dapibus vitae. Sed quam lacus, fringilla
posuere ligula at, " +
+ "aliquet laoreet nulla. Aliquam id fermentum justo. Aliquam et massa
consequat, " +
+ "pellentesque dolor nec, gravida libero. Phasellus elit eros,
finibus eget " +
+ "sollicitudin ac, consectetur sed ante. Etiam ornare lacus blandit
nisi gravida " +
+ "accumsan. Sed in lorem arcu. Vivamus et eleifend ligula. Maecenas
ut commodo ante. " +
+ "Suspendisse sit amet placerat arcu, porttitor sagittis velit.
Quisque gravida mi a " +
+ "porttitor ornare. Cras lorem nisl, sollicitudin vitae odio at,
vehicula maximus " +
+ "mauris. Sed ac purus ac turpis pellentesque cursus ac eget est.
Pellentesque " +
+ "habitant morbi tristique senectus et netus et malesuada fames ac
turpis egestas.";
- public FileSystemMock(FileSystem fs) {
- fileSystem = fs;
- }
+ public FileSystemMock(FileSystem fs) {
+ fileSystem = fs;
+ }
- public FileSystemMock() {
- this(Jimfs.newFileSystem(Configuration.unix()));
- }
+ public FileSystemMock() {
+ this(Jimfs.newFileSystem(Configuration.unix()));
+ }
- public void cleanFiles() {
- for (Path rootDir : getFileSystem().getRootDirectories()) {
- try {
- Files.walkFileTree(rootDir, new SimpleFileVisitor<Path>() {
- @Override
- public FileVisitResult visitFile(Path file,
BasicFileAttributes attrs) throws IOException {
- Files.deleteIfExists(file);
- return FileVisitResult.CONTINUE;
- }
- });
- } catch (Throwable t) {
- t.printStackTrace();
- }
- }
+ public void cleanFiles() {
+ for (Path rootDir : getFileSystem().getRootDirectories()) {
+ try {
+ Files.walkFileTree(rootDir, new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes
attrs) throws IOException {
+ Files.deleteIfExists(file);
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
}
+ }
- public void generateFiles(int numFiles, int pageSize, String basePath) {
- String base = "";
- if (!basePath.isEmpty()) {
- if ('/' != basePath.charAt(0)) {
- base = "/" + basePath;
- } else {
- base = basePath;
- }
- }
- int i = 1;
- while (i <= numFiles) {
- int j = i / pageSize;
- String subDir = (j > 0) ? "/dir_" + j : "";
- putFileOnFtp(base + subDir, "sample_" + i);
- i++;
- }
+ public void generateFiles(int numFiles, int pageSize, String basePath) {
+ String base = "";
+ if (!basePath.isEmpty()) {
+ if ('/' != basePath.charAt(0)) {
+ base = "/" + basePath;
+ } else {
+ base = basePath;
+ }
}
-
- public void putFileOnFtp(String path, String fileName) {
- putFileOnFtpWithContents(path, fileName, loremIpsum.getBytes());
+ int i = 1;
+ while (i <= numFiles) {
+ int j = i / pageSize;
+ String subDir = (j > 0) ? "/dir_" + j : "";
+ putFileOnFtp(base + subDir, "sample_" + i);
+ i++;
}
+ }
- public void putFileOnFtpWithContents(String path, String fileName, byte[]
fileContents) {
- try {
- Path baseDir = getFileSystem().getPath(path);
- if (!Files.exists(baseDir)) {
- Files.createDirectories(baseDir);
- }
- Path filePath = baseDir.resolve(fileName);
- Files.write(filePath, fileContents);
- } catch (Throwable t) {
- throw new RuntimeException(t);
- }
- }
+ public void putFileOnFtp(String path, String fileName) {
+ putFileOnFtpWithContents(path, fileName, loremIpsum.getBytes());
+ }
- FileSystem getFileSystem() {
- return fileSystem;
+ public void putFileOnFtpWithContents(String path, String fileName, byte[]
fileContents) {
+ try {
+ Path baseDir = getFileSystem().getPath(path);
+ if (!Files.exists(baseDir)) {
+ Files.createDirectories(baseDir);
+ }
+ Path filePath = baseDir.resolve(fileName);
+ Files.write(filePath, fileContents);
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
}
+ }
- public String getLoremIpsum() {
- return loremIpsum;
- }
+ FileSystem getFileSystem() {
+ return fileSystem;
+ }
+
+ public String getLoremIpsum() {
+ return loremIpsum;
+ }
}
diff --git
a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/JimfsFactory.java
b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/JimfsFactory.java
index 5f7e87f..455a670 100644
---
a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/JimfsFactory.java
+++
b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/JimfsFactory.java
@@ -19,56 +19,56 @@ import java.nio.file.Path;
public class JimfsFactory implements FileSystemFactory {
- private final Logger LOG = LoggerFactory.getLogger(JimfsFactory.class);
+ private final Logger LOG = LoggerFactory.getLogger(JimfsFactory.class);
- private FileSystem fileSystem;
+ private FileSystem fileSystem;
- private boolean createHome;
+ private boolean createHome;
- private boolean caseInsensitive;
+ private boolean caseInsensitive;
- public boolean isCreateHome() {
- return createHome;
- }
+ public boolean isCreateHome() {
+ return createHome;
+ }
- public void setCreateHome(boolean createHome) {
- this.createHome = createHome;
- }
+ public void setCreateHome(boolean createHome) {
+ this.createHome = createHome;
+ }
- public boolean isCaseInsensitive() {
- return caseInsensitive;
- }
+ public boolean isCaseInsensitive() {
+ return caseInsensitive;
+ }
- public void setCaseInsensitive(boolean caseInsensitive) {
- this.caseInsensitive = caseInsensitive;
- }
+ public void setCaseInsensitive(boolean caseInsensitive) {
+ this.caseInsensitive = caseInsensitive;
+ }
- public JimfsFactory(FileSystem fileSystem) {
- this.fileSystem = fileSystem;
- }
+ public JimfsFactory(FileSystem fileSystem) {
+ this.fileSystem = fileSystem;
+ }
- @Override
- public FileSystemView createFileSystemView(User user) throws FtpException {
- synchronized (user) {
- // create home if does not exist
- if (createHome) {
- String homeDirStr = user.getHomeDirectory();
- Path homeDir = fileSystem.getPath(homeDirStr);
- if (Files.isRegularFile(homeDir)) {
- LOG.warn("Not a directory :: " + homeDirStr);
- throw new FtpException("Not a directory :: " + homeDirStr);
- }
- if (!Files.exists(homeDir)) {
- try {
- Files.createDirectories(homeDir);
- } catch (IOException t) {
- final String msg = "Cannot create user home :: " +
homeDirStr;
- LOG.warn(msg);
- throw new FtpException(msg, t);
- }
- }
- }
- return new JimfsView(fileSystem, user, caseInsensitive);
+ @Override
+ public FileSystemView createFileSystemView(User user) throws FtpException {
+ synchronized (user) {
+ // create home if not exist
+ if (createHome) {
+ String homeDirStr = user.getHomeDirectory();
+ Path homeDir = fileSystem.getPath(homeDirStr);
+ if (Files.isRegularFile(homeDir)) {
+ LOG.warn("Not a directory :: " + homeDirStr);
+ throw new FtpException("Not a directory :: " + homeDirStr);
+ }
+ if (!Files.exists(homeDir)) {
+ try {
+ Files.createDirectories(homeDir);
+ } catch (IOException t) {
+ final String msg = "Cannot create user home :: " + homeDirStr;
+ LOG.warn(msg);
+ throw new FtpException(msg, t);
+ }
}
+ }
+ return new JimfsView(fileSystem, user, caseInsensitive);
}
+ }
}
diff --git
a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/JimfsFtpFile.java
b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/JimfsFtpFile.java
index c51f644..be0009f 100644
---
a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/JimfsFtpFile.java
+++
b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/JimfsFtpFile.java
@@ -20,325 +20,325 @@ import java.util.List;
public class JimfsFtpFile implements FtpFile {
- private final Logger LOG = LoggerFactory.getLogger(JimfsFtpFile.class);
+ private final Logger LOG = LoggerFactory.getLogger(JimfsFtpFile.class);
- // the file name with respect to the user root (so it's the virtual
filename).
- // The path separator character will be '/' and
- // it will always begin with '/'.
- private String fileName;
+ // the file name with respect to the user root (so it's the virtual
filename).
+ // The path separator character will be '/' and
+ // it will always begin with '/'.
+ private String fileName;
- // The `physical` path in the underlying file system (but happens that
- // in this case, the `physical` path will be also virtusl (jimfs), so
crazy).
- private Path path;
+ // The `physical` path in the underlying file system (but happens that
+ // in this case, the `physical` path will be also virtusl (jimfs), so crazy).
+ private Path path;
- private User user;
+ private User user;
- protected JimfsFtpFile(final String fileName, final Path path,
- final User user) {
- if (fileName == null) {
- throw new IllegalArgumentException("fileName can not be null");
- }
- if (path == null) {
- throw new IllegalArgumentException("path can not be null");
- }
- if (fileName.length() == 0) {
- throw new IllegalArgumentException("fileName can not be empty");
- } else if (fileName.charAt(0) != '/') {
- throw new IllegalArgumentException(
- "fileName must be an absolute path");
- }
-
- this.fileName = fileName;
- this.path = path;
- this.user = user;
+ protected JimfsFtpFile(final String fileName, final Path path,
+ final User user) {
+ if (fileName == null) {
+ throw new IllegalArgumentException("fileName can not be null");
}
-
- public String getAbsolutePath() {
-
- // strip the last '/' if necessary
- String fullName = fileName;
- int filelen = fullName.length();
- if ((filelen != 1) && (fullName.charAt(filelen - 1) == '/')) {
- fullName = fullName.substring(0, filelen - 1);
- }
- return fullName;
+ if (path == null) {
+ throw new IllegalArgumentException("path can not be null");
+ }
+ if (fileName.length() == 0) {
+ throw new IllegalArgumentException("fileName can not be empty");
+ } else if (fileName.charAt(0) != '/') {
+ throw new IllegalArgumentException(
+ "fileName must be an absolute path");
}
- public String getName() {
-
- // root - the short name will be '/'
- if (fileName.equals("/")) {
- return "/";
- }
-
- // strip the last '/'
- String shortName = fileName;
- int filelen = fileName.length();
- if (shortName.charAt(filelen - 1) == '/') {
- shortName = shortName.substring(0, filelen - 1);
- }
+ this.fileName = fileName;
+ this.path = path;
+ this.user = user;
+ }
- // return from the last '/'
- int slashIndex = shortName.lastIndexOf('/');
- if (slashIndex != -1) {
- shortName = shortName.substring(slashIndex + 1);
- }
- return shortName;
- }
+ public String getAbsolutePath() {
- public boolean isHidden() {
- try {
- return Files.isHidden(path);
- } catch (IOException t) {
- LOG.error(t.getMessage());
- }
- return false;
+ // strip the last '/' if necessary
+ String fullName = fileName;
+ int filelen = fullName.length();
+ if ((filelen != 1) && (fullName.charAt(filelen - 1) == '/')) {
+ fullName = fullName.substring(0, filelen - 1);
}
+ return fullName;
+ }
- public boolean isDirectory() {
- return Files.isDirectory(path);
- }
+ public String getName() {
- public boolean isFile() {
- return Files.isRegularFile(path);
+ // root - the short name will be '/'
+ if (fileName.equals("/")) {
+ return "/";
}
- public boolean doesExist() {
- return Files.exists(path);
+ // strip the last '/'
+ String shortName = fileName;
+ int filelen = fileName.length();
+ if (shortName.charAt(filelen - 1) == '/') {
+ shortName = shortName.substring(0, filelen - 1);
}
- public long getSize() {
- try {
- return Files.size(path);
- } catch (IOException t) {
- LOG.error(t.getMessage());
- }
- return -1;
+ // return from the last '/'
+ int slashIndex = shortName.lastIndexOf('/');
+ if (slashIndex != -1) {
+ shortName = shortName.substring(slashIndex + 1);
}
-
- public String getOwnerName() {
- return "user";
+ return shortName;
+ }
+
+ public boolean isHidden() {
+ try {
+ return Files.isHidden(path);
+ } catch (IOException t) {
+ LOG.error(t.getMessage());
}
-
- public String getGroupName() {
- return "group";
+ return false;
+ }
+
+ public boolean isDirectory() {
+ return Files.isDirectory(path);
+ }
+
+ public boolean isFile() {
+ return Files.isRegularFile(path);
+ }
+
+ public boolean doesExist() {
+ return Files.exists(path);
+ }
+
+ public long getSize() {
+ try {
+ return Files.size(path);
+ } catch (IOException t) {
+ LOG.error(t.getMessage());
}
-
- public int getLinkCount() {
- return Files.isDirectory(path) ? 3 : 1;
+ return -1;
+ }
+
+ public String getOwnerName() {
+ return "user";
+ }
+
+ public String getGroupName() {
+ return "group";
+ }
+
+ public int getLinkCount() {
+ return Files.isDirectory(path) ? 3 : 1;
+ }
+
+ public long getLastModified() {
+ try {
+ return Files.getLastModifiedTime(path).toMillis();
+ } catch (IOException t) {
+ LOG.error(t.getMessage());
}
-
- public long getLastModified() {
- try {
- return Files.getLastModifiedTime(path).toMillis();
- } catch (IOException t) {
- LOG.error(t.getMessage());
- }
- return -1;
+ return -1;
+ }
+
+ public boolean setLastModified(long time) {
+ try {
+ Files.setLastModifiedTime(path, FileTime.fromMillis(time));
+ return true;
+ } catch (IOException t) {
+ LOG.error(t.getMessage());
}
-
- public boolean setLastModified(long time) {
- try {
- Files.setLastModifiedTime(path, FileTime.fromMillis(time));
- return true;
- } catch (IOException t) {
- LOG.error(t.getMessage());
- }
- return false;
+ return false;
+ }
+
+ public boolean isReadable() {
+ return Files.isReadable(path);
+ }
+
+ public boolean isWritable() {
+ LOG.debug("Checking authorization for " + getAbsolutePath());
+ if (user.authorize(new WriteRequest(getAbsolutePath())) == null) {
+ LOG.debug("Not authorized");
+ return false;
}
- public boolean isReadable() {
- return Files.isReadable(path);
+ LOG.debug("Checking if file exists");
+ if (Files.exists(path)) {
+ LOG.debug("Checking can write: " + getAbsolutePath());
+ return Files.isWritable(path);
}
- public boolean isWritable() {
- LOG.debug("Checking authorization for " + getAbsolutePath());
- if (user.authorize(new WriteRequest(getAbsolutePath())) == null) {
- LOG.debug("Not authorized");
- return false;
- }
+ LOG.debug("Authorized");
+ return true;
+ }
- LOG.debug("Checking if file exists");
- if (Files.exists(path)) {
- LOG.debug("Checking can write: " + getAbsolutePath());
- return Files.isWritable(path);
- }
+ public boolean isRemovable() {
- LOG.debug("Authorized");
- return true;
+ // root cannot be deleted
+ if ("/".equals(fileName)) {
+ return false;
}
- public boolean isRemovable() {
-
- // root cannot be deleted
- if ("/".equals(fileName)) {
- return false;
- }
-
- String fullName = getAbsolutePath();
-
- // we check FTPServer's write permission for this file.
- if (user.authorize(new WriteRequest(fullName)) == null) {
- return false;
- }
-
- // In order to maintain consistency, when possible we delete the last
'/' character in the String
- int indexOfSlash = fullName.lastIndexOf('/');
- String parentFullName;
- if (indexOfSlash == 0) {
- parentFullName = "/";
- } else {
- parentFullName = fullName.substring(0, indexOfSlash);
- }
+ String fullName = getAbsolutePath();
- JimfsFtpFile parentObject = new JimfsFtpFile(parentFullName,
- path.getParent(), user);
- return parentObject.isWritable();
+ // we check FTPServer's write permission for this file.
+ if (user.authorize(new WriteRequest(fullName)) == null) {
+ return false;
}
- public boolean delete() {
- boolean retVal = false;
- try {
- if (isRemovable()) {
- Files.delete(path);
- retVal = true;
- }
- } catch (IOException t) {
- LOG.error(t.getMessage());
- }
- return retVal;
+ // In order to maintain consistency, when possible we delete the last '/'
character in the String
+ int indexOfSlash = fullName.lastIndexOf('/');
+ String parentFullName;
+ if (indexOfSlash == 0) {
+ parentFullName = "/";
+ } else {
+ parentFullName = fullName.substring(0, indexOfSlash);
}
- public boolean move(final FtpFile dest) {
- boolean retVal = false;
- if (dest.isWritable() && isReadable()) {
- Path destPath = ((JimfsFtpFile) dest).path;
-
- if (Files.exists(destPath)) {
- // renameTo behaves differently on different platforms
- // this check verifies that if the destination already exists,
- // we fail
- retVal = false;
- } else {
- try {
- Files.move(path, destPath,
StandardCopyOption.REPLACE_EXISTING);
- retVal = true;
- } catch (IOException t) {
- LOG.error(t.getMessage());
- }
- }
- }
- return retVal;
+ JimfsFtpFile parentObject = new JimfsFtpFile(parentFullName,
+ path.getParent(), user);
+ return parentObject.isWritable();
+ }
+
+ public boolean delete() {
+ boolean retVal = false;
+ try {
+ if (isRemovable()) {
+ Files.delete(path);
+ retVal = true;
+ }
+ } catch (IOException t) {
+ LOG.error(t.getMessage());
}
-
- public boolean mkdir() {
- boolean retVal = false;
+ return retVal;
+ }
+
+ public boolean move(final FtpFile dest) {
+ boolean retVal = false;
+ if (dest.isWritable() && isReadable()) {
+ Path destPath = ((JimfsFtpFile) dest).path;
+
+ if (Files.exists(destPath)) {
+ // renameTo behaves differently on different platforms
+ // this check verifies that if the destination already exists,
+ // we fail
+ retVal = false;
+ } else {
try {
- if (isWritable()) {
- Files.createDirectory(path);
- retVal = true;
- }
+ Files.move(path, destPath, StandardCopyOption.REPLACE_EXISTING);
+ retVal = true;
} catch (IOException t) {
- LOG.error(t.getMessage());
+ LOG.error(t.getMessage());
}
- return retVal;
+ }
}
-
- public Path getPhysicalFile() {
- return path;
+ return retVal;
+ }
+
+ public boolean mkdir() {
+ boolean retVal = false;
+ try {
+ if (isWritable()) {
+ Files.createDirectory(path);
+ retVal = true;
+ }
+ } catch (IOException t) {
+ LOG.error(t.getMessage());
}
+ return retVal;
+ }
- public List<FtpFile> listFiles() {
-
- // is a directory
- if (!Files.isDirectory(path)) {
- return null;
- }
-
- // directory - return all the files
- DirectoryStream<Path> filesStream = null;
- try {
- filesStream = Files.newDirectoryStream(path);
- } catch(IOException t) {
- LOG.error(t.getMessage());
- }
+ public Path getPhysicalFile() {
+ return path;
+ }
- if (filesStream == null) {
- return null;
- }
-
- List<Path> files = new ArrayList<>();
- for (Path path : filesStream) {
- files.add(path);
- }
+ public List<FtpFile> listFiles() {
- // make sure the files are returned in order
- Collections.sort(files, new Comparator<Path>() {
- public int compare(Path o1, Path o2) {
- return o1.getFileName().compareTo(o2.getFileName());
- }
- });
-
- // get the virtual name of the base directory
- final String virtualFileStr =
- getAbsolutePath().charAt(getAbsolutePath().length() - 1) != '/'
- ? getAbsolutePath() + '/' : getAbsolutePath();
-
- // now return all the files under the directory
- List<FtpFile> virtualFiles = new ArrayList<>(files.size());
- for (Path file : files) {
- String fileName = virtualFileStr + file.getFileName();
- virtualFiles.add(new JimfsFtpFile(fileName, file, user));
- }
- return virtualFiles;
+ // is a directory
+ if (!Files.isDirectory(path)) {
+ return null;
}
- public OutputStream createOutputStream(long offset)
- throws IOException {
+ // directory - return all the files
+ DirectoryStream<Path> filesStream = null;
+ try {
+ filesStream = Files.newDirectoryStream(path);
+ } catch (IOException t) {
+ LOG.error(t.getMessage());
+ }
- // permission check
- if (!isWritable()) {
- throw new IOException("No write permission : " +
path.getFileName());
- }
+ if (filesStream == null) {
+ return null;
+ }
- // create output stream
- final RandomAccessFile raf = new RandomAccessFile(path.toFile(), "rw");
- raf.setLength(offset);
- raf.seek(offset);
-
- // The IBM jre needs to have both the stream and the random access file
- // objects closed to actually close the file
- return new FileOutputStream(raf.getFD()) {
- @Override
- public void close() throws IOException {
- super.close();
- raf.close();
- }
- };
+ List<Path> files = new ArrayList<>();
+ for (Path path : filesStream) {
+ files.add(path);
}
- public InputStream createInputStream(long offset)
- throws IOException {
+ // make sure the files are returned in order
+ Collections.sort(files, new Comparator<Path>() {
+ public int compare(Path o1, Path o2) {
+ return o1.getFileName().compareTo(o2.getFileName());
+ }
+ });
+
+ // get the virtual name of the base directory
+ final String virtualFileStr =
+ getAbsolutePath().charAt(getAbsolutePath().length() - 1) != '/'
+ ? getAbsolutePath() + '/' : getAbsolutePath();
+
+ // now return all the files under the directory
+ List<FtpFile> virtualFiles = new ArrayList<>(files.size());
+ for (Path file : files) {
+ String fileName = virtualFileStr + file.getFileName();
+ virtualFiles.add(new JimfsFtpFile(fileName, file, user));
+ }
+ return virtualFiles;
+ }
- // permission check
- if (!isReadable()) {
- throw new IOException("No read permission : " +
path.getFileName());
- }
+ public OutputStream createOutputStream(long offset)
+ throws IOException {
- return Files.newInputStream(path, StandardOpenOption.READ);
+ // permission check
+ if (!isWritable()) {
+ throw new IOException("No write permission : " + path.getFileName());
}
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof JimfsFtpFile) {
- Path otherPath = ((JimfsFtpFile) obj).path.normalize();
- return this.path.normalize().equals(otherPath);
- }
- return false;
+ // create output stream
+ final RandomAccessFile raf = new RandomAccessFile(path.toFile(), "rw");
+ raf.setLength(offset);
+ raf.seek(offset);
+
+ // The IBM jre needs to have both the stream and the random access file
+ // objects closed to actually close the file
+ return new FileOutputStream(raf.getFD()) {
+ @Override
+ public void close() throws IOException {
+ super.close();
+ raf.close();
+ }
+ };
+ }
+
+ public InputStream createInputStream(long offset)
+ throws IOException {
+
+ // permission check
+ if (!isReadable()) {
+ throw new IOException("No read permission : " + path.getFileName());
}
- @Override
- public int hashCode() {
- return path.normalize().hashCode();
+ return Files.newInputStream(path, StandardOpenOption.READ);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof JimfsFtpFile) {
+ Path otherPath = ((JimfsFtpFile) obj).path.normalize();
+ return this.path.normalize().equals(otherPath);
}
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return path.normalize().hashCode();
+ }
}
diff --git
a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/JimfsView.java
b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/JimfsView.java
index eebf6c3..a18ea46 100644
---
a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/JimfsView.java
+++
b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/JimfsView.java
@@ -21,231 +21,229 @@ import java.util.List;
import java.util.StringTokenizer;
/**
- * File system view based on the in-memory jimfs file system. The root in this
- * class is the user virtual root (/).
+ * File system view based on the in-memory jimfs file system. The root in this
class is the user virtual root (/).
*/
public class JimfsView implements FileSystemView {
- private final Logger LOG = LoggerFactory.getLogger(JimfsView.class);
+ private final Logger LOG = LoggerFactory.getLogger(JimfsView.class);
- // this will be the jimfs file system in runtime.
- private FileSystem fileSystem;
+ // this will be the jimfs file system in runtime.
+ private FileSystem fileSystem;
- // the root directory will always end with '/'.
- private String rootDir;
+ // the root directory will always end with '/'.
+ private String rootDir;
- // the first and the last character will always be '/'
- // It is always with respect to the root directory.
- private String currDir;
+ // the first and the last character will always be '/'
+ // It is always with respect to the root directory.
+ private String currDir;
- private User user;
+ private User user;
- private boolean caseInsensitive = false;
+ private boolean caseInsensitive = false;
- public JimfsView(FileSystem fileSystem, User user, boolean caseInsensitive)
- throws FtpException {
- if (fileSystem == null) {
- throw new IllegalArgumentException("filesystem can not be null");
- }
- if (user == null) {
- throw new IllegalArgumentException("user can not be null");
- }
- if (user.getHomeDirectory() == null) {
- throw new IllegalArgumentException("user home directory can not be
null");
- }
+ public JimfsView(FileSystem fileSystem, User user, boolean caseInsensitive)
+ throws FtpException {
+ if (fileSystem == null) {
+ throw new IllegalArgumentException("filesystem can not be null");
+ }
+ if (user == null) {
+ throw new IllegalArgumentException("user can not be null");
+ }
+ if (user.getHomeDirectory() == null) {
+ throw new IllegalArgumentException("user home directory can not be
null");
+ }
- this.fileSystem = fileSystem;
+ this.fileSystem = fileSystem;
- this.caseInsensitive = caseInsensitive;
+ this.caseInsensitive = caseInsensitive;
- // add last '/' if necessary
- String rootDir = user.getHomeDirectory();
- rootDir = normalizeSeparateChar(rootDir);
- if (!rootDir.endsWith("/")) {
- rootDir += '/';
- }
+ // add last '/' if necessary
+ String rootDir = user.getHomeDirectory();
+ rootDir = normalizeSeparateChar(rootDir);
+ if (!rootDir.endsWith("/")) {
+ rootDir += '/';
+ }
- LOG.debug("Jimfs filesystem view created by user \"{}\" with root
\"{}\"", user.getName(), rootDir);
+ LOG.debug("Jimfs filesystem view created by user \"{}\" with root \"{}\"",
user.getName(), rootDir);
- this.rootDir = rootDir;
+ this.rootDir = rootDir;
- this.user = user;
+ this.user = user;
- currDir = "/";
+ currDir = "/";
- }
+ }
- /**
- * Get the user home directory. It would be the file system root
- * for the specific user.
- */
- public FtpFile getHomeDirectory() throws FtpException {
- return new JimfsFtpFile("/", fileSystem.getPath(rootDir), user);
- }
+ /**
+ * Get the user home directory. It would be the file system root for the
specific user.
+ */
+ public FtpFile getHomeDirectory() throws FtpException {
+ return new JimfsFtpFile("/", fileSystem.getPath(rootDir), user);
+ }
- /**
- * Get the current directory.
- */
- public FtpFile getWorkingDirectory() throws FtpException {
- FtpFile fileObj;
- if (currDir.equals("/")) {
- fileObj = getHomeDirectory();
- } else {
- Path path = fileSystem.getPath(rootDir, currDir.substring(1));
- fileObj = new JimfsFtpFile(currDir, path, user);
- }
- return fileObj;
+ /**
+ * Get the current directory.
+ */
+ public FtpFile getWorkingDirectory() throws FtpException {
+ FtpFile fileObj;
+ if (currDir.equals("/")) {
+ fileObj = getHomeDirectory();
+ } else {
+ Path path = fileSystem.getPath(rootDir, currDir.substring(1));
+ fileObj = new JimfsFtpFile(currDir, path, user);
}
-
- /**
- * Get the file object.
- */
- public FtpFile getFile(String file) {
- String physicalName = getPhysicalName(file);
- Path filePath = fileSystem.getPath(physicalName);
-
- // strip the root directory and return
- String userFileName = physicalName.substring(rootDir.length() - 1);
- return new JimfsFtpFile(userFileName, filePath, user);
+ return fileObj;
+ }
+
+ /**
+ * Get the file object.
+ */
+ public FtpFile getFile(String file) {
+ String physicalName = getPhysicalName(file);
+ Path filePath = fileSystem.getPath(physicalName);
+
+ // strip the root directory and return
+ String userFileName = physicalName.substring(rootDir.length() - 1);
+ return new JimfsFtpFile(userFileName, filePath, user);
+ }
+
+ /**
+ * Change directory.
+ */
+ public boolean changeWorkingDirectory(String dir) throws FtpException {
+
+ // not a directory - return false
+ dir = getPhysicalName(dir);
+ Path dirPath = fileSystem.getPath(dir);
+ if (!Files.isDirectory(dirPath)) {
+ return false;
}
- /**
- * Change directory.
- */
- public boolean changeWorkingDirectory(String dir) throws FtpException {
-
- // not a directory - return false
- dir = getPhysicalName(dir);
- Path dirPath = fileSystem.getPath(dir);
- if (!Files.isDirectory(dirPath)) {
- return false;
- }
-
- // strip user root and add last '/' if necessary
- dir = dir.substring(rootDir.length() - 1);
- if (dir.charAt(dir.length() - 1) != '/') {
- dir = dir + '/';
- }
-
- currDir = dir;
- return true;
+ // strip user root and add last '/' if necessary
+ dir = dir.substring(rootDir.length() - 1);
+ if (dir.charAt(dir.length() - 1) != '/') {
+ dir = dir + '/';
}
- /**
- * Is the file content random accessible?
- */
- public boolean isRandomAccessible() {
- return true;
+ currDir = dir;
+ return true;
+ }
+
+ /**
+ * Is the file content random accessible?
+ */
+ public boolean isRandomAccessible() {
+ return true;
+ }
+
+ /**
+ * Dispose the file system.
+ */
+ public void dispose() {
+ // Nothing to do
+ }
+
+ private String normalizeSeparateChar(final String pathName) {
+ String normalizePathName = pathName.replace(fileSystem.getSeparator(),
"/");
+ return normalizePathName.replace('\\', '/');
+ }
+
+ private String getPhysicalName(final String file) {
+
+ // get the starting directory
+ String normalizedRootDir = normalizeSeparateChar(rootDir);
+ if (normalizedRootDir.charAt(normalizedRootDir.length() - 1) != '/') {
+ normalizedRootDir += '/';
}
- /**
- * Dispose the file system.
- */
- public void dispose() {
- // Nothing to do
+ String normalizedFileName = normalizeSeparateChar(file);
+ String resArg;
+ String normalizedCurrDir = currDir;
+ if (normalizedFileName.charAt(0) != '/') {
+ if (normalizedCurrDir == null || normalizedCurrDir.length() == 0) {
+ normalizedCurrDir = "/";
+ }
+
+ normalizedCurrDir = normalizeSeparateChar(normalizedCurrDir);
+
+ if (normalizedCurrDir.charAt(0) != '/') {
+ normalizedCurrDir = '/' + normalizedCurrDir;
+ }
+ if (normalizedCurrDir.charAt(normalizedCurrDir.length() - 1) != '/') {
+ normalizedCurrDir += '/';
+ }
+
+ resArg = normalizedRootDir + normalizedCurrDir.substring(1);
+ } else {
+ resArg = normalizedRootDir;
}
- private String normalizeSeparateChar(final String pathName) {
- String normalizePathName = pathName.replace(fileSystem.getSeparator(),
"/");
- return normalizePathName.replace('\\', '/');
+ // strip last '/'
+ if (resArg.charAt(resArg.length() - 1) == '/') {
+ resArg = resArg.substring(0, resArg.length() - 1);
}
- private String getPhysicalName(final String file) {
-
- // get the starting directory
- String normalizedRootDir = normalizeSeparateChar(rootDir);
- if (normalizedRootDir.charAt(normalizedRootDir.length() - 1) != '/') {
- normalizedRootDir += '/';
+ // replace ., ~ and ..
+ // in this loop resArg will never end with '/'
+ StringTokenizer st = new StringTokenizer(normalizedFileName, "/");
+ while (st.hasMoreTokens()) {
+ String tok = st.nextToken();
+
+ // . => current directory
+ if (tok.equals(".")) {
+ continue;
+ }
+
+ // .. => parent directory (if not root)
+ if (tok.equals("src/main")) {
+ if (resArg.startsWith(normalizedRootDir)) {
+ int slashIndex = resArg.lastIndexOf("/");
+ if (slashIndex != -1) {
+ resArg = resArg.substring(0, slashIndex);
+ }
}
-
- String normalizedFileName = normalizeSeparateChar(file);
- String resArg;
- String normalizedCurrDir = currDir;
- if (normalizedFileName.charAt(0) != '/') {
- if (normalizedCurrDir == null || normalizedCurrDir.length() == 0) {
- normalizedCurrDir = "/";
- }
-
- normalizedCurrDir = normalizeSeparateChar(normalizedCurrDir);
-
- if (normalizedCurrDir.charAt(0) != '/') {
- normalizedCurrDir = '/' + normalizedCurrDir;
- }
- if (normalizedCurrDir.charAt(normalizedCurrDir.length() - 1) !=
'/') {
- normalizedCurrDir += '/';
- }
-
- resArg = normalizedRootDir + normalizedCurrDir.substring(1);
- } else {
- resArg = normalizedRootDir;
+ continue;
+ }
+
+ // ~ => home directory (in this case is the root directory)
+ if (tok.equals("~")) {
+ resArg = normalizedRootDir.substring(0, normalizedRootDir.length() -
1);
+ continue;
+ }
+
+ if (caseInsensitive) {
+ Path dir = fileSystem.getPath(resArg);
+ DirectoryStream<Path> dirStream = null;
+ try {
+ dirStream = Files.newDirectoryStream(dir, new
NameEqualsPathFilter(tok, true));
+ } catch (IOException t) {
+ // ignore
}
-
- // strip last '/'
- if (resArg.charAt(resArg.length() - 1) == '/') {
- resArg = resArg.substring(0, resArg.length() - 1);
+ List<Path> matches = new ArrayList<>(0);
+ if (dirStream != null) {
+ for (Path match : dirStream) {
+ matches.add(match);
+ }
}
-
- // replace ., ~ and ..
- // in this loop resArg will never end with '/'
- StringTokenizer st = new StringTokenizer(normalizedFileName, "/");
- while (st.hasMoreTokens()) {
- String tok = st.nextToken();
-
- // . => current directory
- if (tok.equals(".")) {
- continue;
- }
-
- // .. => parent directory (if not root)
- if (tok.equals("src/main")) {
- if (resArg.startsWith(normalizedRootDir)) {
- int slashIndex = resArg.lastIndexOf("/");
- if (slashIndex != -1) {
- resArg = resArg.substring(0, slashIndex);
- }
- }
- continue;
- }
-
- // ~ => home directory (in this case is the root directory)
- if (tok.equals("~")) {
- resArg = normalizedRootDir.substring(0,
normalizedRootDir.length() - 1);
- continue;
- }
-
- if (caseInsensitive) {
- Path dir = fileSystem.getPath(resArg);
- DirectoryStream<Path> dirStream = null;
- try {
- dirStream = Files.newDirectoryStream(dir, new
NameEqualsPathFilter(tok, true));
- } catch (IOException t) {
- // ignore
- }
- List<Path> matches = new ArrayList<>(0);
- if (dirStream != null) {
- for (Path match : dirStream) {
- matches.add(match);
- }
- }
- if (matches.size() > 0) {
- tok = matches.get(0).getFileName().toString();
- }
- }
-
- resArg = resArg + '/' + tok;
+ if (matches.size() > 0) {
+ tok = matches.get(0).getFileName().toString();
}
+ }
- // add last slash if necessary
- if ((resArg.length()) + 1 == normalizedRootDir.length()) {
- resArg += '/';
- }
+ resArg = resArg + '/' + tok;
+ }
- // final check
- if (!resArg.regionMatches(0, normalizedRootDir, 0, normalizedRootDir
- .length())) {
- resArg = normalizedRootDir;
- }
+ // add last slash if necessary
+ if ((resArg.length()) + 1 == normalizedRootDir.length()) {
+ resArg += '/';
+ }
- return resArg;
+ // final check
+ if (!resArg.regionMatches(0, normalizedRootDir, 0, normalizedRootDir
+ .length())) {
+ resArg = normalizedRootDir;
}
+
+ return resArg;
+ }
}
diff --git
a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/NameEqualsPathFilter.java
b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/NameEqualsPathFilter.java
index 063a26b..e9d7a69 100644
---
a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/NameEqualsPathFilter.java
+++
b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/NameEqualsPathFilter.java
@@ -14,8 +14,7 @@ public class NameEqualsPathFilter implements
DirectoryStream.Filter<Path> {
private boolean caseInsensitive = false;
- public NameEqualsPathFilter(final String nameToMatch,
- final boolean caseInsensitive) {
+ public NameEqualsPathFilter(final String nameToMatch, final boolean
caseInsensitive) {
this.nameToMatch = nameToMatch;
this.caseInsensitive = caseInsensitive;
}
@@ -23,7 +22,7 @@ public class NameEqualsPathFilter implements
DirectoryStream.Filter<Path> {
@Override
public boolean accept(Path entry) throws IOException {
if (caseInsensitive) {
- return entry.getFileName().toString().equalsIgnoreCase(nameToMatch);
+ return entry.getFileName().toString().equalsIgnoreCase(nameToMatch);
} else {
return entry.getFileName().toString().equals(nameToMatch);
}
diff --git
a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/samples/javadsl/Main.java
b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/samples/javadsl/Main.java
index 86e11f9..9957c21 100644
---
a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/samples/javadsl/Main.java
+++
b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/samples/javadsl/Main.java
@@ -1,130 +1,128 @@
package samples.javadsl;
// #imports
-import java.net.InetAddress;
+
+import org.apache.mina.util.AvailablePortFinder;
+import org.apache.pekko.Done;
+import org.apache.pekko.actor.typed.ActorSystem;
+import org.apache.pekko.actor.typed.javadsl.Behaviors;
+import org.apache.pekko.japi.function.Creator;
+import org.apache.pekko.japi.function.Function;
+import org.apache.pekko.stream.IOResult;
+import org.apache.pekko.stream.connectors.file.javadsl.Directory;
+import org.apache.pekko.stream.connectors.file.javadsl.LogRotatorSink;
+import org.apache.pekko.stream.connectors.ftp.FtpCredentials;
+import org.apache.pekko.stream.connectors.ftp.KeyFileSftpIdentity;
+import org.apache.pekko.stream.connectors.ftp.SftpIdentity;
+import org.apache.pekko.stream.connectors.ftp.SftpSettings;
+import org.apache.pekko.stream.connectors.ftp.javadsl.Sftp;
+import org.apache.pekko.stream.javadsl.Compression;
+import org.apache.pekko.stream.javadsl.Flow;
+import org.apache.pekko.stream.javadsl.Keep;
+import org.apache.pekko.stream.javadsl.Sink;
+import org.apache.pekko.stream.javadsl.Source;
+import org.apache.pekko.util.ByteString;
+import playground.SftpServerEmbedded;
+import playground.filesystem.FileSystemMock;
+
import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.FileSystem;
-
import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
+// #imports
-import akka.Done;
-import akka.actor.typed.ActorSystem;
-import akka.actor.typed.javadsl.Behaviors;
-import akka.japi.Pair;
-import akka.japi.function.Creator;
-import akka.japi.function.Function;
-import akka.stream.IOResult;
-import akka.stream.alpakka.file.javadsl.Directory;
-import akka.stream.alpakka.file.javadsl.LogRotatorSink;
-import akka.stream.alpakka.ftp.javadsl.Sftp;
-
-import akka.stream.alpakka.ftp.FtpCredentials;
-import akka.stream.alpakka.ftp.SftpIdentity;
-import akka.stream.alpakka.ftp.KeyFileSftpIdentity;
-import akka.stream.alpakka.ftp.SftpSettings;
-import akka.stream.javadsl.Compression;
-import akka.stream.javadsl.Flow;
-import akka.stream.javadsl.Keep;
-import akka.stream.javadsl.Source;
-import akka.util.ByteString;
+public class Main {
-import org.apache.mina.util.AvailablePortFinder;
-import playground.filesystem.FileSystemMock;
-import playground.SftpServerEmbedded;
+ private void run() throws IOException {
+ final ActorSystem<Void> actorSystem =
ActorSystem.create(Behaviors.empty(), "RotateLogsToFtp");
-import akka.stream.javadsl.Sink;
-// #imports
+ final FileSystem ftpFileSystem = new FileSystemMock().fileSystem;
-public class Main {
+ final String privateKeyPassphrase = new
String(SftpServerEmbedded.clientPrivateKeyPassphrase());
+ final String pathToIdentityFile =
SftpServerEmbedded.clientPrivateKeyFile();
+ final String username = "username";
+ final String password = username;
+ final String hostname = "localhost";
- private void run() throws IOException {
- final ActorSystem<Void> actorSystem =
ActorSystem.create(Behaviors.empty(), "RotateLogsToFtp");
-
- final FileSystem ftpFileSystem = new FileSystemMock().fileSystem;
-
- final String privateKeyPassphrase = new
String(SftpServerEmbedded.clientPrivateKeyPassphrase());
- final String pathToIdentityFile =
SftpServerEmbedded.clientPrivateKeyFile();
- final String username = "username";
- final String password = username;
- final String hostname = "localhost";
-
- int port = AvailablePortFinder.getNextAvailable(21_000);
-
- Path home =
ftpFileSystem.getPath(SftpServerEmbedded.FtpRootDir()).resolve("tmp");
- if (!Files.exists(home)) Files.createDirectories(home);
-
- SftpServerEmbedded.start(ftpFileSystem, port);
-
- // #sample
- Iterator<ByteString> data =
- Arrays.asList('a', 'b', 'c', 'd').stream()
- .map(
- e -> {
- char[] arr = new char[100];
- Arrays.fill(arr, e);
- return
ByteString.fromString(String.valueOf(arr));
- })
- .iterator();
-
- // (2)
- Creator<Function<ByteString, Optional<String>>> rotator =
- () -> {
- final char[] last = {' '};
- return (bs) -> {
- char c = (char) bs.head();
- if (c != last[0]) {
- last[0] = c;
- return Optional.of("log-" + c + ".z");
- } else {
- return Optional.empty();
- }
- };
- };
-
- // (3)
- KeyFileSftpIdentity identity =
- SftpIdentity.createFileSftpIdentity(pathToIdentityFile,
privateKeyPassphrase.getBytes());
- SftpSettings settings =
- SftpSettings.create(InetAddress.getByName(hostname))
- .withPort(port)
- .withSftpIdentity(identity)
- .withStrictHostKeyChecking(false)
- .withCredentials(FtpCredentials.create(username,
password));
-
- Function<String, Sink<ByteString, CompletionStage<IOResult>>> sink =
- path -> {
- Sink<ByteString, CompletionStage<IOResult>> ftpSink =
Sftp.toPath("tmp/" + path, settings);
- return Flow.<ByteString>create()
- .via(Compression.gzip()) // (4)
- .toMat(ftpSink, Keep.right());
- };
-
- CompletionStage<Done> completion =
- Source.fromIterator(() -> data)
- .runWith(LogRotatorSink.withSinkFactory(rotator,
sink), actorSystem);
- // #sample
-
- completion
- .thenApply(
- (i) ->
- Directory.ls(home)
- .runForeach((f) ->
System.out.println(f.toString()), actorSystem))
- .whenComplete(
- (res, ex) -> {
- if (ex != null) {
- ex.printStackTrace();
- }
- actorSystem.terminate();
- actorSystem.getWhenTerminated().thenAccept(t ->
SftpServerEmbedded.stopServer());
- });
- }
+ int port = AvailablePortFinder.getNextAvailable(21_000);
- public static void main(String[] args) throws IOException {
- new Main().run();
+ Path home =
ftpFileSystem.getPath(SftpServerEmbedded.FtpRootDir()).resolve("tmp");
+ if (!Files.exists(home)) {
+ Files.createDirectories(home);
}
+
+ SftpServerEmbedded.start(ftpFileSystem, port);
+
+ // #sample
+ Iterator<ByteString> data =
+ Arrays.asList('a', 'b', 'c', 'd').stream()
+ .map(
+ e -> {
+ char[] arr = new char[100];
+ Arrays.fill(arr, e);
+ return ByteString.fromString(String.valueOf(arr));
+ })
+ .iterator();
+
+ // (2)
+ Creator<Function<ByteString, Optional<String>>> rotator =
+ () -> {
+ final char[] last = {' '};
+ return (bs) -> {
+ char c = (char) bs.head();
+ if (c != last[0]) {
+ last[0] = c;
+ return Optional.of("log-" + c + ".z");
+ } else {
+ return Optional.empty();
+ }
+ };
+ };
+
+ // (3)
+ KeyFileSftpIdentity identity =
+ SftpIdentity.createFileSftpIdentity(pathToIdentityFile,
privateKeyPassphrase.getBytes());
+ SftpSettings settings =
+ SftpSettings.create(InetAddress.getByName(hostname))
+ .withPort(port)
+ .withSftpIdentity(identity)
+ .withStrictHostKeyChecking(false)
+ .withCredentials(FtpCredentials.create(username, password));
+
+ Function<String, Sink<ByteString, CompletionStage<IOResult>>> sink =
+ path -> {
+ Sink<ByteString, CompletionStage<IOResult>> ftpSink =
Sftp.toPath("tmp/" + path, settings);
+ return Flow.<ByteString>create()
+ .via(Compression.gzip()) // (4)
+ .toMat(ftpSink, Keep.right());
+ };
+
+ CompletionStage<Done> completion =
+ Source.fromIterator(() -> data)
+ .runWith(LogRotatorSink.withSinkFactory(rotator, sink),
actorSystem);
+ // #sample
+
+ completion
+ .thenApply(
+ (i) ->
+ Directory.ls(home)
+ .runForeach((f) -> System.out.println(f.toString()),
actorSystem))
+ .whenComplete(
+ (res, ex) -> {
+ if (ex != null) {
+ ex.printStackTrace();
+ }
+ actorSystem.terminate();
+ actorSystem.getWhenTerminated().thenAccept(t ->
SftpServerEmbedded.stopServer());
+ });
+ }
+
+ public static void main(String[] args) throws IOException {
+ new Main().run();
+ }
}
diff --git
a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/resources/application.conf
b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/resources/application.conf
index 7bdde8d..77117b1 100644
---
a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/resources/application.conf
+++
b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/resources/application.conf
@@ -1,5 +1,5 @@
-akka {
- loggers = ["akka.event.slf4j.Slf4jLogger"]
- logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+pekko {
+ loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"]
+ logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter"
loglevel = "DEBUG"
}
diff --git
a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/resources/logback.xml
b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/resources/logback.xml
index 1830a63..566fda5 100644
--- a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/resources/logback.xml
+++ b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/resources/logback.xml
@@ -7,9 +7,9 @@
</encoder>
</appender>
- <logger name="akka" level="INFO"/>
- <logger name="akka.stream.alpakka.file" level="INFO"/>
- <logger name="akka.stream.alpakka.ftp" level="INFO"/>
+ <logger name="org.apache.pekko" level="INFO"/>
+ <logger name="org.apache.pekko.stream.connectors.file" level="INFO"/>
+ <logger name="org.apache.pekko.stream.connectors.ftp" level="INFO"/>
<logger name="org.apache.sshd" level="INFO"/>
<logger name="net.schmizz" level="INFO"/>
diff --git
a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/scala/playground/SftpServerEmbedded.scala
b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/scala/playground/SftpServerEmbedded.scala
index e0827e9..ce2dba9 100644
---
a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/scala/playground/SftpServerEmbedded.scala
+++
b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/scala/playground/SftpServerEmbedded.scala
@@ -6,7 +6,7 @@ package playground
import java.io.File
import java.nio.charset.Charset
-import java.nio.file.{FileSystem, Files, Path, Paths}
+import java.nio.file.{ FileSystem, Files, Path, Paths }
import java.security.PublicKey
import java.util
diff --git
a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/scala/samples/scaladsl/Main.scala
b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/scala/samples/scaladsl/Main.scala
index b4319bc..4e3fd85 100644
---
a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/scala/samples/scaladsl/Main.scala
+++
b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/scala/samples/scaladsl/Main.scala
@@ -2,22 +2,21 @@ package samples.scaladsl
// #imports
-import java.net.InetAddress
-import java.nio.file.{Files, Path}
-
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.scaladsl.Behaviors
-import akka.stream.alpakka.file.scaladsl.{Directory, LogRotatorSink}
-import akka.stream.alpakka.ftp.scaladsl.Sftp
-import akka.stream.alpakka.ftp.{FtpCredentials, SftpIdentity, SftpSettings}
-import akka.stream.scaladsl.{Compression, Flow, Keep, Source}
-import akka.util.ByteString
import org.apache.mina.util.AvailablePortFinder
+import org.apache.pekko.actor.typed.ActorSystem
+import org.apache.pekko.actor.typed.scaladsl.Behaviors
+import org.apache.pekko.stream.connectors.file.scaladsl.{ Directory,
LogRotatorSink }
+import org.apache.pekko.stream.connectors.ftp.{ FtpCredentials, SftpIdentity,
SftpSettings }
+import org.apache.pekko.stream.connectors.ftp.scaladsl.Sftp
+import org.apache.pekko.stream.scaladsl.{ Compression, Flow, Keep, Source }
+import org.apache.pekko.util.ByteString
import playground.SftpServerEmbedded
import playground.filesystem.FileSystemMock
+import java.net.InetAddress
+import java.nio.file.{ Files, Path }
import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContext}
+import scala.concurrent.ExecutionContext
// #imports
object Main extends App {
@@ -26,7 +25,6 @@ object Main extends App {
def wait(duration: FiniteDuration): Unit = Thread.sleep(duration.toMillis)
-
private val ftpFileSystem = new FileSystemMock().fileSystem
private val privateKeyPassphrase =
SftpServerEmbedded.clientPrivateKeyPassphrase
private val pathToIdentityFile = SftpServerEmbedded.clientPrivateKeyFile
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]