This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors-samples.git


The following commit(s) were added to refs/heads/main by this push:
     new 613b1a6  [rotate-logs-to-ftp] Migrate akka dependencies to pekko (#21)
613b1a6 is described below

commit 613b1a6414bc711f568189b15a6b095b4c734fb8
Author: Laglangyue <[email protected]>
AuthorDate: Mon Dec 18 00:06:57 2023 +0800

    [rotate-logs-to-ftp] Migrate akka dependencies to pekko (#21)
    
    * [rotate-logs-to-ftp] Migrate akka dependencies to pekko
    
    * Update application.conf
    
    ---------
    
    Co-authored-by: laglang <[email protected]>
    Co-authored-by: PJ Fanning <[email protected]>
---
 .../build.sbt                                      |   2 +-
 .../project/Dependencies.scala                     |  21 +-
 .../java/playground/filesystem/FileSystemMock.java | 136 +++---
 .../java/playground/filesystem/JimfsFactory.java   |  82 ++--
 .../playground/filesystem/impl/JimfsFtpFile.java   | 530 ++++++++++-----------
 .../java/playground/filesystem/impl/JimfsView.java | 376 ++++++++-------
 .../filesystem/impl/NameEqualsPathFilter.java      |   5 +-
 .../src/main/java/samples/javadsl/Main.java        | 222 +++++----
 .../src/main/resources/application.conf            |   6 +-
 .../src/main/resources/logback.xml                 |   6 +-
 .../main/scala/playground/SftpServerEmbedded.scala |   2 +-
 .../src/main/scala/samples/scaladsl/Main.scala     |  22 +-
 12 files changed, 701 insertions(+), 709 deletions(-)

diff --git a/pekko-connectors-sample-rotate-logs-to-ftp/build.sbt b/pekko-connectors-sample-rotate-logs-to-ftp/build.sbt
index 47329e0..6148cd2 100644
--- a/pekko-connectors-sample-rotate-logs-to-ftp/build.sbt
+++ b/pekko-connectors-sample-rotate-logs-to-ftp/build.sbt
@@ -1,6 +1,6 @@
 organization := "org.apache.pekko"
 name := "pekko-connectors-sample-rotate-logs-to-ftp"
-version := "1.0.0"
+version := "1.0.1"
 scalaVersion := Dependencies.scalaVer
 libraryDependencies ++= Dependencies.dependencies
 javacOptions += "-Xlint:unchecked"
diff --git a/pekko-connectors-sample-rotate-logs-to-ftp/project/Dependencies.scala b/pekko-connectors-sample-rotate-logs-to-ftp/project/Dependencies.scala
index 2829787..8bd8882 100644
--- a/pekko-connectors-sample-rotate-logs-to-ftp/project/Dependencies.scala
+++ b/pekko-connectors-sample-rotate-logs-to-ftp/project/Dependencies.scala
@@ -3,18 +3,18 @@ import sbt._
 object Dependencies {
   val scalaVer = "2.13.12"
   // #deps
-  val PekkoVersion = "2.6.19"
-  val PekkoConnectorsVersion = "4.0.0"
+  val PekkoVersion = "1.0.2"
+  val PekkoConnectorsVersion = "1.0.1"
 
   // #deps
 
   val dependencies = List(
-  // #deps
-    "com.typesafe.akka" %% "akka-stream" % PekkoVersion,
-    "com.typesafe.akka" %% "akka-actor-typed" % PekkoVersion,
-    "com.typesafe.akka" %% "akka-actor" % PekkoVersion,
-    "com.lightbend.akka" %% "akka-stream-alpakka-file" % PekkoConnectorsVersion,
-    "com.lightbend.akka" %% "akka-stream-alpakka-ftp" % PekkoConnectorsVersion,
+    // #deps
+    "org.apache.pekko" %% "pekko-stream" % PekkoVersion,
+    "org.apache.pekko" %% "pekko-actor-typed" % PekkoVersion,
+    "org.apache.pekko" %% "pekko-actor" % PekkoVersion,
+    "org.apache.pekko" %% "pekko-connectors-file" % PekkoConnectorsVersion,
+    "org.apache.pekko" %% "pekko-connectors-ftp" % PekkoConnectorsVersion,
     // #deps
     // Playground file system and FTP server
     // https://mina.apache.org/ftpserver-project/downloads.html
@@ -23,7 +23,6 @@ object Dependencies {
     "org.apache.sshd" % "sshd-sftp" % "2.5.1", // ApacheV2
     "com.google.jimfs" % "jimfs" % "1.1", // ApacheV2
     // Logging
-    "com.typesafe.akka" %% "akka-slf4j" % PekkoVersion,
-    "ch.qos.logback" % "logback-classic" % "1.2.13"
-  )
+    "org.apache.pekko" %% "pekko-slf4j" % PekkoVersion,
+    "ch.qos.logback" % "logback-classic" % "1.2.13")
 }
diff --git a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/FileSystemMock.java b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/FileSystemMock.java
index b0a59df..4d3690a 100644
--- a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/FileSystemMock.java
+++ b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/FileSystemMock.java
@@ -13,85 +13,85 @@ import java.nio.file.attribute.BasicFileAttributes;
 
 public class FileSystemMock {
 
-    public final FileSystem fileSystem;
+  public final FileSystem fileSystem;
 
-    private String loremIpsum =
-        "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Praesent auctor imperdiet " +
-        "velit, eu dapibus nisl dapibus vitae. Sed quam lacus, fringilla posuere ligula at, " +
-        "aliquet laoreet nulla. Aliquam id fermentum justo. Aliquam et massa consequat, " +
-        "pellentesque dolor nec, gravida libero. Phasellus elit eros, finibus eget " +
-        "sollicitudin ac, consectetur sed ante. Etiam ornare lacus blandit nisi gravida " +
-        "accumsan. Sed in lorem arcu. Vivamus et eleifend ligula. Maecenas ut commodo ante. " +
-        "Suspendisse sit amet placerat arcu, porttitor sagittis velit. Quisque gravida mi a " +
-        "porttitor ornare. Cras lorem nisl, sollicitudin vitae odio at, vehicula maximus " +
-        "mauris. Sed ac purus ac turpis pellentesque cursus ac eget est. Pellentesque " +
-        "habitant morbi tristique senectus et netus et malesuada fames ac turpis egestas.";
+  private String loremIpsum =
+      "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Praesent auctor imperdiet " +
+          "velit, eu dapibus nisl dapibus vitae. Sed quam lacus, fringilla posuere ligula at, " +
+          "aliquet laoreet nulla. Aliquam id fermentum justo. Aliquam et massa consequat, " +
+          "pellentesque dolor nec, gravida libero. Phasellus elit eros, finibus eget " +
+          "sollicitudin ac, consectetur sed ante. Etiam ornare lacus blandit nisi gravida " +
+          "accumsan. Sed in lorem arcu. Vivamus et eleifend ligula. Maecenas ut commodo ante. " +
+          "Suspendisse sit amet placerat arcu, porttitor sagittis velit. Quisque gravida mi a " +
+          "porttitor ornare. Cras lorem nisl, sollicitudin vitae odio at, vehicula maximus " +
+          "mauris. Sed ac purus ac turpis pellentesque cursus ac eget est. Pellentesque " +
+          "habitant morbi tristique senectus et netus et malesuada fames ac turpis egestas.";
 
-    public FileSystemMock(FileSystem fs) {
-        fileSystem = fs;
-    }
+  public FileSystemMock(FileSystem fs) {
+    fileSystem = fs;
+  }
 
-    public FileSystemMock() {
-        this(Jimfs.newFileSystem(Configuration.unix()));
-    }
+  public FileSystemMock() {
+    this(Jimfs.newFileSystem(Configuration.unix()));
+  }
 
-    public void cleanFiles() {
-        for (Path rootDir : getFileSystem().getRootDirectories()) {
-            try {
-                Files.walkFileTree(rootDir, new SimpleFileVisitor<Path>() {
-                    @Override
-                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
-                        Files.deleteIfExists(file);
-                        return FileVisitResult.CONTINUE;
-                    }
-                });
-            } catch (Throwable t) {
-                t.printStackTrace();
-            }
-        }
+  public void cleanFiles() {
+    for (Path rootDir : getFileSystem().getRootDirectories()) {
+      try {
+        Files.walkFileTree(rootDir, new SimpleFileVisitor<Path>() {
+          @Override
+          public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+            Files.deleteIfExists(file);
+            return FileVisitResult.CONTINUE;
+          }
+        });
+      } catch (Throwable t) {
+        t.printStackTrace();
+      }
     }
+  }
 
-    public void generateFiles(int numFiles, int pageSize, String basePath) {
-        String base = "";
-        if (!basePath.isEmpty()) {
-            if ('/' != basePath.charAt(0)) {
-                base = "/" + basePath;
-            } else {
-                base = basePath;
-            }
-        }
-        int i = 1;
-        while (i <= numFiles) {
-            int j = i / pageSize;
-            String subDir = (j > 0) ? "/dir_" + j : "";
-            putFileOnFtp(base + subDir, "sample_" + i);
-            i++;
-        }
+  public void generateFiles(int numFiles, int pageSize, String basePath) {
+    String base = "";
+    if (!basePath.isEmpty()) {
+      if ('/' != basePath.charAt(0)) {
+        base = "/" + basePath;
+      } else {
+        base = basePath;
+      }
     }
-
-    public void putFileOnFtp(String path, String fileName) {
-        putFileOnFtpWithContents(path, fileName, loremIpsum.getBytes());
+    int i = 1;
+    while (i <= numFiles) {
+      int j = i / pageSize;
+      String subDir = (j > 0) ? "/dir_" + j : "";
+      putFileOnFtp(base + subDir, "sample_" + i);
+      i++;
     }
+  }
 
-    public void putFileOnFtpWithContents(String path, String fileName, byte[] fileContents) {
-        try {
-            Path baseDir = getFileSystem().getPath(path);
-            if (!Files.exists(baseDir)) {
-                Files.createDirectories(baseDir);
-            }
-            Path filePath = baseDir.resolve(fileName);
-            Files.write(filePath, fileContents);
-        } catch (Throwable t) {
-            throw new RuntimeException(t);
-        }
-    }
+  public void putFileOnFtp(String path, String fileName) {
+    putFileOnFtpWithContents(path, fileName, loremIpsum.getBytes());
+  }
 
-    FileSystem getFileSystem() {
-        return fileSystem;
+  public void putFileOnFtpWithContents(String path, String fileName, byte[] fileContents) {
+    try {
+      Path baseDir = getFileSystem().getPath(path);
+      if (!Files.exists(baseDir)) {
+        Files.createDirectories(baseDir);
+      }
+      Path filePath = baseDir.resolve(fileName);
+      Files.write(filePath, fileContents);
+    } catch (Throwable t) {
+      throw new RuntimeException(t);
     }
+  }
 
-    public String getLoremIpsum() {
-        return loremIpsum;
-    }
+  FileSystem getFileSystem() {
+    return fileSystem;
+  }
+
+  public String getLoremIpsum() {
+    return loremIpsum;
+  }
 
 }
diff --git a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/JimfsFactory.java b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/JimfsFactory.java
index 5f7e87f..455a670 100644
--- a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/JimfsFactory.java
+++ b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/JimfsFactory.java
@@ -19,56 +19,56 @@ import java.nio.file.Path;
 
 public class JimfsFactory implements FileSystemFactory {
 
-    private final Logger LOG = LoggerFactory.getLogger(JimfsFactory.class);
+  private final Logger LOG = LoggerFactory.getLogger(JimfsFactory.class);
 
-    private FileSystem fileSystem;
+  private FileSystem fileSystem;
 
-    private boolean createHome;
+  private boolean createHome;
 
-    private boolean caseInsensitive;
+  private boolean caseInsensitive;
 
-    public boolean isCreateHome() {
-        return createHome;
-    }
+  public boolean isCreateHome() {
+    return createHome;
+  }
 
-    public void setCreateHome(boolean createHome) {
-        this.createHome = createHome;
-    }
+  public void setCreateHome(boolean createHome) {
+    this.createHome = createHome;
+  }
 
-    public boolean isCaseInsensitive() {
-        return caseInsensitive;
-    }
+  public boolean isCaseInsensitive() {
+    return caseInsensitive;
+  }
 
-    public void setCaseInsensitive(boolean caseInsensitive) {
-        this.caseInsensitive = caseInsensitive;
-    }
+  public void setCaseInsensitive(boolean caseInsensitive) {
+    this.caseInsensitive = caseInsensitive;
+  }
 
-    public JimfsFactory(FileSystem fileSystem) {
-        this.fileSystem = fileSystem;
-    }
+  public JimfsFactory(FileSystem fileSystem) {
+    this.fileSystem = fileSystem;
+  }
 
-    @Override
-    public FileSystemView createFileSystemView(User user) throws FtpException {
-        synchronized (user) {
-            // create home if does not exist
-            if (createHome) {
-                String homeDirStr = user.getHomeDirectory();
-                Path homeDir = fileSystem.getPath(homeDirStr);
-                if (Files.isRegularFile(homeDir)) {
-                    LOG.warn("Not a directory :: " + homeDirStr);
-                    throw new FtpException("Not a directory :: " + homeDirStr);
-                }
-                if (!Files.exists(homeDir)) {
-                    try {
-                        Files.createDirectories(homeDir);
-                    } catch (IOException t) {
-                        final String msg = "Cannot create user home :: " + homeDirStr;
-                        LOG.warn(msg);
-                        throw new FtpException(msg, t);
-                    }
-                }
-            }
-            return new JimfsView(fileSystem, user, caseInsensitive);
+  @Override
+  public FileSystemView createFileSystemView(User user) throws FtpException {
+    synchronized (user) {
+      // create home if not exist
+      if (createHome) {
+        String homeDirStr = user.getHomeDirectory();
+        Path homeDir = fileSystem.getPath(homeDirStr);
+        if (Files.isRegularFile(homeDir)) {
+          LOG.warn("Not a directory :: " + homeDirStr);
+          throw new FtpException("Not a directory :: " + homeDirStr);
+        }
+        if (!Files.exists(homeDir)) {
+          try {
+            Files.createDirectories(homeDir);
+          } catch (IOException t) {
+            final String msg = "Cannot create user home :: " + homeDirStr;
+            LOG.warn(msg);
+            throw new FtpException(msg, t);
+          }
         }
+      }
+      return new JimfsView(fileSystem, user, caseInsensitive);
     }
+  }
 }
diff --git a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/JimfsFtpFile.java b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/JimfsFtpFile.java
index c51f644..be0009f 100644
--- a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/JimfsFtpFile.java
+++ b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/JimfsFtpFile.java
@@ -20,325 +20,325 @@ import java.util.List;
 
 public class JimfsFtpFile implements FtpFile {
 
-    private final Logger LOG = LoggerFactory.getLogger(JimfsFtpFile.class);
+  private final Logger LOG = LoggerFactory.getLogger(JimfsFtpFile.class);
 
-    // the file name with respect to the user root (so it's the virtual filename).
-    // The path separator character will be '/' and
-    // it will always begin with '/'.
-    private String fileName;
+  // the file name with respect to the user root (so it's the virtual filename).
+  // The path separator character will be '/' and
+  // it will always begin with '/'.
+  private String fileName;
 
-    // The `physical` path in the underlying file system (but happens that
-    // in this case, the `physical` path will be also virtusl (jimfs), so crazy).
-    private Path path;
+  // The `physical` path in the underlying file system (but happens that
+  // in this case, the `physical` path will be also virtusl (jimfs), so crazy).
+  private Path path;
 
-    private User user;
+  private User user;
 
-    protected JimfsFtpFile(final String fileName, final Path path,
-            final User user) {
-        if (fileName == null) {
-            throw new IllegalArgumentException("fileName can not be null");
-        }
-        if (path == null) {
-            throw new IllegalArgumentException("path can not be null");
-        }
-        if (fileName.length() == 0) {
-            throw new IllegalArgumentException("fileName can not be empty");
-        } else if (fileName.charAt(0) != '/') {
-            throw new IllegalArgumentException(
-                    "fileName must be an absolute path");
-        }
-
-        this.fileName = fileName;
-        this.path = path;
-        this.user = user;
+  protected JimfsFtpFile(final String fileName, final Path path,
+      final User user) {
+    if (fileName == null) {
+      throw new IllegalArgumentException("fileName can not be null");
     }
-
-    public String getAbsolutePath() {
-
-        // strip the last '/' if necessary
-        String fullName = fileName;
-        int filelen = fullName.length();
-        if ((filelen != 1) && (fullName.charAt(filelen - 1) == '/')) {
-            fullName = fullName.substring(0, filelen - 1);
-        }
-        return fullName;
+    if (path == null) {
+      throw new IllegalArgumentException("path can not be null");
+    }
+    if (fileName.length() == 0) {
+      throw new IllegalArgumentException("fileName can not be empty");
+    } else if (fileName.charAt(0) != '/') {
+      throw new IllegalArgumentException(
+          "fileName must be an absolute path");
     }
 
-    public String getName() {
-
-        // root - the short name will be '/'
-        if (fileName.equals("/")) {
-            return "/";
-        }
-
-        // strip the last '/'
-        String shortName = fileName;
-        int filelen = fileName.length();
-        if (shortName.charAt(filelen - 1) == '/') {
-            shortName = shortName.substring(0, filelen - 1);
-        }
+    this.fileName = fileName;
+    this.path = path;
+    this.user = user;
+  }
 
-        // return from the last '/'
-        int slashIndex = shortName.lastIndexOf('/');
-        if (slashIndex != -1) {
-            shortName = shortName.substring(slashIndex + 1);
-        }
-        return shortName;
-    }
+  public String getAbsolutePath() {
 
-    public boolean isHidden() {
-        try {
-            return Files.isHidden(path);
-        } catch (IOException t) {
-            LOG.error(t.getMessage());
-        }
-        return false;
+    // strip the last '/' if necessary
+    String fullName = fileName;
+    int filelen = fullName.length();
+    if ((filelen != 1) && (fullName.charAt(filelen - 1) == '/')) {
+      fullName = fullName.substring(0, filelen - 1);
     }
+    return fullName;
+  }
 
-    public boolean isDirectory() {
-        return Files.isDirectory(path);
-    }
+  public String getName() {
 
-    public boolean isFile() {
-        return Files.isRegularFile(path);
+    // root - the short name will be '/'
+    if (fileName.equals("/")) {
+      return "/";
     }
 
-    public boolean doesExist() {
-        return Files.exists(path);
+    // strip the last '/'
+    String shortName = fileName;
+    int filelen = fileName.length();
+    if (shortName.charAt(filelen - 1) == '/') {
+      shortName = shortName.substring(0, filelen - 1);
     }
 
-    public long getSize() {
-        try {
-            return Files.size(path);
-        } catch (IOException t) {
-            LOG.error(t.getMessage());
-        }
-        return -1;
+    // return from the last '/'
+    int slashIndex = shortName.lastIndexOf('/');
+    if (slashIndex != -1) {
+      shortName = shortName.substring(slashIndex + 1);
     }
-
-    public String getOwnerName() {
-        return "user";
+    return shortName;
+  }
+
+  public boolean isHidden() {
+    try {
+      return Files.isHidden(path);
+    } catch (IOException t) {
+      LOG.error(t.getMessage());
     }
-
-    public String getGroupName() {
-        return "group";
+    return false;
+  }
+
+  public boolean isDirectory() {
+    return Files.isDirectory(path);
+  }
+
+  public boolean isFile() {
+    return Files.isRegularFile(path);
+  }
+
+  public boolean doesExist() {
+    return Files.exists(path);
+  }
+
+  public long getSize() {
+    try {
+      return Files.size(path);
+    } catch (IOException t) {
+      LOG.error(t.getMessage());
     }
-
-    public int getLinkCount() {
-        return Files.isDirectory(path) ? 3 : 1;
+    return -1;
+  }
+
+  public String getOwnerName() {
+    return "user";
+  }
+
+  public String getGroupName() {
+    return "group";
+  }
+
+  public int getLinkCount() {
+    return Files.isDirectory(path) ? 3 : 1;
+  }
+
+  public long getLastModified() {
+    try {
+      return Files.getLastModifiedTime(path).toMillis();
+    } catch (IOException t) {
+      LOG.error(t.getMessage());
     }
-
-    public long getLastModified() {
-        try {
-            return Files.getLastModifiedTime(path).toMillis();
-        } catch (IOException t) {
-            LOG.error(t.getMessage());
-        }
-        return -1;
+    return -1;
+  }
+
+  public boolean setLastModified(long time) {
+    try {
+      Files.setLastModifiedTime(path, FileTime.fromMillis(time));
+      return true;
+    } catch (IOException t) {
+      LOG.error(t.getMessage());
     }
-
-    public boolean setLastModified(long time) {
-        try {
-            Files.setLastModifiedTime(path, FileTime.fromMillis(time));
-            return true;
-        } catch (IOException t) {
-            LOG.error(t.getMessage());
-        }
-        return false;
+    return false;
+  }
+
+  public boolean isReadable() {
+    return Files.isReadable(path);
+  }
+
+  public boolean isWritable() {
+    LOG.debug("Checking authorization for " + getAbsolutePath());
+    if (user.authorize(new WriteRequest(getAbsolutePath())) == null) {
+      LOG.debug("Not authorized");
+      return false;
     }
 
-    public boolean isReadable() {
-        return Files.isReadable(path);
+    LOG.debug("Checking if file exists");
+    if (Files.exists(path)) {
+      LOG.debug("Checking can write: " + getAbsolutePath());
+      return Files.isWritable(path);
     }
 
-    public boolean isWritable() {
-        LOG.debug("Checking authorization for " + getAbsolutePath());
-        if (user.authorize(new WriteRequest(getAbsolutePath())) == null) {
-            LOG.debug("Not authorized");
-            return false;
-        }
+    LOG.debug("Authorized");
+    return true;
+  }
 
-        LOG.debug("Checking if file exists");
-        if (Files.exists(path)) {
-            LOG.debug("Checking can write: " + getAbsolutePath());
-            return Files.isWritable(path);
-        }
+  public boolean isRemovable() {
 
-        LOG.debug("Authorized");
-        return true;
+    // root cannot be deleted
+    if ("/".equals(fileName)) {
+      return false;
     }
 
-    public boolean isRemovable() {
-
-        // root cannot be deleted
-        if ("/".equals(fileName)) {
-            return false;
-        }
-
-        String fullName = getAbsolutePath();
-
-        // we check FTPServer's write permission for this file.
-        if (user.authorize(new WriteRequest(fullName)) == null) {
-            return false;
-        }
-
-        // In order to maintain consistency, when possible we delete the last '/' character in the String
-        int indexOfSlash = fullName.lastIndexOf('/');
-        String parentFullName;
-        if (indexOfSlash == 0) {
-            parentFullName = "/";
-        } else {
-            parentFullName = fullName.substring(0, indexOfSlash);
-        }
+    String fullName = getAbsolutePath();
 
-        JimfsFtpFile parentObject = new JimfsFtpFile(parentFullName,
-                path.getParent(), user);
-        return parentObject.isWritable();
+    // we check FTPServer's write permission for this file.
+    if (user.authorize(new WriteRequest(fullName)) == null) {
+      return false;
     }
 
-    public boolean delete() {
-        boolean retVal = false;
-        try {
-            if (isRemovable()) {
-                Files.delete(path);
-                retVal = true;
-            }
-        } catch (IOException t) {
-            LOG.error(t.getMessage());
-        }
-        return retVal;
+    // In order to maintain consistency, when possible we delete the last '/' character in the String
+    int indexOfSlash = fullName.lastIndexOf('/');
+    String parentFullName;
+    if (indexOfSlash == 0) {
+      parentFullName = "/";
+    } else {
+      parentFullName = fullName.substring(0, indexOfSlash);
     }
 
-    public boolean move(final FtpFile dest) {
-        boolean retVal = false;
-        if (dest.isWritable() && isReadable()) {
-            Path destPath = ((JimfsFtpFile) dest).path;
-
-            if (Files.exists(destPath)) {
-                // renameTo behaves differently on different platforms
-                // this check verifies that if the destination already exists,
-                // we fail
-                retVal = false;
-            } else {
-                try {
-                    Files.move(path, destPath, StandardCopyOption.REPLACE_EXISTING);
-                    retVal = true;
-                } catch (IOException t) {
-                    LOG.error(t.getMessage());
-                }
-            }
-        }
-        return retVal;
+    JimfsFtpFile parentObject = new JimfsFtpFile(parentFullName,
+        path.getParent(), user);
+    return parentObject.isWritable();
+  }
+
+  public boolean delete() {
+    boolean retVal = false;
+    try {
+      if (isRemovable()) {
+        Files.delete(path);
+        retVal = true;
+      }
+    } catch (IOException t) {
+      LOG.error(t.getMessage());
     }
-
-    public boolean mkdir() {
-        boolean retVal = false;
+    return retVal;
+  }
+
+  public boolean move(final FtpFile dest) {
+    boolean retVal = false;
+    if (dest.isWritable() && isReadable()) {
+      Path destPath = ((JimfsFtpFile) dest).path;
+
+      if (Files.exists(destPath)) {
+        // renameTo behaves differently on different platforms
+        // this check verifies that if the destination already exists,
+        // we fail
+        retVal = false;
+      } else {
         try {
-            if (isWritable()) {
-                Files.createDirectory(path);
-                retVal = true;
-            }
+          Files.move(path, destPath, StandardCopyOption.REPLACE_EXISTING);
+          retVal = true;
         } catch (IOException t) {
-            LOG.error(t.getMessage());
+          LOG.error(t.getMessage());
         }
-        return retVal;
+      }
     }
-
-    public Path getPhysicalFile() {
-        return path;
+    return retVal;
+  }
+
+  public boolean mkdir() {
+    boolean retVal = false;
+    try {
+      if (isWritable()) {
+        Files.createDirectory(path);
+        retVal = true;
+      }
+    } catch (IOException t) {
+      LOG.error(t.getMessage());
     }
+    return retVal;
+  }
 
-    public List<FtpFile> listFiles() {
-
-        // is a directory
-        if (!Files.isDirectory(path)) {
-            return null;
-        }
-
-        // directory - return all the files
-        DirectoryStream<Path> filesStream = null;
-        try {
-            filesStream = Files.newDirectoryStream(path);
-        } catch(IOException t) {
-            LOG.error(t.getMessage());
-        }
+  public Path getPhysicalFile() {
+    return path;
+  }
 
-        if (filesStream == null) {
-            return null;
-        }
-
-        List<Path> files = new ArrayList<>();
-        for (Path path : filesStream) {
-            files.add(path);
-        }
+  public List<FtpFile> listFiles() {
 
-        // make sure the files are returned in order
-        Collections.sort(files, new Comparator<Path>() {
-            public int compare(Path o1, Path o2) {
-                return o1.getFileName().compareTo(o2.getFileName());
-            }
-        });
-
-        // get the virtual name of the base directory
-        final String virtualFileStr =
-                getAbsolutePath().charAt(getAbsolutePath().length() - 1) != '/'
-                        ? getAbsolutePath() + '/' : getAbsolutePath();
-
-        // now return all the files under the directory
-        List<FtpFile> virtualFiles = new ArrayList<>(files.size());
-        for (Path file : files) {
-            String fileName = virtualFileStr + file.getFileName();
-            virtualFiles.add(new JimfsFtpFile(fileName, file, user));
-        }
-        return virtualFiles;
+    // is a directory
+    if (!Files.isDirectory(path)) {
+      return null;
     }
 
-    public OutputStream createOutputStream(long offset)
-            throws IOException {
+    // directory - return all the files
+    DirectoryStream<Path> filesStream = null;
+    try {
+      filesStream = Files.newDirectoryStream(path);
+    } catch (IOException t) {
+      LOG.error(t.getMessage());
+    }
 
-        // permission check
-        if (!isWritable()) {
-            throw new IOException("No write permission : " + path.getFileName());
-        }
+    if (filesStream == null) {
+      return null;
+    }
 
-        // create output stream
-        final RandomAccessFile raf = new RandomAccessFile(path.toFile(), "rw");
-        raf.setLength(offset);
-        raf.seek(offset);
-
-        // The IBM jre needs to have both the stream and the random access file
-        // objects closed to actually close the file
-        return new FileOutputStream(raf.getFD()) {
-            @Override
-            public void close() throws IOException {
-                super.close();
-                raf.close();
-            }
-        };
+    List<Path> files = new ArrayList<>();
+    for (Path path : filesStream) {
+      files.add(path);
     }
 
-    public InputStream createInputStream(long offset)
-            throws IOException {
+    // make sure the files are returned in order
+    Collections.sort(files, new Comparator<Path>() {
+      public int compare(Path o1, Path o2) {
+        return o1.getFileName().compareTo(o2.getFileName());
+      }
+    });
+
+    // get the virtual name of the base directory
+    final String virtualFileStr =
+        getAbsolutePath().charAt(getAbsolutePath().length() - 1) != '/'
+            ? getAbsolutePath() + '/' : getAbsolutePath();
+
+    // now return all the files under the directory
+    List<FtpFile> virtualFiles = new ArrayList<>(files.size());
+    for (Path file : files) {
+      String fileName = virtualFileStr + file.getFileName();
+      virtualFiles.add(new JimfsFtpFile(fileName, file, user));
+    }
+    return virtualFiles;
+  }
 
-        // permission check
-        if (!isReadable()) {
-            throw new IOException("No read permission : " + path.getFileName());
-        }
+  public OutputStream createOutputStream(long offset)
+      throws IOException {
 
-        return Files.newInputStream(path, StandardOpenOption.READ);
+    // permission check
+    if (!isWritable()) {
+      throw new IOException("No write permission : " + path.getFileName());
     }
 
-    @Override
-    public boolean equals(Object obj) {
-        if (obj instanceof JimfsFtpFile) {
-            Path otherPath = ((JimfsFtpFile) obj).path.normalize();
-            return this.path.normalize().equals(otherPath);
-        }
-        return false;
+    // create output stream
+    final RandomAccessFile raf = new RandomAccessFile(path.toFile(), "rw");
+    raf.setLength(offset);
+    raf.seek(offset);
+
+    // The IBM jre needs to have both the stream and the random access file
+    // objects closed to actually close the file
+    return new FileOutputStream(raf.getFD()) {
+      @Override
+      public void close() throws IOException {
+        super.close();
+        raf.close();
+      }
+    };
+  }
+
+  public InputStream createInputStream(long offset)
+      throws IOException {
+
+    // permission check
+    if (!isReadable()) {
+      throw new IOException("No read permission : " + path.getFileName());
     }
 
-    @Override
-    public int hashCode() {
-        return path.normalize().hashCode();
+    return Files.newInputStream(path, StandardOpenOption.READ);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof JimfsFtpFile) {
+      Path otherPath = ((JimfsFtpFile) obj).path.normalize();
+      return this.path.normalize().equals(otherPath);
     }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return path.normalize().hashCode();
+  }
 }
diff --git a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/JimfsView.java b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/JimfsView.java
index eebf6c3..a18ea46 100644
--- a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/JimfsView.java
+++ b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/JimfsView.java
@@ -21,231 +21,229 @@ import java.util.List;
 import java.util.StringTokenizer;
 
 /**
- * File system view based on the in-memory jimfs file system. The root in this
- * class is the user virtual root (/).
+ * File system view based on the in-memory jimfs file system. The root in this class is the user virtual root (/).
  */
 public class JimfsView implements FileSystemView {
 
-    private final Logger LOG = LoggerFactory.getLogger(JimfsView.class);
+  private final Logger LOG = LoggerFactory.getLogger(JimfsView.class);
 
-    // this will be the jimfs file system in runtime.
-    private FileSystem fileSystem;
+  // this will be the jimfs file system in runtime.
+  private FileSystem fileSystem;
 
-    // the root directory will always end with '/'.
-    private String rootDir;
+  // the root directory will always end with '/'.
+  private String rootDir;
 
-    // the first and the last character will always be '/'
-    // It is always with respect to the root directory.
-    private String currDir;
+  // the first and the last character will always be '/'
+  // It is always with respect to the root directory.
+  private String currDir;
 
-    private User user;
+  private User user;
 
-    private boolean caseInsensitive = false;
+  private boolean caseInsensitive = false;
 
-    public JimfsView(FileSystem fileSystem, User user, boolean caseInsensitive)
-        throws FtpException {
-        if (fileSystem == null) {
-            throw new IllegalArgumentException("filesystem can not be null");
-        }
-        if (user == null) {
-            throw new IllegalArgumentException("user can not be null");
-        }
-        if (user.getHomeDirectory() == null) {
-            throw new IllegalArgumentException("user home directory can not be null");
-        }
+  public JimfsView(FileSystem fileSystem, User user, boolean caseInsensitive)
+      throws FtpException {
+    if (fileSystem == null) {
+      throw new IllegalArgumentException("filesystem can not be null");
+    }
+    if (user == null) {
+      throw new IllegalArgumentException("user can not be null");
+    }
+    if (user.getHomeDirectory() == null) {
+      throw new IllegalArgumentException("user home directory can not be null");
+    }
 
-        this.fileSystem = fileSystem;
+    this.fileSystem = fileSystem;
 
-        this.caseInsensitive = caseInsensitive;
+    this.caseInsensitive = caseInsensitive;
 
-        // add last '/' if necessary
-        String rootDir = user.getHomeDirectory();
-        rootDir = normalizeSeparateChar(rootDir);
-        if (!rootDir.endsWith("/")) {
-            rootDir += '/';
-        }
+    // add last '/' if necessary
+    String rootDir = user.getHomeDirectory();
+    rootDir = normalizeSeparateChar(rootDir);
+    if (!rootDir.endsWith("/")) {
+      rootDir += '/';
+    }
 
-        LOG.debug("Jimfs filesystem view created by user \"{}\" with root \"{}\"", user.getName(), rootDir);
+    LOG.debug("Jimfs filesystem view created by user \"{}\" with root \"{}\"", user.getName(), rootDir);
 
-        this.rootDir = rootDir;
+    this.rootDir = rootDir;
 
-        this.user = user;
+    this.user = user;
 
-        currDir = "/";
+    currDir = "/";
 
-    }
+  }
 
-    /**
-     * Get the user home directory. It would be the file system root
-     * for the specific user.
-     */
-    public FtpFile getHomeDirectory() throws FtpException {
-        return new JimfsFtpFile("/", fileSystem.getPath(rootDir), user);
-    }
+  /**
+   * Get the user home directory. It would be the file system root for the specific user.
+   */
+  public FtpFile getHomeDirectory() throws FtpException {
+    return new JimfsFtpFile("/", fileSystem.getPath(rootDir), user);
+  }
 
-    /**
-     * Get the current directory.
-     */
-    public FtpFile getWorkingDirectory() throws FtpException {
-        FtpFile fileObj;
-        if (currDir.equals("/")) {
-            fileObj = getHomeDirectory();
-        } else {
-            Path path = fileSystem.getPath(rootDir, currDir.substring(1));
-            fileObj = new JimfsFtpFile(currDir, path, user);
-        }
-        return fileObj;
+  /**
+   * Get the current directory.
+   */
+  public FtpFile getWorkingDirectory() throws FtpException {
+    FtpFile fileObj;
+    if (currDir.equals("/")) {
+      fileObj = getHomeDirectory();
+    } else {
+      Path path = fileSystem.getPath(rootDir, currDir.substring(1));
+      fileObj = new JimfsFtpFile(currDir, path, user);
     }
-
-    /**
-     * Get the file object.
-     */
-    public FtpFile getFile(String file) {
-        String physicalName = getPhysicalName(file);
-        Path filePath = fileSystem.getPath(physicalName);
-
-        // strip the root directory and return
-        String userFileName = physicalName.substring(rootDir.length() - 1);
-        return new JimfsFtpFile(userFileName, filePath, user);
+    return fileObj;
+  }
+
+  /**
+   * Get the file object.
+   */
+  public FtpFile getFile(String file) {
+    String physicalName = getPhysicalName(file);
+    Path filePath = fileSystem.getPath(physicalName);
+
+    // strip the root directory and return
+    String userFileName = physicalName.substring(rootDir.length() - 1);
+    return new JimfsFtpFile(userFileName, filePath, user);
+  }
+
+  /**
+   * Change directory.
+   */
+  public boolean changeWorkingDirectory(String dir) throws FtpException {
+
+    // not a directory - return false
+    dir = getPhysicalName(dir);
+    Path dirPath = fileSystem.getPath(dir);
+    if (!Files.isDirectory(dirPath)) {
+      return false;
     }
 
-    /**
-     * Change directory.
-     */
-    public boolean changeWorkingDirectory(String dir) throws FtpException {
-
-        // not a directory - return false
-        dir = getPhysicalName(dir);
-        Path dirPath = fileSystem.getPath(dir);
-        if (!Files.isDirectory(dirPath)) {
-            return false;
-        }
-
-        // strip user root and add last '/' if necessary
-        dir = dir.substring(rootDir.length() - 1);
-        if (dir.charAt(dir.length() - 1) != '/') {
-            dir = dir + '/';
-        }
-
-        currDir = dir;
-        return true;
+    // strip user root and add last '/' if necessary
+    dir = dir.substring(rootDir.length() - 1);
+    if (dir.charAt(dir.length() - 1) != '/') {
+      dir = dir + '/';
     }
 
-    /**
-     * Is the file content random accessible?
-     */
-    public boolean isRandomAccessible() {
-        return true;
+    currDir = dir;
+    return true;
+  }
+
+  /**
+   * Is the file content random accessible?
+   */
+  public boolean isRandomAccessible() {
+    return true;
+  }
+
+  /**
+   * Dispose the file system.
+   */
+  public void dispose() {
+    // Nothing to do
+  }
+
+  private String normalizeSeparateChar(final String pathName) {
+    String normalizePathName = pathName.replace(fileSystem.getSeparator(), "/");
+    return normalizePathName.replace('\\', '/');
+  }
+
+  private String getPhysicalName(final String file) {
+
+    // get the starting directory
+    String normalizedRootDir = normalizeSeparateChar(rootDir);
+    if (normalizedRootDir.charAt(normalizedRootDir.length() - 1) != '/') {
+      normalizedRootDir += '/';
     }
 
-    /**
-     * Dispose the file system.
-     */
-    public void dispose() {
-        // Nothing to do
+    String normalizedFileName = normalizeSeparateChar(file);
+    String resArg;
+    String normalizedCurrDir = currDir;
+    if (normalizedFileName.charAt(0) != '/') {
+      if (normalizedCurrDir == null || normalizedCurrDir.length() == 0) {
+        normalizedCurrDir = "/";
+      }
+
+      normalizedCurrDir = normalizeSeparateChar(normalizedCurrDir);
+
+      if (normalizedCurrDir.charAt(0) != '/') {
+        normalizedCurrDir = '/' + normalizedCurrDir;
+      }
+      if (normalizedCurrDir.charAt(normalizedCurrDir.length() - 1) != '/') {
+        normalizedCurrDir += '/';
+      }
+
+      resArg = normalizedRootDir + normalizedCurrDir.substring(1);
+    } else {
+      resArg = normalizedRootDir;
     }
 
-    private String normalizeSeparateChar(final String pathName) {
-        String normalizePathName = pathName.replace(fileSystem.getSeparator(), "/");
-        return normalizePathName.replace('\\', '/');
+    // strip last '/'
+    if (resArg.charAt(resArg.length() - 1) == '/') {
+      resArg = resArg.substring(0, resArg.length() - 1);
     }
 
-    private String getPhysicalName(final String file) {
-
-        // get the starting directory
-        String normalizedRootDir = normalizeSeparateChar(rootDir);
-        if (normalizedRootDir.charAt(normalizedRootDir.length() - 1) != '/') {
-            normalizedRootDir += '/';
+    // replace ., ~ and ..
+    // in this loop resArg will never end with '/'
+    StringTokenizer st = new StringTokenizer(normalizedFileName, "/");
+    while (st.hasMoreTokens()) {
+      String tok = st.nextToken();
+
+      // . => current directory
+      if (tok.equals(".")) {
+        continue;
+      }
+
+      // .. => parent directory (if not root)
+      if (tok.equals("src/main")) {
+        if (resArg.startsWith(normalizedRootDir)) {
+          int slashIndex = resArg.lastIndexOf("/");
+          if (slashIndex != -1) {
+            resArg = resArg.substring(0, slashIndex);
+          }
         }
-
-        String normalizedFileName = normalizeSeparateChar(file);
-        String resArg;
-        String normalizedCurrDir = currDir;
-        if (normalizedFileName.charAt(0) != '/') {
-            if (normalizedCurrDir == null || normalizedCurrDir.length() == 0) {
-                normalizedCurrDir = "/";
-            }
-
-            normalizedCurrDir = normalizeSeparateChar(normalizedCurrDir);
-
-            if (normalizedCurrDir.charAt(0) != '/') {
-                normalizedCurrDir = '/' + normalizedCurrDir;
-            }
-            if (normalizedCurrDir.charAt(normalizedCurrDir.length() - 1) != '/') {
-                normalizedCurrDir += '/';
-            }
-
-            resArg = normalizedRootDir + normalizedCurrDir.substring(1);
-        } else {
-            resArg = normalizedRootDir;
+        continue;
+      }
+
+      // ~ => home directory (in this case is the root directory)
+      if (tok.equals("~")) {
+        resArg = normalizedRootDir.substring(0, normalizedRootDir.length() - 1);
+        continue;
+      }
+
+      if (caseInsensitive) {
+        Path dir = fileSystem.getPath(resArg);
+        DirectoryStream<Path> dirStream = null;
+        try {
+          dirStream = Files.newDirectoryStream(dir, new NameEqualsPathFilter(tok, true));
+        } catch (IOException t) {
+          // ignore
         }
-
-        // strip last '/'
-        if (resArg.charAt(resArg.length() - 1) == '/') {
-            resArg = resArg.substring(0, resArg.length() - 1);
+        List<Path> matches = new ArrayList<>(0);
+        if (dirStream != null) {
+          for (Path match : dirStream) {
+            matches.add(match);
+          }
         }
-
-        // replace ., ~ and ..
-        // in this loop resArg will never end with '/'
-        StringTokenizer st = new StringTokenizer(normalizedFileName, "/");
-        while (st.hasMoreTokens()) {
-            String tok = st.nextToken();
-
-            // . => current directory
-            if (tok.equals(".")) {
-                continue;
-            }
-
-            // .. => parent directory (if not root)
-            if (tok.equals("src/main")) {
-                if (resArg.startsWith(normalizedRootDir)) {
-                    int slashIndex = resArg.lastIndexOf("/");
-                    if (slashIndex != -1) {
-                        resArg = resArg.substring(0, slashIndex);
-                    }
-                }
-                continue;
-            }
-
-            // ~ => home directory (in this case is the root directory)
-            if (tok.equals("~")) {
-                resArg = normalizedRootDir.substring(0, normalizedRootDir.length() - 1);
-                continue;
-            }
-
-            if (caseInsensitive) {
-                Path dir = fileSystem.getPath(resArg);
-                DirectoryStream<Path> dirStream = null;
-                try {
-                    dirStream = Files.newDirectoryStream(dir, new NameEqualsPathFilter(tok, true));
-                } catch (IOException t) {
-                    // ignore
-                }
-                List<Path> matches = new ArrayList<>(0);
-                if (dirStream != null) {
-                    for (Path match : dirStream) {
-                        matches.add(match);
-                    }
-                }
-                if (matches.size() > 0) {
-                    tok = matches.get(0).getFileName().toString();
-                }
-            }
-
-            resArg = resArg + '/' + tok;
+        if (matches.size() > 0) {
+          tok = matches.get(0).getFileName().toString();
         }
+      }
 
-        // add last slash if necessary
-        if ((resArg.length()) + 1 == normalizedRootDir.length()) {
-            resArg += '/';
-        }
+      resArg = resArg + '/' + tok;
+    }
 
-        // final check
-        if (!resArg.regionMatches(0, normalizedRootDir, 0, normalizedRootDir
-                .length())) {
-            resArg = normalizedRootDir;
-        }
+    // add last slash if necessary
+    if ((resArg.length()) + 1 == normalizedRootDir.length()) {
+      resArg += '/';
+    }
 
-        return resArg;
+    // final check
+    if (!resArg.regionMatches(0, normalizedRootDir, 0, normalizedRootDir
+        .length())) {
+      resArg = normalizedRootDir;
     }
+
+    return resArg;
+  }
 }
diff --git a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/NameEqualsPathFilter.java b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/NameEqualsPathFilter.java
index 063a26b..e9d7a69 100644
--- a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/NameEqualsPathFilter.java
+++ b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/playground/filesystem/impl/NameEqualsPathFilter.java
@@ -14,8 +14,7 @@ public class NameEqualsPathFilter implements DirectoryStream.Filter<Path> {
 
   private boolean caseInsensitive = false;
 
-  public NameEqualsPathFilter(final String nameToMatch,
-                              final boolean caseInsensitive) {
+  public NameEqualsPathFilter(final String nameToMatch, final boolean caseInsensitive) {
     this.nameToMatch = nameToMatch;
     this.caseInsensitive = caseInsensitive;
   }
@@ -23,7 +22,7 @@ public class NameEqualsPathFilter implements DirectoryStream.Filter<Path> {
   @Override
   public boolean accept(Path entry) throws IOException {
     if (caseInsensitive) {
-     return entry.getFileName().toString().equalsIgnoreCase(nameToMatch);
+      return entry.getFileName().toString().equalsIgnoreCase(nameToMatch);
     } else {
       return entry.getFileName().toString().equals(nameToMatch);
     }
diff --git a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/samples/javadsl/Main.java b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/samples/javadsl/Main.java
index 86e11f9..9957c21 100644
--- a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/samples/javadsl/Main.java
+++ b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/java/samples/javadsl/Main.java
@@ -1,130 +1,128 @@
 package samples.javadsl;
 
 // #imports
-import java.net.InetAddress;
+
+import org.apache.mina.util.AvailablePortFinder;
+import org.apache.pekko.Done;
+import org.apache.pekko.actor.typed.ActorSystem;
+import org.apache.pekko.actor.typed.javadsl.Behaviors;
+import org.apache.pekko.japi.function.Creator;
+import org.apache.pekko.japi.function.Function;
+import org.apache.pekko.stream.IOResult;
+import org.apache.pekko.stream.connectors.file.javadsl.Directory;
+import org.apache.pekko.stream.connectors.file.javadsl.LogRotatorSink;
+import org.apache.pekko.stream.connectors.ftp.FtpCredentials;
+import org.apache.pekko.stream.connectors.ftp.KeyFileSftpIdentity;
+import org.apache.pekko.stream.connectors.ftp.SftpIdentity;
+import org.apache.pekko.stream.connectors.ftp.SftpSettings;
+import org.apache.pekko.stream.connectors.ftp.javadsl.Sftp;
+import org.apache.pekko.stream.javadsl.Compression;
+import org.apache.pekko.stream.javadsl.Flow;
+import org.apache.pekko.stream.javadsl.Keep;
+import org.apache.pekko.stream.javadsl.Sink;
+import org.apache.pekko.stream.javadsl.Source;
+import org.apache.pekko.util.ByteString;
+import playground.SftpServerEmbedded;
+import playground.filesystem.FileSystemMock;
+
 import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.file.FileSystem;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.FileSystem;
-
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Optional;
 import java.util.concurrent.CompletionStage;
+// #imports
 
-import akka.Done;
-import akka.actor.typed.ActorSystem;
-import akka.actor.typed.javadsl.Behaviors;
-import akka.japi.Pair;
-import akka.japi.function.Creator;
-import akka.japi.function.Function;
-import akka.stream.IOResult;
-import akka.stream.alpakka.file.javadsl.Directory;
-import akka.stream.alpakka.file.javadsl.LogRotatorSink;
-import akka.stream.alpakka.ftp.javadsl.Sftp;
-
-import akka.stream.alpakka.ftp.FtpCredentials;
-import akka.stream.alpakka.ftp.SftpIdentity;
-import akka.stream.alpakka.ftp.KeyFileSftpIdentity;
-import akka.stream.alpakka.ftp.SftpSettings;
-import akka.stream.javadsl.Compression;
-import akka.stream.javadsl.Flow;
-import akka.stream.javadsl.Keep;
-import akka.stream.javadsl.Source;
-import akka.util.ByteString;
+public class Main {
 
-import org.apache.mina.util.AvailablePortFinder;
-import playground.filesystem.FileSystemMock;
-import playground.SftpServerEmbedded;
+  private void run() throws IOException {
+    final ActorSystem<Void> actorSystem = ActorSystem.create(Behaviors.empty(), "RotateLogsToFtp");
 
-import akka.stream.javadsl.Sink;
-// #imports
+    final FileSystem ftpFileSystem = new FileSystemMock().fileSystem;
 
-public class Main {
+    final String privateKeyPassphrase = new String(SftpServerEmbedded.clientPrivateKeyPassphrase());
+    final String pathToIdentityFile = SftpServerEmbedded.clientPrivateKeyFile();
+    final String username = "username";
+    final String password = username;
+    final String hostname = "localhost";
 
-    private void run() throws IOException {
-        final ActorSystem<Void> actorSystem = ActorSystem.create(Behaviors.empty(), "RotateLogsToFtp");
-
-        final FileSystem ftpFileSystem = new FileSystemMock().fileSystem;
-
-        final String privateKeyPassphrase = new String(SftpServerEmbedded.clientPrivateKeyPassphrase());
-        final String pathToIdentityFile = SftpServerEmbedded.clientPrivateKeyFile();
-        final String username = "username";
-        final String password = username;
-        final String hostname = "localhost";
-
-        int port = AvailablePortFinder.getNextAvailable(21_000);
-
-        Path home = ftpFileSystem.getPath(SftpServerEmbedded.FtpRootDir()).resolve("tmp");
-        if (!Files.exists(home)) Files.createDirectories(home);
-
-        SftpServerEmbedded.start(ftpFileSystem, port);
-
-        // #sample
-        Iterator<ByteString> data =
-                Arrays.asList('a', 'b', 'c', 'd').stream()
-                        .map(
-                                e -> {
-                                    char[] arr = new char[100];
-                                    Arrays.fill(arr, e);
-                                    return ByteString.fromString(String.valueOf(arr));
-                                })
-                        .iterator();
-
-        // (2)
-        Creator<Function<ByteString, Optional<String>>> rotator =
-                () -> {
-                    final char[] last = {' '};
-                    return (bs) -> {
-                        char c = (char) bs.head();
-                        if (c != last[0]) {
-                            last[0] = c;
-                            return Optional.of("log-" + c + ".z");
-                        } else {
-                            return Optional.empty();
-                        }
-                    };
-                };
-
-        // (3)
-        KeyFileSftpIdentity identity =
-                SftpIdentity.createFileSftpIdentity(pathToIdentityFile, privateKeyPassphrase.getBytes());
-        SftpSettings settings =
-                SftpSettings.create(InetAddress.getByName(hostname))
-                        .withPort(port)
-                        .withSftpIdentity(identity)
-                        .withStrictHostKeyChecking(false)
-                        .withCredentials(FtpCredentials.create(username, password));
-
-        Function<String, Sink<ByteString, CompletionStage<IOResult>>> sink =
-                path -> {
-                    Sink<ByteString, CompletionStage<IOResult>> ftpSink = Sftp.toPath("tmp/" + path, settings);
-                    return Flow.<ByteString>create()
-                            .via(Compression.gzip()) // (4)
-                            .toMat(ftpSink, Keep.right());
-                };
-
-        CompletionStage<Done> completion =
-                Source.fromIterator(() -> data)
-                        .runWith(LogRotatorSink.withSinkFactory(rotator, sink), actorSystem);
-        // #sample
-
-        completion
-                .thenApply(
-                        (i) ->
-                                Directory.ls(home)
-                                        .runForeach((f) -> System.out.println(f.toString()), actorSystem))
-                .whenComplete(
-                        (res, ex) -> {
-                            if (ex != null) {
-                                ex.printStackTrace();
-                            }
-                            actorSystem.terminate();
-                            actorSystem.getWhenTerminated().thenAccept(t -> SftpServerEmbedded.stopServer());
-                        });
-    }
+    int port = AvailablePortFinder.getNextAvailable(21_000);
 
-    public static void main(String[] args) throws IOException {
-        new Main().run();
+    Path home = ftpFileSystem.getPath(SftpServerEmbedded.FtpRootDir()).resolve("tmp");
+    if (!Files.exists(home)) {
+      Files.createDirectories(home);
     }
+
+    SftpServerEmbedded.start(ftpFileSystem, port);
+
+    // #sample
+    Iterator<ByteString> data =
+        Arrays.asList('a', 'b', 'c', 'd').stream()
+            .map(
+                e -> {
+                  char[] arr = new char[100];
+                  Arrays.fill(arr, e);
+                  return ByteString.fromString(String.valueOf(arr));
+                })
+            .iterator();
+
+    // (2)
+    Creator<Function<ByteString, Optional<String>>> rotator =
+        () -> {
+          final char[] last = {' '};
+          return (bs) -> {
+            char c = (char) bs.head();
+            if (c != last[0]) {
+              last[0] = c;
+              return Optional.of("log-" + c + ".z");
+            } else {
+              return Optional.empty();
+            }
+          };
+        };
+
+    // (3)
+    KeyFileSftpIdentity identity =
+        SftpIdentity.createFileSftpIdentity(pathToIdentityFile, privateKeyPassphrase.getBytes());
+    SftpSettings settings =
+        SftpSettings.create(InetAddress.getByName(hostname))
+            .withPort(port)
+            .withSftpIdentity(identity)
+            .withStrictHostKeyChecking(false)
+            .withCredentials(FtpCredentials.create(username, password));
+
+    Function<String, Sink<ByteString, CompletionStage<IOResult>>> sink =
+        path -> {
+          Sink<ByteString, CompletionStage<IOResult>> ftpSink = Sftp.toPath("tmp/" + path, settings);
+          return Flow.<ByteString>create()
+              .via(Compression.gzip()) // (4)
+              .toMat(ftpSink, Keep.right());
+        };
+
+    CompletionStage<Done> completion =
+        Source.fromIterator(() -> data)
+            .runWith(LogRotatorSink.withSinkFactory(rotator, sink), actorSystem);
+    // #sample
+
+    completion
+        .thenApply(
+            (i) ->
+                Directory.ls(home)
+                    .runForeach((f) -> System.out.println(f.toString()), actorSystem))
+        .whenComplete(
+            (res, ex) -> {
+              if (ex != null) {
+                ex.printStackTrace();
+              }
+              actorSystem.terminate();
+              actorSystem.getWhenTerminated().thenAccept(t -> SftpServerEmbedded.stopServer());
+            });
+  }
+
+  public static void main(String[] args) throws IOException {
+    new Main().run();
+  }
 }
diff --git a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/resources/application.conf b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/resources/application.conf
index 7bdde8d..77117b1 100644
--- a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/resources/application.conf
+++ b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/resources/application.conf
@@ -1,5 +1,5 @@
-akka {
-  loggers = ["akka.event.slf4j.Slf4jLogger"]
-  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+pekko {
+  loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"]
+  logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter"
   loglevel = "DEBUG"
 }
diff --git a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/resources/logback.xml b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/resources/logback.xml
index 1830a63..566fda5 100644
--- a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/resources/logback.xml
+++ b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/resources/logback.xml
@@ -7,9 +7,9 @@
         </encoder>
     </appender>
 
-    <logger name="akka" level="INFO"/>
-    <logger name="akka.stream.alpakka.file" level="INFO"/>
-    <logger name="akka.stream.alpakka.ftp" level="INFO"/>
+    <logger name="org.apache.pekko" level="INFO"/>
+    <logger name="org.apache.pekko.stream.connectors.file" level="INFO"/>
+    <logger name="org.apache.pekko.stream.connectors.ftp" level="INFO"/>
     <logger name="org.apache.sshd" level="INFO"/>
     <logger name="net.schmizz" level="INFO"/>
 
diff --git a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/scala/playground/SftpServerEmbedded.scala b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/scala/playground/SftpServerEmbedded.scala
index e0827e9..ce2dba9 100644
--- a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/scala/playground/SftpServerEmbedded.scala
+++ b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/scala/playground/SftpServerEmbedded.scala
@@ -6,7 +6,7 @@ package playground
 
 import java.io.File
 import java.nio.charset.Charset
-import java.nio.file.{FileSystem, Files, Path, Paths}
+import java.nio.file.{ FileSystem, Files, Path, Paths }
 import java.security.PublicKey
 import java.util
 
diff --git a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/scala/samples/scaladsl/Main.scala b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/scala/samples/scaladsl/Main.scala
index b4319bc..4e3fd85 100644
--- a/pekko-connectors-sample-rotate-logs-to-ftp/src/main/scala/samples/scaladsl/Main.scala
+++ b/pekko-connectors-sample-rotate-logs-to-ftp/src/main/scala/samples/scaladsl/Main.scala
@@ -2,22 +2,21 @@ package samples.scaladsl
 
 // #imports
 
-import java.net.InetAddress
-import java.nio.file.{Files, Path}
-
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.scaladsl.Behaviors
-import akka.stream.alpakka.file.scaladsl.{Directory, LogRotatorSink}
-import akka.stream.alpakka.ftp.scaladsl.Sftp
-import akka.stream.alpakka.ftp.{FtpCredentials, SftpIdentity, SftpSettings}
-import akka.stream.scaladsl.{Compression, Flow, Keep, Source}
-import akka.util.ByteString
 import org.apache.mina.util.AvailablePortFinder
+import org.apache.pekko.actor.typed.ActorSystem
+import org.apache.pekko.actor.typed.scaladsl.Behaviors
+import org.apache.pekko.stream.connectors.file.scaladsl.{ Directory, LogRotatorSink }
+import org.apache.pekko.stream.connectors.ftp.{ FtpCredentials, SftpIdentity, SftpSettings }
+import org.apache.pekko.stream.connectors.ftp.scaladsl.Sftp
+import org.apache.pekko.stream.scaladsl.{ Compression, Flow, Keep, Source }
+import org.apache.pekko.util.ByteString
 import playground.SftpServerEmbedded
 import playground.filesystem.FileSystemMock
 
+import java.net.InetAddress
+import java.nio.file.{ Files, Path }
 import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContext}
+import scala.concurrent.ExecutionContext
 // #imports
 
 object Main extends App {
@@ -26,7 +25,6 @@ object Main extends App {
 
   def wait(duration: FiniteDuration): Unit = Thread.sleep(duration.toMillis)
 
-
   private val ftpFileSystem = new FileSystemMock().fileSystem
   private val privateKeyPassphrase = SftpServerEmbedded.clientPrivateKeyPassphrase
   private val pathToIdentityFile = SftpServerEmbedded.clientPrivateKeyFile


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to