GWphua commented on code in PR #18982:
URL: https://github.com/apache/druid/pull/18982#discussion_r2792247364


##########
processing/src/main/java/org/apache/druid/utils/CompressionUtils.java:
##########
@@ -212,6 +286,106 @@ public static long zip(File directory, OutputStream out) throws IOException
     return totalSize;
   }
 
+  /**
+   * Compresses directory contents using LZ4 block compression with a simple archive format.
+   * Format: [file_count:4 bytes][file1_name_length:4][file1_name:bytes][file1_size:8][file1_data:bytes]...
+   *
+   * @param directory The directory whose contents should be compressed
+   * @param out The output stream to write compressed data to
+   * @return The number of bytes (uncompressed) read from the input directory
+   */
+  public static long lz4CompressDirectory(File directory, OutputStream out) throws IOException
+  {
+    if (!directory.isDirectory()) {
+      throw new IOE("directory[%s] is not a directory", directory);
+    }
+
+    // Use fast compressor for better performance (lower CPU, faster compression)
+    final LZ4BlockOutputStream lz4Out = new LZ4BlockOutputStream(
+        out,
+        64 * 1024, // Block size
+        LZ4Factory.fastestInstance().fastCompressor()
+    );
+    // Use DataOutputStream for structured writing
+    final DataOutputStream dataOut = new DataOutputStream(lz4Out);
+    final File[] files = directory.listFiles();
+
+    if (files == null) {
+      throw new IOE("Cannot list files in directory[%s]", directory);
+    }
+
+    // Sort for consistency
+    final File[] sortedFiles = Arrays.stream(files).sorted().toArray(File[]::new);
+
+    dataOut.writeInt(sortedFiles.length);
+
+    long totalSize = 0;
+
+    for (File file : sortedFiles) {
+      if (file.isDirectory()) {
+        continue; // Skip subdirectories like ZIP does
+      }
+
+      log.debug("Compressing file[%s] with size[%,d]. Total size so far[%,d]", file, file.length(), totalSize);
+
+      final String fileName = file.getName();
+      final byte[] fileNameBytes = fileName.getBytes(StandardCharsets.UTF_8);
+      dataOut.writeInt(fileNameBytes.length);
+      dataOut.write(fileNameBytes);
+
+      final long fileSize = file.length();
+      if (fileSize > Integer.MAX_VALUE) {
+        throw new IOE("file[%s] too large [%,d]", file, fileSize);
+      }
+
+      dataOut.writeLong(fileSize);
+      totalSize += fileSize;
+
+      // Copy file content to dataOut
+      try (FileInputStream fileIn = new FileInputStream(file)) {
+        ByteStreams.copy(fileIn, dataOut);
+      }
+    }
+
+    dataOut.flush();
+    lz4Out.finish();
+    return totalSize;
+  }
+
+  /**
+   * Decompresses LZ4-compressed directory archive
+   */
+  public static FileUtils.FileCopyResult lz4DecompressDirectory(InputStream in, File outDir) throws IOException
+  {
+    if (!(outDir.exists() && outDir.isDirectory())) {
+      throw new ISE("outDir[%s] must exist and be a directory", outDir);
+    }
+
+    final LZ4BlockInputStream lz4In = new LZ4BlockInputStream(in);

Review Comment:
   This constructor is a deprecated API; maybe try
   
   LZ4BlockInputStream.newBuilder().build(in);
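As context for the review above: the archive framing documented in the new javadoc can be exercised with a plain `DataOutputStream`/`DataInputStream` round trip. This is only a sketch of the format with hypothetical entry names and no LZ4 layer, not the PR's actual code:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ArchiveFormatSketch
{
  public static void main(String[] args) throws IOException
  {
    // Write phase: [file_count:4][name_length:4][name:bytes][size:8][data:bytes] per entry
    final String[] names = {"00000.smoosh", "meta.smoosh"}; // hypothetical entry names
    final byte[][] payloads = {
        "hello".getBytes(StandardCharsets.UTF_8),
        "druid".getBytes(StandardCharsets.UTF_8)
    };
    final ByteArrayOutputStream buf = new ByteArrayOutputStream();
    final DataOutputStream dataOut = new DataOutputStream(buf);
    dataOut.writeInt(names.length);
    for (int i = 0; i < names.length; i++) {
      final byte[] nameBytes = names[i].getBytes(StandardCharsets.UTF_8);
      dataOut.writeInt(nameBytes.length);
      dataOut.write(nameBytes);
      dataOut.writeLong(payloads[i].length);
      dataOut.write(payloads[i]);
    }
    dataOut.flush();

    // Read phase: walk the same framing back out
    final DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
    final int fileCount = dataIn.readInt();
    for (int i = 0; i < fileCount; i++) {
      final byte[] nameBytes = new byte[dataIn.readInt()];
      dataIn.readFully(nameBytes);
      final byte[] data = new byte[(int) dataIn.readLong()];
      dataIn.readFully(data);
      System.out.println(new String(nameBytes, StandardCharsets.UTF_8) + ":" + new String(data, StandardCharsets.UTF_8));
    }
  }
}
```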



##########
extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java:
##########
@@ -235,27 +248,16 @@ FileUtils.FileCopyResult getSegmentFiles(final Path path, final File outDir) thr
         catch (Exception e) {
           throw new RuntimeException(e);
         }
-      } else if (CompressionUtils.isZip(path.getName())) {
+      }
 
-        // --------    zip     ---------

Review Comment:
   Out of scope of this PR, but seems like it may be better to use a factory 
class instead to handle this logic... 
   
   Maybe something to do in the future.
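The factory idea could be sketched as an enum that maps a file name to a format and centralizes the dispatch. All names here are hypothetical illustrations of the reviewer's suggestion, not Druid's actual `CompressionUtils.Format` API:

```java
import java.util.Locale;

public class FormatFactorySketch
{
  // Hypothetical enum sketching a factory for extension-based dispatch.
  enum Format
  {
    ZIP(".zip"), GZ(".gz"), LZ4(".lz4"), NONE("");

    private final String suffix;

    Format(String suffix)
    {
      this.suffix = suffix;
    }

    // Detect the format from the file extension; NONE when nothing matches.
    static Format fromFileName(String fileName)
    {
      final String lower = fileName.toLowerCase(Locale.ROOT);
      for (Format f : values()) {
        if (f != NONE && lower.endsWith(f.suffix)) {
          return f;
        }
      }
      return NONE;
    }
  }

  public static void main(String[] args)
  {
    System.out.println(Format.fromFileName("index.zip"));
    System.out.println(Format.fromFileName("segment.lz4"));
    System.out.println(Format.fromFileName("descriptor.json"));
  }
}
```

Each segment puller could then delegate to the detected format instead of branching on `isZip`/`isGz` itself.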



##########
processing/src/main/java/org/apache/druid/utils/CompressionUtils.java:
##########
@@ -622,6 +796,8 @@ public static InputStream decompress(final InputStream in, final String fileName
   {
     if (fileName.endsWith(Format.GZ.getSuffix())) {
       return gzipInputStream(in);
+    } else if (fileName.endsWith(Format.LZ4.getSuffix())) {
+      return new LZ4BlockInputStream(in);

Review Comment:
   Same as above



##########
extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java:
##########
@@ -235,27 +248,16 @@ FileUtils.FileCopyResult getSegmentFiles(final Path path, final File outDir) thr
         catch (Exception e) {
           throw new RuntimeException(e);
         }
-      } else if (CompressionUtils.isZip(path.getName())) {
+      }
 
-        // --------    zip     ---------
+      // Try to detect format from file extension and decompress
+      final CompressionUtils.Format format = CompressionUtils.Format.fromFileName(path.getName());
+      if ((format == CompressionUtils.Format.ZIP || format == CompressionUtils.Format.LZ4)) {
+        long startTime = System.currentTimeMillis();
 
-        final FileUtils.FileCopyResult result = CompressionUtils.unzip(
-            new ByteSource()
-            {
-              @Override
-              public InputStream openStream() throws IOException
-              {
-                return getInputStream(path);
-              }
-            }, outDir, shouldRetryPredicate(), false
-        );
+        final FileUtils.FileCopyResult result = format.decompressDirectory(getInputStream(path), outDir);
 
-        log.info(
-            "Unzipped %d bytes from [%s] to [%s]",
-            result.size(),
-            path.toString(),
-            outDir.getAbsolutePath()
-        );
+        emitMetrics(format, result.size(), System.currentTimeMillis() - startTime);

Review Comment:
   Any chance that `emitMetrics` can also apply for Gz / Directory? Since we are emitting for `hdfs/pull/duration` + `hdfs/pull/size`.
   
   There is an expectation for metrics to be emitted for non-zip/lz4 too.
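One way to satisfy this would be to call the metrics helper on every decompression path, not just zip/lz4. A minimal, self-contained sketch with the emitter replaced by a list; only the metric names (`hdfs/pull/size`, `hdfs/pull/duration`) come from the diff, everything else is hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

public class PullMetricsSketch
{
  // Stand-in for the ServiceEmitter so the sketch is runnable on its own.
  static final List<String> EMITTED = new ArrayList<>();

  static void emitMetrics(String format, long size, long durationMillis)
  {
    // Emitting for every detected format (ZIP, LZ4, GZ, even plain directories)
    // addresses the review comment about non-zip/lz4 pulls.
    EMITTED.add(format + " hdfs/pull/size=" + size);
    EMITTED.add(format + " hdfs/pull/duration=" + durationMillis);
  }

  public static void main(String[] args)
  {
    emitMetrics("GZ", 1024, 15);
    emitMetrics("DIRECTORY", 4096, 40);
    EMITTED.forEach(System.out::println);
  }
}
```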



##########
extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java:
##########
@@ -292,6 +294,17 @@ public InputStream openStream() throws IOException
     }
   }
 
+  private void emitMetrics(CompressionUtils.Format format, long size, long 
duration)
+  {
+    if (emitter == null) {
+      return;
+    }
+    ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
+    metricBuilder.setDimension("format", format);
+    emitter.emit(metricBuilder.setMetric("hdfs/pull/size", size));
+    emitter.emit(metricBuilder.setMetric("hdfs/pull/duration", duration));

Review Comment:
   Can add docs for these metrics.
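A docs entry could follow the metrics table style used in Druid's metrics documentation. This is only a sketch; the descriptions are assumed from the diff, not confirmed wording:

```
|Metric|Description|Dimensions|Normal value|
|------|-----------|----------|------------|
|`hdfs/pull/size`|Uncompressed size in bytes of a segment pulled from HDFS deep storage.|`format`|Depends on segment size.|
|`hdfs/pull/duration`|Time in milliseconds taken to pull and decompress a segment from HDFS deep storage.|`format`|Depends on segment size.|
```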



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

