Jackie-Jiang commented on code in PR #15001:
URL: https://github.com/apache/pinot/pull/15001#discussion_r1955194993
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -815,38 +814,49 @@ protected File
downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata)
segmentName, System.currentTimeMillis() - startTime,
_segmentDownloadSemaphore.getQueueLength());
}
try {
- File untarredSegmentDir;
- if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() ==
null) {
- _logger.info("Downloading segment: {} using streamed download-untar
with maxStreamRateInByte: {}", segmentName,
- _streamSegmentDownloadUntarRateLimitBytesPerSec);
- AtomicInteger failedAttempts = new AtomicInteger(0);
- try {
- untarredSegmentDir =
SegmentFetcherFactory.fetchAndStreamUntarToLocal(downloadUrl, tempRootDir,
- _streamSegmentDownloadUntarRateLimitBytesPerSec, failedAttempts);
- _logger.info("Downloaded and untarred segment: {} from: {}, failed
attempts: {}", segmentName, downloadUrl,
- failedAttempts.get());
- } finally {
- _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES,
- failedAttempts.get());
+ if (_segmentOperationsThrottler != null) {
+ _segmentOperationsThrottler.getSegmentDownloadThrottler().acquire();
+ }
+ try {
+ File untarredSegmentDir;
+ if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() ==
null) {
+ _logger.info("Downloading segment: {} using streamed download-untar
with maxStreamRateInByte: {}",
+ segmentName,
+ _streamSegmentDownloadUntarRateLimitBytesPerSec);
Review Comment:
(nit) reformat
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -815,38 +814,49 @@ protected File
downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata)
segmentName, System.currentTimeMillis() - startTime,
_segmentDownloadSemaphore.getQueueLength());
}
try {
- File untarredSegmentDir;
- if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() ==
null) {
- _logger.info("Downloading segment: {} using streamed download-untar
with maxStreamRateInByte: {}", segmentName,
- _streamSegmentDownloadUntarRateLimitBytesPerSec);
- AtomicInteger failedAttempts = new AtomicInteger(0);
- try {
- untarredSegmentDir =
SegmentFetcherFactory.fetchAndStreamUntarToLocal(downloadUrl, tempRootDir,
- _streamSegmentDownloadUntarRateLimitBytesPerSec, failedAttempts);
- _logger.info("Downloaded and untarred segment: {} from: {}, failed
attempts: {}", segmentName, downloadUrl,
- failedAttempts.get());
- } finally {
- _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES,
- failedAttempts.get());
+ if (_segmentOperationsThrottler != null) {
+ _segmentOperationsThrottler.getSegmentDownloadThrottler().acquire();
Review Comment:
You might want to add a log message here, similar to the one for the
table-level semaphore, for debugging purposes.
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -858,6 +868,9 @@ protected File downloadSegmentFromPeers(SegmentZKMetadata
zkMetadata)
_logger.info("Downloading segment: {} from peers", segmentName);
File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" +
UUID.randomUUID());
File segmentTarFile = new File(tempRootDir, segmentName +
TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION);
+ if (_segmentOperationsThrottler != null) {
+ _segmentOperationsThrottler.getSegmentDownloadThrottler().acquire();
Review Comment:
Same here: add a log message for debugging purposes.
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -815,38 +814,49 @@ protected File
downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata)
segmentName, System.currentTimeMillis() - startTime,
_segmentDownloadSemaphore.getQueueLength());
}
try {
- File untarredSegmentDir;
- if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() ==
null) {
- _logger.info("Downloading segment: {} using streamed download-untar
with maxStreamRateInByte: {}", segmentName,
- _streamSegmentDownloadUntarRateLimitBytesPerSec);
- AtomicInteger failedAttempts = new AtomicInteger(0);
- try {
- untarredSegmentDir =
SegmentFetcherFactory.fetchAndStreamUntarToLocal(downloadUrl, tempRootDir,
- _streamSegmentDownloadUntarRateLimitBytesPerSec, failedAttempts);
- _logger.info("Downloaded and untarred segment: {} from: {}, failed
attempts: {}", segmentName, downloadUrl,
- failedAttempts.get());
- } finally {
- _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES,
- failedAttempts.get());
+ if (_segmentOperationsThrottler != null) {
+ _segmentOperationsThrottler.getSegmentDownloadThrottler().acquire();
+ }
+ try {
+ File untarredSegmentDir;
+ if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() ==
null) {
+ _logger.info("Downloading segment: {} using streamed download-untar
with maxStreamRateInByte: {}",
+ segmentName,
+ _streamSegmentDownloadUntarRateLimitBytesPerSec);
+ AtomicInteger failedAttempts = new AtomicInteger(0);
+ try {
+ untarredSegmentDir =
SegmentFetcherFactory.fetchAndStreamUntarToLocal(downloadUrl, tempRootDir,
+ _streamSegmentDownloadUntarRateLimitBytesPerSec,
failedAttempts);
+ _logger.info("Downloaded and untarred segment: {} from: {}, failed
attempts: {}", segmentName, downloadUrl,
+ failedAttempts.get());
+ } finally {
+ _serverMetrics.addMeteredTableValue(_tableNameWithType,
+ ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES,
+ failedAttempts.get());
Review Comment:
(nit) reformat
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]