dianfu commented on a change in pull request #15813:
URL: https://github.com/apache/flink/pull/15813#discussion_r623649930



##########
File path: flink-python/src/main/java/org/apache/flink/python/util/TarGzUtils.java
##########
@@ -0,0 +1,69 @@
+package org.apache.flink.python.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.commons.compress.archivers.ArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+/** Utils used to extract tar.gz files and try to restore the origin permissions of files. */
+@Internal
+public class TarGzUtils {

Review comment:
       What about moving all these methods to DecompressUtils?

##########
File path: flink-python/src/main/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManager.java
##########
@@ -326,9 +327,21 @@ private void constructArchivesDirectory(Map<String, String> env) throws IOExcept
 
             // extract archives to archives directory
             for (Map.Entry<String, String> entry : dependencyInfo.getArchives().entrySet()) {
-                ZipUtils.extractZipFileWithPermissions(
-                        entry.getKey(),
-                        String.join(File.separator, archivesDirectory, entry.getValue()));
+                String filePath = entry.getKey();
+                if (filePath.endsWith(".zip") || filePath.endsWith(".jar")) {
+                    ZipUtils.extractZipFileWithPermissions(
+                            filePath,
+                            String.join(File.separator, archivesDirectory, entry.getValue()));
+                } else if (filePath.endsWith(".tar.gz")) {

Review comment:
       What about supporting .tar and .tgz as well? It should not take too
much extra effort, as most of the logic could be shared.
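
       To illustrate how the three formats could share one code path, here is a minimal sketch. The class `TarStreams` and the method `openTarPayload` are hypothetical names, not part of this PR, and the sketch uses the JDK's `GZIPInputStream` only to stay self-contained; the PR itself imports `GzipCompressorInputStream` from commons-compress, which could be substituted. The returned stream would then be handed to the existing tar extraction logic.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Locale;
import java.util.zip.GZIPInputStream;

/** Hypothetical helper (not part of the PR): picks the stream wrapper by suffix. */
final class TarStreams {
    private TarStreams() {}

    /**
     * Returns a stream that yields raw tar bytes for .tar, .tar.gz and .tgz files.
     * Only the gzip layer differs between the formats; everything that reads
     * tar entries afterwards can be shared.
     */
    static InputStream openTarPayload(String fileName, InputStream raw) throws IOException {
        String lower = fileName.toLowerCase(Locale.ROOT);
        if (lower.endsWith(".tar.gz") || lower.endsWith(".tgz")) {
            // Compressed tarball: unwrap the gzip layer first.
            return new GZIPInputStream(raw);
        }
        if (lower.endsWith(".tar")) {
            // Plain tarball: the bytes are already raw tar data.
            return raw;
        }
        throw new IOException("Unsupported archive format: " + fileName);
    }
}
```

       With something like this, the `.tar.gz` branch in the diff above would become a single branch covering all three suffixes.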

##########
File path: flink-python/src/main/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManager.java
##########
@@ -326,9 +327,21 @@ private void constructArchivesDirectory(Map<String, String> env) throws IOExcept
 
             // extract archives to archives directory
             for (Map.Entry<String, String> entry : dependencyInfo.getArchives().entrySet()) {
-                ZipUtils.extractZipFileWithPermissions(
-                        entry.getKey(),
-                        String.join(File.separator, archivesDirectory, entry.getValue()));
+                String filePath = entry.getKey();
+                if (filePath.endsWith(".zip") || filePath.endsWith(".jar")) {

Review comment:
       It would be better to convert filePath to lowercase before checking the
suffix, so that names such as "env.ZIP" are also recognized.
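
       A minimal sketch of what a case-insensitive check might look like. `ArchiveSuffix`, `Kind` and `classify` are hypothetical names used only for illustration, and the set of suffixes is an assumption based on the comments in this review. `Locale.ROOT` is passed explicitly because the default-locale `toLowerCase()` can map "I" to a dotless "ı" under a Turkish locale, which would make ".ZIP" fail to match ".zip".

```java
import java.util.Locale;

/** Hypothetical helper (not part of the PR): case-insensitive suffix check. */
final class ArchiveSuffix {
    private ArchiveSuffix() {}

    enum Kind { ZIP, TAR, TAR_GZ, UNSUPPORTED }

    static Kind classify(String filePath) {
        // Normalize once, then compare against lowercase suffixes only.
        String lower = filePath.toLowerCase(Locale.ROOT);
        if (lower.endsWith(".zip") || lower.endsWith(".jar")) {
            return Kind.ZIP;
        }
        if (lower.endsWith(".tar.gz") || lower.endsWith(".tgz")) {
            return Kind.TAR_GZ;
        }
        if (lower.endsWith(".tar")) {
            return Kind.TAR;
        }
        return Kind.UNSUPPORTED;
    }
}
```

       The original `filePath` should still be what is passed to the extraction call; only the comparison uses the lowercased copy.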



