dianfu commented on a change in pull request #15813: URL: https://github.com/apache/flink/pull/15813#discussion_r623649930
########## File path: flink-python/src/main/java/org/apache/flink/python/util/TarGzUtils.java ########## @@ -0,0 +1,69 @@ +package org.apache.flink.python.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.IOUtils; + +import org.apache.commons.compress.archivers.ArchiveInputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; + +import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Paths; + +/** Utils used to extract tar.gz files and try to restore the origin permissions of files. */ +@Internal +public class TarGzUtils { Review comment: What about moving all these methods to DecompressUtils? ########## File path: flink-python/src/main/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManager.java ########## @@ -326,9 +327,21 @@ private void constructArchivesDirectory(Map<String, String> env) throws IOExcept // extract archives to archives directory for (Map.Entry<String, String> entry : dependencyInfo.getArchives().entrySet()) { - ZipUtils.extractZipFileWithPermissions( - entry.getKey(), - String.join(File.separator, archivesDirectory, entry.getValue())); + String filePath = entry.getKey(); + if (filePath.endsWith(".zip") || filePath.endsWith(".jar")) { + ZipUtils.extractZipFileWithPermissions( + filePath, + String.join(File.separator, archivesDirectory, entry.getValue())); + } else if (filePath.endsWith(".tar.gz")) { Review comment: What about supporting .tar and .tgz as well? It should not bring too much extra effort as most of the logic could be shared. 
########## File path: flink-python/src/main/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManager.java ########## @@ -326,9 +327,21 @@ private void constructArchivesDirectory(Map<String, String> env) throws IOExcept // extract archives to archives directory for (Map.Entry<String, String> entry : dependencyInfo.getArchives().entrySet()) { - ZipUtils.extractZipFileWithPermissions( - entry.getKey(), - String.join(File.separator, archivesDirectory, entry.getValue())); + String filePath = entry.getKey(); + if (filePath.endsWith(".zip") || filePath.endsWith(".jar")) { Review comment: It's better to convert filePath to lowercase and then check the suffix -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org