jasperjiaguo commented on code in PR #8753:
URL: https://github.com/apache/pinot/pull/8753#discussion_r894003884


##########
pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java:
##########
@@ -1004,6 +1004,26 @@ public int downloadFile(URI uri, File dest, AuthProvider 
authProvider, List<Head
     return _httpClient.downloadFile(uri, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS, 
dest, authProvider, httpHeaders);
   }
 
+  /**
+   * Download and untar a file in a streamed way with rate limit
+   *
+   * @param uri URI
+   * @param dest File destination
+   * @param authProvider auth token
+   * @param httpHeaders http headers
+   * @param rateLimit limit the rate to write the download-untar stream to disk,
+   *                  in bytes per second; -1 for no disk write limit, 0 to limit
+   *                  the writing to the min(untar, download) rate
+   * @return Response status code
+   * @throws IOException
+   * @throws HttpErrorStatusException
+   */
+  public File downloadUntarFileStreamed(URI uri, File dest, AuthProvider 
authProvider, List<Header> httpHeaders,
+      long rateLimit)

Review Comment:
   done



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java:
##########
@@ -198,4 +238,30 @@ public static void untarOneFile(File inputFile, String 
fileName, File outputFile
       throw new IOException(String.format("Failed to find file: %s in: %s", 
fileName, inputFile));
     }
   }
+
+  public static long copyWithRateLimiter(InputStream inputStream, 
FileOutputStream outputStream, long rateLimit)
+      throws IOException {
+    Objects.requireNonNull(inputStream, "inputStream is null");

Review Comment:
   done



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java:
##########
@@ -198,4 +238,30 @@ public static void untarOneFile(File inputFile, String 
fileName, File outputFile
       throw new IOException(String.format("Failed to find file: %s in: %s", 
fileName, inputFile));
     }
   }
+
+  public static long copyWithRateLimiter(InputStream inputStream, 
FileOutputStream outputStream, long rateLimit)
+      throws IOException {
+    Objects.requireNonNull(inputStream, "inputStream is null");
+    Objects.requireNonNull(outputStream, "outputStream is null");
+    FileDescriptor fd = outputStream.getFD();
+    LOGGER.info("Using rate limiter for stream copy, target limit {} bytes/s", 
rateLimit);
+    byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
+    RateLimiter rateLimiter = RateLimiter.create(rateLimit);
+    long count;
+    int n;
+
+    if (rateLimit == SYNC_DISK_WRITE_WITH_UPSTREAM_RATE) {
+      for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) 
n) {
+        outputStream.write(buffer, 0, n);
+        fd.sync(); // flush the buffer timely to the disk so that the disk 
bandwidth wouldn't get saturated
+      }
+    } else {
+      for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) 
n) {

Review Comment:
   done



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java:
##########
@@ -198,4 +238,30 @@ public static void untarOneFile(File inputFile, String 
fileName, File outputFile
       throw new IOException(String.format("Failed to find file: %s in: %s", 
fileName, inputFile));
     }
   }
+
+  public static long copyWithRateLimiter(InputStream inputStream, 
FileOutputStream outputStream, long rateLimit)
+      throws IOException {
+    Objects.requireNonNull(inputStream, "inputStream is null");
+    Objects.requireNonNull(outputStream, "outputStream is null");
+    FileDescriptor fd = outputStream.getFD();
+    LOGGER.info("Using rate limiter for stream copy, target limit {} bytes/s", 
rateLimit);
+    byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
+    RateLimiter rateLimiter = RateLimiter.create(rateLimit);
+    long count;
+    int n;
+
+    if (rateLimit == SYNC_DISK_WRITE_WITH_UPSTREAM_RATE) {
+      for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) 
n) {
+        outputStream.write(buffer, 0, n);
+        fd.sync(); // flush the buffer timely to the disk so that the disk 
bandwidth wouldn't get saturated
+      }
+    } else {
+      for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) 
n) {
+        rateLimiter.acquire(n);
+        outputStream.write(buffer, 0, n);
+        fd.sync();
+      }
+    }
+    return count;

Review Comment:
   This function is actually called once per file, so I removed all the logging 
here — otherwise it would be too verbose. We already have similar logging in 
`private File downloadAndStreamUntarRateLimit(String segmentName, 
SegmentZKMetadata zkMetadata, File tempRootDir, long maxDownloadRateInByte)`, 
which should be enough.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to