This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 40b2a65287 Fix the double destroy of segment data manager during
server shutdown (#10475)
40b2a65287 is described below
commit 40b2a65287d5c7d8a3ed715e7f63438b06c13956
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Mar 24 16:07:15 2023 -0700
Fix the double destroy of segment data manager during server shutdown
(#10475)
---
.../offline/ImmutableSegmentDataManager.java | 2 +-
.../realtime/HLRealtimeSegmentDataManager.java | 2 +-
.../realtime/LLRealtimeSegmentDataManager.java | 2 +-
.../local/data/manager/SegmentDataManager.java | 14 +++++++-
.../RealtimeIndexOffHeapMemoryManager.java | 37 ++++++++++++----------
5 files changed, 37 insertions(+), 20 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/ImmutableSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/ImmutableSegmentDataManager.java
index 71c79b941b..1646ba77b9 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/ImmutableSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/ImmutableSegmentDataManager.java
@@ -44,7 +44,7 @@ public class ImmutableSegmentDataManager extends
SegmentDataManager {
}
@Override
- public void destroy() {
+ protected void doDestroy() {
_immutableSegment.destroy();
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index 9f80ad8fe6..0afbb6d2ca 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -469,7 +469,7 @@ public class HLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
}
@Override
- public void destroy() {
+ protected void doDestroy() {
LOGGER.info("Trying to shutdown RealtimeSegmentDataManager : {}!",
_segmentName);
_isShuttingDown = true;
try {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index bed3a6a353..cc6b23243e 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1238,7 +1238,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
return true;
}
- public void destroy() {
+ protected void doDestroy() {
try {
stop();
} catch (InterruptedException e) {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
index 46933a8356..2e1dae8d1f 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.data.manager;
import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -27,6 +28,7 @@ import org.apache.pinot.segment.spi.IndexSegment;
*/
public abstract class SegmentDataManager {
private final long _loadTimeMs = System.currentTimeMillis();
+ private final AtomicBoolean _destroyed = new AtomicBoolean();
private int _referenceCount = 1;
public long getLoadTimeMs() {
@@ -71,5 +73,15 @@ public abstract class SegmentDataManager {
public abstract IndexSegment getSegment();
- public abstract void destroy();
+ /**
+ * Destroys the data manager and releases all the resources allocated.
+ * The data manager can only be destroyed once.
+ */
+ public void destroy() {
+ if (_destroyed.compareAndSet(false, true)) {
+ doDestroy();
+ }
+ }
+
+ protected abstract void doDestroy();
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java
index 861fa953a1..737460593c 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java
@@ -20,8 +20,9 @@ package org.apache.pinot.segment.local.io.readerwriter;
import com.google.common.base.Preconditions;
import java.io.IOException;
-import java.util.LinkedList;
+import java.util.ArrayList;
import java.util.List;
+import javax.annotation.concurrent.NotThreadSafe;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.HLCSegmentName;
@@ -32,34 +33,37 @@ import
org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
/**
- * @class RealtimeIndexOffHeapMemoryManager is an abstract class that
implements base functionality to allocate and
- * release
+ * RealtimeIndexOffHeapMemoryManager is an abstract class that implements base
functionality to allocate and release
* memory that is acquired during realtime segment consumption.
*
- * Realtime consuming segments use memory for dictionary, forward index, and
inverted indices. For off-heap
- * allocation of memory, we instantiate one OffHeapMemoryManager for each
segment in the server,
+ * Realtime consuming segments use memory for dictionary, forward index, and
inverted indices. For off-heap allocation
+ * of memory, we instantiate one RealtimeIndexOffHeapMemoryManager for each
segment.
*
- * Closing the RealtimeOffHeapMemoryManager also releases all the resources
allocated by the OffHeapMemoryManager.
+ * Closing the RealtimeIndexOffHeapMemoryManager also releases all the resources
allocated.
+ *
+ * This class is NOT thread safe. Only one thread should access this class.
*/
+@NotThreadSafe
public abstract class RealtimeIndexOffHeapMemoryManager implements
PinotDataBufferMemoryManager {
- private final List<PinotDataBuffer> _buffers = new LinkedList<>();
+ private final List<PinotDataBuffer> _buffers = new ArrayList<>();
+ private final String _rawTableName;
private final String _segmentName;
private final ServerMetrics _serverMetrics;
+
private long _totalAllocatedBytes = 0;
- private final String _tableName;
protected RealtimeIndexOffHeapMemoryManager(ServerMetrics serverMetrics,
String segmentName) {
_serverMetrics = serverMetrics;
_segmentName = segmentName;
LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
if (llcSegmentName != null) {
- _tableName = llcSegmentName.getTableName();
+ _rawTableName = llcSegmentName.getTableName();
} else if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
HLCSegmentName hlcSegmentName = new HLCSegmentName(segmentName);
- _tableName = hlcSegmentName.getTableName();
+ _rawTableName = hlcSegmentName.getTableName();
} else {
// For testing only
- _tableName = "NoSuchTable";
+ _rawTableName = "NoSuchTable";
}
}
@@ -82,7 +86,7 @@ public abstract class RealtimeIndexOffHeapMemoryManager
implements PinotDataBuff
_totalAllocatedBytes += size;
_buffers.add(buffer);
if (_serverMetrics != null) {
- _serverMetrics.addValueToTableGauge(_tableName,
ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, size);
+ _serverMetrics.addValueToTableGauge(_rawTableName,
ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, size);
}
return buffer;
}
@@ -97,9 +101,9 @@ public abstract class RealtimeIndexOffHeapMemoryManager
implements PinotDataBuff
/**
* Close out this memory manager and release all memory and resources.
- * This method must be called when all the memory allocated by this class is
not longer in use.
- * The application may choose to call (or not call) PinotDataBuffer.close(),
but this.close() MUST
- * be called to release all resources allocated.
+ * This method must be called when all the memory allocated by this class is
no longer in use.
+ * The application may choose to call (or not call) PinotDataBuffer.close(),
but this.close() MUST be called to
+ * release all resources allocated.
*
* @throws IOException
*/
@@ -109,7 +113,8 @@ public abstract class RealtimeIndexOffHeapMemoryManager
implements PinotDataBuff
buffer.close();
}
if (_serverMetrics != null) {
- _serverMetrics.addValueToTableGauge(_tableName,
ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, -_totalAllocatedBytes);
+ _serverMetrics.addValueToTableGauge(_rawTableName,
ServerGauge.REALTIME_OFFHEAP_MEMORY_USED,
+ -_totalAllocatedBytes);
}
doClose();
_buffers.clear();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]