This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 40b2a65287 Fix the double destroy of segment data manager during 
server shutdown (#10475)
40b2a65287 is described below

commit 40b2a65287d5c7d8a3ed715e7f63438b06c13956
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Mar 24 16:07:15 2023 -0700

    Fix the double destroy of segment data manager during server shutdown 
(#10475)
---
 .../offline/ImmutableSegmentDataManager.java       |  2 +-
 .../realtime/HLRealtimeSegmentDataManager.java     |  2 +-
 .../realtime/LLRealtimeSegmentDataManager.java     |  2 +-
 .../local/data/manager/SegmentDataManager.java     | 14 +++++++-
 .../RealtimeIndexOffHeapMemoryManager.java         | 37 ++++++++++++----------
 5 files changed, 37 insertions(+), 20 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/ImmutableSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/ImmutableSegmentDataManager.java
index 71c79b941b..1646ba77b9 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/ImmutableSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/ImmutableSegmentDataManager.java
@@ -44,7 +44,7 @@ public class ImmutableSegmentDataManager extends 
SegmentDataManager {
   }
 
   @Override
-  public void destroy() {
+  protected void doDestroy() {
     _immutableSegment.destroy();
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index 9f80ad8fe6..0afbb6d2ca 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -469,7 +469,7 @@ public class HLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
   }
 
   @Override
-  public void destroy() {
+  protected void doDestroy() {
     LOGGER.info("Trying to shutdown RealtimeSegmentDataManager : {}!", 
_segmentName);
     _isShuttingDown = true;
     try {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index bed3a6a353..cc6b23243e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1238,7 +1238,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
     return true;
   }
 
-  public void destroy() {
+  protected void doDestroy() {
     try {
       stop();
     } catch (InterruptedException e) {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
index 46933a8356..2e1dae8d1f 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.data.manager;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.pinot.segment.spi.IndexSegment;
 
 
@@ -27,6 +28,7 @@ import org.apache.pinot.segment.spi.IndexSegment;
  */
 public abstract class SegmentDataManager {
   private final long _loadTimeMs = System.currentTimeMillis();
+  private final AtomicBoolean _destroyed = new AtomicBoolean();
   private int _referenceCount = 1;
 
   public long getLoadTimeMs() {
@@ -71,5 +73,15 @@ public abstract class SegmentDataManager {
 
   public abstract IndexSegment getSegment();
 
-  public abstract void destroy();
+  /**
+   * Destroys the data manager and releases all the resources allocated.
+   * The data manager can only be destroyed once.
+   */
+  public void destroy() {
+    if (_destroyed.compareAndSet(false, true)) {
+      doDestroy();
+    }
+  }
+
+  protected abstract void doDestroy();
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java
index 861fa953a1..737460593c 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java
@@ -20,8 +20,9 @@ package org.apache.pinot.segment.local.io.readerwriter;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
-import java.util.LinkedList;
+import java.util.ArrayList;
 import java.util.List;
+import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.HLCSegmentName;
@@ -32,34 +33,37 @@ import 
org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
 
 
 /**
- * @class RealtimeIndexOffHeapMemoryManager is an abstract class that 
implements base functionality to allocate and
- * release
+ * RealtimeIndexOffHeapMemoryManager is an abstract class that implements base 
functionality to allocate and release
  * memory that is acquired during realtime segment consumption.
  *
- * Realtime consuming segments use memory for dictionary, forward index, and 
inverted indices. For off-heap
- * allocation of memory, we instantiate one OffHeapMemoryManager for each 
segment in the server,
+ * Realtime consuming segments use memory for dictionary, forward index, and 
inverted indices. For off-heap allocation
+ * of memory, we instantiate one RealtimeIndexOffHeapMemoryManager for each 
segment.
  *
- * Closing the RealtimeOffHeapMemoryManager also releases all the resources 
allocated by the OffHeapMemoryManager.
+ * Closing the RealtimeOffHeapMemoryManager also releases all the resources 
allocated.
+ *
+ * This class is NOT thread safe. Only one thread should access this class.
  */
+@NotThreadSafe
 public abstract class RealtimeIndexOffHeapMemoryManager implements 
PinotDataBufferMemoryManager {
-  private final List<PinotDataBuffer> _buffers = new LinkedList<>();
+  private final List<PinotDataBuffer> _buffers = new ArrayList<>();
+  private final String _rawTableName;
   private final String _segmentName;
   private final ServerMetrics _serverMetrics;
+
   private long _totalAllocatedBytes = 0;
-  private final String _tableName;
 
   protected RealtimeIndexOffHeapMemoryManager(ServerMetrics serverMetrics, 
String segmentName) {
     _serverMetrics = serverMetrics;
     _segmentName = segmentName;
     LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
     if (llcSegmentName != null) {
-      _tableName = llcSegmentName.getTableName();
+      _rawTableName = llcSegmentName.getTableName();
     } else if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
       HLCSegmentName hlcSegmentName = new HLCSegmentName(segmentName);
-      _tableName = hlcSegmentName.getTableName();
+      _rawTableName = hlcSegmentName.getTableName();
     } else {
       // For testing only
-      _tableName = "NoSuchTable";
+      _rawTableName = "NoSuchTable";
     }
   }
 
@@ -82,7 +86,7 @@ public abstract class RealtimeIndexOffHeapMemoryManager 
implements PinotDataBuff
     _totalAllocatedBytes += size;
     _buffers.add(buffer);
     if (_serverMetrics != null) {
-      _serverMetrics.addValueToTableGauge(_tableName, 
ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, size);
+      _serverMetrics.addValueToTableGauge(_rawTableName, 
ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, size);
     }
     return buffer;
   }
@@ -97,9 +101,9 @@ public abstract class RealtimeIndexOffHeapMemoryManager 
implements PinotDataBuff
 
   /**
    * Close out this memory manager and release all memory and resources.
-   * This method must be called when all the memory allocated by this class is 
not longer in use.
-   * The application may choose to call (or not call) PinotDataBuffer.close(), 
but this.close() MUST
-   * be called to release all resources allocated.
+   * This method must be called when all the memory allocated by this class is 
no longer in use.
+   * The application may choose to call (or not call) PinotDataBuffer.close(), 
but this.close() MUST be called to
+   * release all resources allocated.
    *
    * @throws IOException
    */
@@ -109,7 +113,8 @@ public abstract class RealtimeIndexOffHeapMemoryManager 
implements PinotDataBuff
       buffer.close();
     }
     if (_serverMetrics != null) {
-      _serverMetrics.addValueToTableGauge(_tableName, 
ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, -_totalAllocatedBytes);
+      _serverMetrics.addValueToTableGauge(_rawTableName, 
ServerGauge.REALTIME_OFFHEAP_MEMORY_USED,
+          -_totalAllocatedBytes);
     }
     doClose();
     _buffers.clear();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to