This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 82de30dd89 Fix the potential access to upsert metadata manager after it is closed (#11692)
82de30dd89 is described below
commit 82de30dd894a8495ded8a4ddbe240846241a5f3b
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Sep 27 09:16:23 2023 -0700
Fix the potential access to upsert metadata manager after it is closed (#11692)
---
.../upsert/BasePartitionUpsertMetadataManager.java | 142 ++++++++++-----------
...rrentMapPartitionUpsertMetadataManagerTest.java | 58 ++++++++-
2 files changed, 122 insertions(+), 78 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index e968421537..c286bad125 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -31,7 +31,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -80,10 +79,6 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
protected volatile boolean _gotFirstConsumingSegment = false;
protected final ReadWriteLock _snapshotLock;
- protected volatile boolean _stopped = false;
- // Initialize with 1 pending operation to indicate the metadata manager can
take more operations
- protected final AtomicInteger _numPendingOperations = new AtomicInteger(1);
-
protected long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
protected int _numOutOfOrderEvents = 0;
@@ -91,6 +86,12 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
// If upsertTTL enabled, we will keep track of largestSeenComparisonValue to
compute expired segments.
protected volatile double _largestSeenComparisonValue;
+ // The following variables are always accessed within synchronized block
+ private boolean _stopped;
+ // Initialize with 1 pending operation to indicate the metadata manager can
take more operations
+ private int _numPendingOperations = 1;
+ private boolean _closed;
+
protected BasePartitionUpsertMetadataManager(String tableNameWithType, int
partitionId,
List<String> primaryKeyColumns, List<String> comparisonColumns,
@Nullable String deleteRecordColumn,
HashFunction hashFunction, @Nullable PartialUpsertHandler
partialUpsertHandler, boolean enableSnapshot,
@@ -123,10 +124,6 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
@Override
public void addSegment(ImmutableSegment segment) {
String segmentName = segment.getSegmentName();
- if (_stopped) {
- _logger.info("Skip adding segment: {} because metadata manager is
already stopped", segment.getSegmentName());
- return;
- }
if (segment instanceof EmptyIndexSegment) {
_logger.info("Skip adding empty segment: {}", segmentName);
return;
@@ -158,18 +155,21 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
}
}
+ if (!startOperation()) {
+ _logger.info("Skip adding segment: {} because metadata manager is
already stopped", segment.getSegmentName());
+ return;
+ }
if (_enableSnapshot) {
_snapshotLock.readLock().lock();
}
- startOperation();
try {
doAddSegment(immutableSegment);
_trackedSegments.add(segment);
} finally {
- finishOperation();
if (_enableSnapshot) {
_snapshotLock.readLock().unlock();
}
+ finishOperation();
}
}
@@ -219,24 +219,23 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
@Override
public void preloadSegment(ImmutableSegment segment) {
String segmentName = segment.getSegmentName();
- if (_stopped) {
- _logger.info("Skip preloading segment: {} because metadata manager is
already stopped", segmentName);
- return;
- }
Preconditions.checkArgument(_enableSnapshot, "Snapshot must be enabled to
preload segment: {}, table: {}",
segmentName, _tableNameWithType);
// Note that EmptyIndexSegment should not reach here either, as it doesn't
have validDocIds snapshot.
Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
"Got unsupported segment implementation: {} for segment: {}, table:
{}", segment.getClass(), segmentName,
_tableNameWithType);
+ if (!startOperation()) {
+ _logger.info("Skip preloading segment: {} because metadata manager is
already stopped", segmentName);
+ return;
+ }
_snapshotLock.readLock().lock();
- startOperation();
try {
doPreloadSegment((ImmutableSegmentImpl) segment);
_trackedSegments.add(segment);
} finally {
- finishOperation();
_snapshotLock.readLock().unlock();
+ finishOperation();
}
}
@@ -319,16 +318,14 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
@Override
public void addRecord(MutableSegment segment, RecordInfo recordInfo) {
- if (_stopped) {
+ _gotFirstConsumingSegment = true;
+ if (!startOperation()) {
_logger.debug("Skip adding record to segment: {} because metadata
manager is already stopped",
segment.getSegmentName());
return;
}
-
// NOTE: We don't acquire snapshot read lock here because snapshot is
always taken before a new consuming segment
// starts consuming, so it won't overlap with this method
- _gotFirstConsumingSegment = true;
- startOperation();
try {
doAddRecord(segment, recordInfo);
_trackedSegments.add(segment);
@@ -341,15 +338,13 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
@Override
public void replaceSegment(ImmutableSegment segment, IndexSegment
oldSegment) {
- if (_stopped) {
+ if (!startOperation()) {
_logger.info("Skip replacing segment: {} because metadata manager is
already stopped", segment.getSegmentName());
return;
}
-
if (_enableSnapshot) {
_snapshotLock.readLock().lock();
}
- startOperation();
try {
doReplaceSegment(segment, oldSegment);
if (!(segment instanceof EmptyIndexSegment)) {
@@ -357,10 +352,10 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
}
_trackedSegments.remove(oldSegment);
} finally {
- finishOperation();
if (_enableSnapshot) {
_snapshotLock.readLock().unlock();
}
+ finishOperation();
}
}
@@ -459,15 +454,10 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
@Override
public void removeSegment(IndexSegment segment) {
String segmentName = segment.getSegmentName();
- if (_stopped) {
- _logger.info("Skip removing segment: {} because metadata manager is
already stopped", segmentName);
- return;
- }
if (!_trackedSegments.contains(segment)) {
_logger.info("Skip removing untracked (replaced or empty) segment: {}",
segmentName);
return;
}
-
// Skip removing segment that has max comparison value smaller than
(largestSeenComparisonValue - TTL)
if (_largestSeenComparisonValue > 0) {
Number maxComparisonValue =
@@ -477,19 +467,21 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
return;
}
}
-
+ if (!startOperation()) {
+ _logger.info("Skip removing segment: {} because metadata manager is
already stopped", segmentName);
+ return;
+ }
if (_enableSnapshot) {
_snapshotLock.readLock().lock();
}
- startOperation();
try {
doRemoveSegment(segment);
_trackedSegments.remove(segment);
} finally {
- finishOperation();
if (_enableSnapshot) {
_snapshotLock.readLock().unlock();
}
+ finishOperation();
}
}
@@ -530,12 +522,10 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
if (_partialUpsertHandler == null) {
return record;
}
- if (_stopped) {
+ if (!startOperation()) {
_logger.debug("Skip updating record because metadata manager is already
stopped");
return record;
}
-
- startOperation();
try {
return doUpdateRecord(record, recordInfo);
} finally {
@@ -566,22 +556,20 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
if (!_enableSnapshot) {
return;
}
- if (_stopped) {
- _logger.info("Skip taking snapshot because metadata manager is already
stopped");
- return;
- }
if (!_gotFirstConsumingSegment) {
_logger.info("Skip taking snapshot before getting the first consuming
segment");
return;
}
-
+ if (!startOperation()) {
+ _logger.info("Skip taking snapshot because metadata manager is already
stopped");
+ return;
+ }
_snapshotLock.writeLock().lock();
- startOperation();
try {
doTakeSnapshot();
} finally {
- finishOperation();
_snapshotLock.writeLock().unlock();
+ finishOperation();
}
}
@@ -597,8 +585,7 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
if (segment instanceof ImmutableSegmentImpl) {
((ImmutableSegmentImpl) segment).persistValidDocIdsSnapshot();
numImmutableSegments++;
- numPrimaryKeysInSnapshot +=
- ((ImmutableSegmentImpl)
segment).getValidDocIds().getMutableRoaringBitmap().getCardinality();
+ numPrimaryKeysInSnapshot +=
segment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
}
}
@@ -667,29 +654,15 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
return new File(_tableIndexDir, V1Constants.TTL_WATERMARK_TABLE_PARTITION
+ _partitionId);
}
- protected void startOperation() {
- _numPendingOperations.getAndIncrement();
- }
-
- protected void finishOperation() {
- if (_numPendingOperations.decrementAndGet() == 0) {
- synchronized (_numPendingOperations) {
- _numPendingOperations.notifyAll();
- }
- }
- }
-
@Override
public void removeExpiredPrimaryKeys() {
if (_metadataTTL <= 0) {
return;
}
- if (_stopped) {
+ if (!startOperation()) {
_logger.info("Skip removing expired primary keys because metadata
manager is already stopped");
return;
}
-
- startOperation();
try {
doRemoveExpiredPrimaryKeys();
} finally {
@@ -702,29 +675,50 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
*/
protected abstract void doRemoveExpiredPrimaryKeys();
+ protected synchronized boolean startOperation() {
+ if (_stopped || _numPendingOperations == 0) {
+ return false;
+ }
+ _numPendingOperations++;
+ return true;
+ }
+
+ protected synchronized void finishOperation() {
+ _numPendingOperations--;
+ if (_numPendingOperations == 0) {
+ notifyAll();
+ }
+ }
+
@Override
- public void stop() {
+ public synchronized void stop() {
+ if (_stopped) {
+ _logger.warn("Metadata manager is already stopped");
+ return;
+ }
_stopped = true;
- int numPendingOperations = _numPendingOperations.decrementAndGet();
+ _numPendingOperations--;
_logger.info("Stopped the metadata manager with {} pending operations,
current primary key count: {}",
- numPendingOperations, getNumPrimaryKeys());
+ _numPendingOperations, getNumPrimaryKeys());
}
@Override
- public void close()
+ public synchronized void close()
throws IOException {
Preconditions.checkState(_stopped, "Must stop the metadata manager before
closing it");
+ if (_closed) {
+ _logger.warn("Metadata manager is already closed");
+ return;
+ }
+ _closed = true;
_logger.info("Closing the metadata manager");
- synchronized (_numPendingOperations) {
- int numPendingOperations;
- while ((numPendingOperations = _numPendingOperations.get()) != 0) {
- _logger.info("Waiting for {} pending operations to finish",
numPendingOperations);
- try {
- _numPendingOperations.wait();
- } catch (InterruptedException e) {
- throw new RuntimeException(
- String.format("Interrupted while waiting for %d pending
operations to finish", numPendingOperations), e);
- }
+ while (_numPendingOperations != 0) {
+ _logger.info("Waiting for {} pending operations to finish",
_numPendingOperations);
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(
+ String.format("Interrupted while waiting for %d pending operations
to finish", _numPendingOperations), e);
}
}
doClose();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 363ed18e13..3a1904300b 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -27,6 +27,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -49,6 +52,7 @@ import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -59,10 +63,7 @@ import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertSame;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;
public class ConcurrentMapPartitionUpsertMetadataManagerTest {
@@ -83,6 +84,55 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest
{
FileUtils.forceDelete(INDEX_DIR);
}
+ @Test
+ public void testStartFinishOperation() {
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, Collections.singletonList("pk"),
+ Collections.singletonList("timeCol"), null, HashFunction.NONE,
null, false, 0, INDEX_DIR,
+ mock(ServerMetrics.class));
+
+ // Start 2 operations
+ assertTrue(upsertMetadataManager.startOperation());
+ assertTrue(upsertMetadataManager.startOperation());
+
+ // Stop and close the metadata manager
+ AtomicBoolean stopped = new AtomicBoolean();
+ AtomicBoolean closed = new AtomicBoolean();
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ executor.submit(() -> {
+ upsertMetadataManager.stop();
+ stopped.set(true);
+ try {
+ upsertMetadataManager.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ closed.set(true);
+ });
+ executor.shutdown();
+
+ // Wait for metadata manager to be stopped
+ TestUtils.waitForCondition(aVoid -> stopped.get(), 10_000L, "Failed to
stop the metadata manager");
+
+ // Metadata manager should block on close because there are 2 pending
operations
+ assertFalse(closed.get());
+
+ // Starting new operation should fail because the metadata manager is
already stopped
+ assertFalse(upsertMetadataManager.startOperation());
+
+ // Finish one operation
+ upsertMetadataManager.finishOperation();
+
+ // Metadata manager should still block on close because there is still 1
pending operation
+ assertFalse(closed.get());
+
+ // Finish the other operation
+ upsertMetadataManager.finishOperation();
+
+ // Metadata manager should be closed now
+ TestUtils.waitForCondition(aVoid -> closed.get(), 10_000L, "Failed to
close the metadata manager");
+ }
+
@Test
public void testAddReplaceRemoveSegment()
throws IOException {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org