This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d6307a1b7 Add a warning message when semaphore cannot be acquired in 
a while (#14386)
5d6307a1b7 is described below

commit 5d6307a1b79b0738ed8aa15ad312898c15f58658
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Tue Nov 5 12:48:33 2024 +0100

    Add a warning message when semaphore cannot be acquired in a while (#14386)
---
 .../realtime/RealtimeSegmentDataManager.java       | 28 +++++++++++++++-------
 1 file changed, 20 insertions(+), 8 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 2c8f086f62..2fbee173af 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -24,6 +24,8 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -1004,12 +1006,26 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
       _segmentLogger.warn("Table data manager is already shut down");
       return null;
     }
+    final long startTimeMillis = now();
     try {
-      final long startTimeMillis = now();
       if (_segBuildSemaphore != null) {
-        _segmentLogger.info("Waiting to acquire semaphore for building 
segment");
-        _segBuildSemaphore.acquire();
+        _segmentLogger.info("Trying to acquire semaphore for building 
segment");
+        Instant acquireStart = Instant.now();
+        int timeoutSeconds = 5;
+        while (!_segBuildSemaphore.tryAcquire(timeoutSeconds, 
TimeUnit.SECONDS)) {
+          _segmentLogger.warn("Could not acquire semaphore for building 
segment in {}",
+              Duration.between(acquireStart, Instant.now()));
+          timeoutSeconds = Math.min(timeoutSeconds * 2, 300);
+        }
+        _segmentLogger.info("Acquired semaphore for building segment");
       }
+    } catch (InterruptedException e) {
+      String errorMessage = "Interrupted while waiting for semaphore";
+      _segmentLogger.error(errorMessage, e);
+      _realtimeTableDataManager.addSegmentError(_segmentNameStr, new 
SegmentErrorInfo(now(), errorMessage, e));
+      return null;
+    }
+    try {
       // Increment llc simultaneous segment builds.
       
_serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS,
 1L);
 
@@ -1105,13 +1121,9 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
         return new SegmentBuildDescriptor(null, null, _currentOffset, 
buildTimeMillis, waitTimeMillis,
             segmentSizeBytes);
       }
-    } catch (InterruptedException e) {
-      String errorMessage = "Interrupted while waiting for semaphore";
-      _segmentLogger.error(errorMessage, e);
-      _realtimeTableDataManager.addSegmentError(_segmentNameStr, new 
SegmentErrorInfo(now(), errorMessage, e));
-      return null;
     } finally {
       if (_segBuildSemaphore != null) {
+        _segmentLogger.info("Releasing semaphore for building segment");
         _segBuildSemaphore.release();
       }
       // Decrement llc simultaneous segment builds.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to