This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5d6307a1b7 Add a warning message when semaphore cannot be acquired in a while (#14386)
5d6307a1b7 is described below
commit 5d6307a1b79b0738ed8aa15ad312898c15f58658
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Tue Nov 5 12:48:33 2024 +0100
Add a warning message when semaphore cannot be acquired in a while (#14386)
---
.../realtime/RealtimeSegmentDataManager.java | 28 +++++++++++++++-------
1 file changed, 20 insertions(+), 8 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 2c8f086f62..2fbee173af 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -24,6 +24,8 @@ import com.google.common.util.concurrent.Uninterruptibles;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
+import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -1004,12 +1006,26 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
_segmentLogger.warn("Table data manager is already shut down");
return null;
}
+ final long startTimeMillis = now();
try {
- final long startTimeMillis = now();
if (_segBuildSemaphore != null) {
- _segmentLogger.info("Waiting to acquire semaphore for building segment");
- _segBuildSemaphore.acquire();
+ _segmentLogger.info("Trying to acquire semaphore for building segment");
+ Instant acquireStart = Instant.now();
+ int timeoutSeconds = 5;
+ while (!_segBuildSemaphore.tryAcquire(timeoutSeconds, TimeUnit.SECONDS)) {
+ _segmentLogger.warn("Could not acquire semaphore for building segment in {}",
+ Duration.between(acquireStart, Instant.now()));
+ timeoutSeconds = Math.min(timeoutSeconds * 2, 300);
+ }
+ _segmentLogger.info("Acquired semaphore for building segment");
}
+ } catch (InterruptedException e) {
+ String errorMessage = "Interrupted while waiting for semaphore";
+ _segmentLogger.error(errorMessage, e);
+ _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
+ return null;
+ }
+ try {
// Increment llc simultaneous segment builds.
_serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, 1L);
@@ -1105,13 +1121,9 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
return new SegmentBuildDescriptor(null, null, _currentOffset, buildTimeMillis, waitTimeMillis,
segmentSizeBytes);
}
- } catch (InterruptedException e) {
- String errorMessage = "Interrupted while waiting for semaphore";
- _segmentLogger.error(errorMessage, e);
- _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
- return null;
} finally {
if (_segBuildSemaphore != null) {
+ _segmentLogger.info("Releasing semaphore for building segment");
_segBuildSemaphore.release();
}
// Decrement llc simultaneous segment builds.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]