klsince commented on code in PR #12886:
URL: https://github.com/apache/pinot/pull/12886#discussion_r1569585817
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -268,64 +282,192 @@ public void addSegment(ImmutableSegment
immutableSegment) {
String segmentName = immutableSegment.getSegmentName();
Preconditions.checkState(!_shutDown, "Table data manager is already shut
down, cannot add segment: %s to table: %s",
segmentName, _tableNameWithType);
- _logger.info("Adding immutable segment: {} to table: {}", segmentName,
_tableNameWithType);
+ _logger.info("Adding immutable segment: {}", segmentName);
_serverMetrics.addValueToTableGauge(_tableNameWithType,
ServerGauge.DOCUMENT_COUNT,
immutableSegment.getSegmentMetadata().getTotalDocs());
_serverMetrics.addValueToTableGauge(_tableNameWithType,
ServerGauge.SEGMENT_COUNT, 1L);
ImmutableSegmentDataManager newSegmentManager = new
ImmutableSegmentDataManager(immutableSegment);
SegmentDataManager oldSegmentManager = registerSegment(segmentName,
newSegmentManager);
if (oldSegmentManager == null) {
- _logger.info("Added new immutable segment: {} to table: {}",
segmentName, _tableNameWithType);
+ _logger.info("Added new immutable segment: {}", segmentName);
} else {
- _logger.info("Replaced immutable segment: {} of table: {}", segmentName,
_tableNameWithType);
+ _logger.info("Replaced immutable segment: {}", segmentName);
releaseSegment(oldSegmentManager);
}
}
@Override
- public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)
+ public void addOnlineSegment(String segmentName)
throws Exception {
- Preconditions.checkState(!_shutDown, "Table data manager is already shut
down, cannot add segment: %s to table: %s",
- indexDir.getName(), _tableNameWithType);
+ Preconditions.checkState(!_shutDown,
+ "Table data manager is already shut down, cannot add ONLINE segment:
%s to table: %s", segmentName,
+ _tableNameWithType);
+ _logger.info("Adding ONLINE segment: {} to table: {}", segmentName,
_tableNameWithType);
Review Comment:
As in the other places, is there any need to log `_tableNameWithType` here?
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -608,172 +721,153 @@ protected SegmentDataManager unregisterSegment(String
segmentName) {
}
}
- protected boolean allowDownload(String segmentName, SegmentZKMetadata
zkMetadata) {
- return true;
- }
-
- protected File downloadSegment(String segmentName, SegmentZKMetadata
zkMetadata)
- throws Exception {
- // TODO: may support download from peer servers for RealTime table.
- return downloadSegmentFromDeepStore(segmentName, zkMetadata);
- }
-
- private File downloadSegmentFromDeepStore(String segmentName,
SegmentZKMetadata zkMetadata)
+ /**
+ * Downloads an immutable segment into the index directory.
+ * Segment can be downloaded from deep store or from peer servers.
Downloaded segment might be compressed or
+ * encrypted, and this method takes care of decompressing and decrypting the
segment.
+ */
+ protected File downloadSegment(SegmentZKMetadata zkMetadata)
throws Exception {
- File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" +
UUID.randomUUID());
- if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) {
- try {
- File untaredSegDir = downloadAndStreamUntarWithRateLimit(segmentName,
zkMetadata, tempRootDir,
- _streamSegmentDownloadUntarRateLimitBytesPerSec);
- return moveSegment(segmentName, untaredSegDir);
- } finally {
- FileUtils.deleteQuietly(tempRootDir);
- }
- } else {
- try {
- File tarFile = downloadAndDecrypt(segmentName, zkMetadata,
tempRootDir);
- return untarAndMoveSegment(segmentName, tarFile, tempRootDir);
- } finally {
- FileUtils.deleteQuietly(tempRootDir);
- }
- }
- }
-
- private File moveSegment(String segmentName, File untaredSegDir)
- throws IOException {
+ String segmentName = zkMetadata.getSegmentName();
+ String downloadUrl = zkMetadata.getDownloadUrl();
+ Preconditions.checkState(downloadUrl != null,
+ "Failed to find download URL in ZK metadata for segment: %s of table:
%s", segmentName, _tableNameWithType);
try {
- File indexDir = getSegmentDataDir(segmentName);
- FileUtils.deleteDirectory(indexDir);
- FileUtils.moveDirectory(untaredSegDir, indexDir);
- return indexDir;
+ if
(!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(downloadUrl)) {
+ try {
+ return downloadSegmentFromDeepStore(zkMetadata);
+ } catch (Exception e) {
+ if (_peerDownloadScheme != null) {
Review Comment:
Add a log noting that this is a retry after downloading from deep store failed.
nit: to save some indents
```
if (is peer download) {
return downloadSegmentFromPeers(zkMetadata);
}
...
```
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -570,165 +563,57 @@ private void handleUpsert(ImmutableSegment
immutableSegment) {
_logger.info("Preloaded immutable segment: {} to upsert-enabled table:
{}", segmentName, _tableNameWithType);
return;
}
- // Replacing segment takes multiple steps, and particularly need to access
the oldSegment. Replace segment may
- // happen in two threads, i.e. the consuming thread that's committing the
mutable segment and a HelixTaskExecutor
- // thread that's bringing segment from ONLINE to CONSUMING when the server
finds consuming thread can't commit
- // the segment in time. The slower thread takes the reference of the
oldSegment here, but it may get closed by
- // the faster thread if not synchronized. In particular, the slower thread
may iterate the primary keys in the
- // oldSegment, causing seg fault. So we have to take a lock here.
- // However, we can't just reuse the existing segmentLocks. Because many
methods of partitionUpsertMetadataManager
- // takes this lock internally, but after taking snapshot RW lock. If we
take segmentLock here (before taking
- // snapshot RW lock), we can get into deadlock with threads calling
partitionUpsertMetadataManager's other
- // methods, like removeSegment.
- // Adding segment should be done by a single HelixTaskExecutor thread, but
do it with lock here for simplicity
- // otherwise, we'd need to double-check if oldSegmentManager is null.
- Lock segmentLock = SEGMENT_UPSERT_LOCKS.getLock(_tableNameWithType,
segmentName);
- segmentLock.lock();
- try {
- SegmentDataManager oldSegmentManager =
_segmentDataManagerMap.get(segmentName);
- if (oldSegmentManager == null) {
- // When adding a new segment, we should register it 'before' it is
fully initialized by
- // partitionUpsertMetadataManager. Because when processing docs in the
new segment, the docs in the other
- // segments may be invalidated, making the queries see less valid docs
than expected. We should let query
- // access the new segment asap even though its validDocId bitmap is
still being filled by
- // partitionUpsertMetadataManager.
- registerSegment(segmentName, newSegmentManager);
- partitionUpsertMetadataManager.addSegment(immutableSegment);
- _logger.info("Added new immutable segment: {} to upsert-enabled table:
{}", segmentName, _tableNameWithType);
- } else {
- // When replacing a segment, we should register the new segment
'after' it is fully initialized by
- // partitionUpsertMetadataManager to fill up its validDocId bitmap.
Otherwise, the queries will lose the access
- // to the valid docs in the old segment immediately, but the
validDocId bitmap of the new segment is still
- // being filled by partitionUpsertMetadataManager, making the queries
see less valid docs than expected.
- // When replacing a segment, the new and old segments are assumed to
have same set of valid docs for data
- // consistency, otherwise the new segment should be named differently
to go through the addSegment flow above.
- IndexSegment oldSegment = oldSegmentManager.getSegment();
- partitionUpsertMetadataManager.replaceSegment(immutableSegment,
oldSegment);
- registerSegment(segmentName, newSegmentManager);
- _logger.info("Replaced {} segment: {} of upsert-enabled table: {}",
- oldSegment instanceof ImmutableSegment ? "immutable" : "mutable",
segmentName, _tableNameWithType);
- releaseSegment(oldSegmentManager);
- }
- } finally {
- segmentLock.unlock();
- }
- }
-
- @Override
- protected boolean allowDownload(String segmentName, SegmentZKMetadata
zkMetadata) {
- // Cannot download consuming segment
- if (zkMetadata.getStatus() == Status.IN_PROGRESS) {
- return false;
- }
- // TODO: may support download from peer servers as well.
- return !METADATA_URI_FOR_PEER_DOWNLOAD.equals(zkMetadata.getDownloadUrl());
- }
-
- void downloadAndReplaceSegment(String segmentName, SegmentZKMetadata
segmentZKMetadata,
- IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
- String uri = segmentZKMetadata.getDownloadUrl();
- if (!METADATA_URI_FOR_PEER_DOWNLOAD.equals(uri)) {
- try {
- // TODO: cleanup and consolidate the segment loading logic a bit for
OFFLINE and REALTIME tables.
- // https://github.com/apache/pinot/issues/9752
- downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri);
- } catch (Exception e) {
- _logger.warn("Download segment {} from deepstore uri {} failed.",
segmentName, uri, e);
- // Download from deep store failed; try to download from peer if peer
download is setup for the table.
- if (_peerDownloadScheme != null) {
- downloadSegmentFromPeer(segmentName, indexLoadingConfig);
- } else {
- throw e;
- }
- }
+ SegmentDataManager oldSegmentManager =
_segmentDataManagerMap.get(segmentName);
+ if (oldSegmentManager == null) {
+ // When adding a new segment, we should register it 'before' it is fully
initialized by
+ // partitionUpsertMetadataManager. Because when processing docs in the
new segment, the docs in the other
+ // segments may be invalidated, making the queries see less valid docs
than expected. We should let query
+ // access the new segment asap even though its validDocId bitmap is
still being filled by
+ // partitionUpsertMetadataManager.
+ registerSegment(segmentName, newSegmentManager);
+ partitionUpsertMetadataManager.addSegment(immutableSegment);
+ _logger.info("Added new immutable segment: {} to upsert-enabled table:
{}", segmentName, _tableNameWithType);
} else {
- if (_peerDownloadScheme != null) {
- downloadSegmentFromPeer(segmentName, indexLoadingConfig);
- } else {
- throw new RuntimeException("Peer segment download not enabled for
segment " + segmentName);
- }
- }
- }
-
- private void downloadSegmentFromDeepStore(String segmentName,
IndexLoadingConfig indexLoadingConfig, String uri) {
- // This could leave temporary directories in _indexDir if JVM shuts down
before the temp directory is deleted.
- // This is fine since the temporary directories are deleted when the table
data manager calls init.
- File tempRootDir = null;
- try {
- tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." +
System.currentTimeMillis());
- File segmentTarFile = new File(tempRootDir, segmentName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- SegmentFetcherFactory.fetchSegmentToLocal(uri, segmentTarFile);
- _logger.info("Downloaded file from {} to {}; Length of downloaded file:
{}", uri, segmentTarFile,
- segmentTarFile.length());
- untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile,
tempRootDir);
- } catch (Exception e) {
- _logger.warn("Failed to download segment {} from deep store: ",
segmentName, e);
- throw new RuntimeException(e);
- } finally {
- FileUtils.deleteQuietly(tempRootDir);
+ // When replacing a segment, we should register the new segment 'after'
it is fully initialized by
+ // partitionUpsertMetadataManager to fill up its validDocId bitmap.
Otherwise, the queries will lose the access
+ // to the valid docs in the old segment immediately, but the validDocId
bitmap of the new segment is still
+ // being filled by partitionUpsertMetadataManager, making the queries
see less valid docs than expected.
+ // When replacing a segment, the new and old segments are assumed to
have same set of valid docs for data
+ // consistency, otherwise the new segment should be named differently to
go through the addSegment flow above.
+ IndexSegment oldSegment = oldSegmentManager.getSegment();
+ partitionUpsertMetadataManager.replaceSegment(immutableSegment,
oldSegment);
+ registerSegment(segmentName, newSegmentManager);
+ _logger.info("Replaced {} segment: {} of upsert-enabled table: {}",
+ oldSegment instanceof ImmutableSegment ? "immutable" : "mutable",
segmentName, _tableNameWithType);
+ releaseSegment(oldSegmentManager);
}
}
/**
- * Untars the new segment and replaces the existing segment.
+ * Replaces the CONSUMING segment with a downloaded sealed one.
*/
- private void untarAndMoveSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig, File segmentTarFile,
- File tempRootDir)
- throws IOException {
- File untarDir = new File(tempRootDir, segmentName);
- File untaredSegDir = TarGzCompressionUtils.untar(segmentTarFile,
untarDir).get(0);
- _logger.info("Uncompressed file {} into tmp dir {}", segmentTarFile,
untarDir);
- File indexDir = new File(_indexDir, segmentName);
- FileUtils.deleteQuietly(indexDir);
- FileUtils.moveDirectory(untaredSegDir, indexDir);
- _logger.info("Replacing LLC Segment {}", segmentName);
- replaceLLSegment(segmentName, indexLoadingConfig);
- }
-
- private void downloadSegmentFromPeer(String segmentName, IndexLoadingConfig
indexLoadingConfig) {
- File tempRootDir = null;
- try {
- tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." +
System.currentTimeMillis());
- File segmentTarFile = new File(tempRootDir, segmentName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- // Next download the segment from a randomly chosen server using
configured download scheme (http or https).
-
SegmentFetcherFactory.getSegmentFetcher(_peerDownloadScheme).fetchSegmentToLocal(segmentName,
() -> {
- List<URI> peerServerURIs =
- PeerServerSegmentFinder.getPeerServerURIs(_helixManager,
_tableNameWithType, segmentName,
- _peerDownloadScheme);
- Collections.shuffle(peerServerURIs);
- return peerServerURIs;
- }, segmentTarFile);
- _logger.info("Fetched segment {} successfully to {} of size {}",
segmentName, segmentTarFile,
- segmentTarFile.length());
- untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile,
tempRootDir);
- } catch (Exception e) {
- _logger.warn("Download and move segment {} from peer with scheme {}
failed.", segmentName, _peerDownloadScheme,
- e);
- throw new RuntimeException(e);
- } finally {
- FileUtils.deleteQuietly(tempRootDir);
- }
+ public void downloadAndReplaceConsumingSegment(SegmentZKMetadata zkMetadata)
+ throws Exception {
+ String segmentName = zkMetadata.getSegmentName();
+ _logger.info("Downloading and replacing CONSUMING segment: {} with sealed
one", segmentName);
+ File indexDir = downloadSegment(zkMetadata);
+ // Get a new index loading config with latest table config and schema to
load the segment
+ IndexLoadingConfig indexLoadingConfig = getIndexLoadingConfig(null);
Review Comment:
why not pass zkMetadata into this method?
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -428,17 +570,25 @@ public ExecutorService getSegmentPreloadExecutor() {
@Override
public void addSegmentError(String segmentName, SegmentErrorInfo
segmentErrorInfo) {
- _errorCache.put(Pair.of(_tableNameWithType, segmentName),
segmentErrorInfo);
+ if (_errorCache != null) {
+ _errorCache.put(Pair.of(_tableNameWithType, segmentName),
segmentErrorInfo);
+ }
}
@Override
public Map<String, SegmentErrorInfo> getSegmentErrors() {
- if (_errorCache == null) {
- return Collections.emptyMap();
+ if (_errorCache != null) {
+ // Filter out entries that match the table name
+ Map<String, SegmentErrorInfo> segmentErrors = new HashMap<>();
+ for (Map.Entry<Pair<String, String>, SegmentErrorInfo> entry :
_errorCache.asMap().entrySet()) {
+ Pair<String, String> tableSegmentPair = entry.getKey();
+ if (tableSegmentPair.getLeft().equals(_tableNameWithType)) {
+ segmentErrors.put(tableSegmentPair.getRight(), entry.getValue());
+ }
+ }
+ return segmentErrors;
} else {
- // Filter out entries that match the table name.
- return _errorCache.asMap().entrySet().stream().filter(map ->
map.getKey().getLeft().equals(_tableNameWithType))
- .collect(Collectors.toMap(map -> map.getKey().getRight(),
Map.Entry::getValue));
+ return Map.of();
Review Comment:
nit: why not continue to use Collections.emptyMap()? I didn't see Map.of()
used elsewhere in the repo. IIRC it used to cause a compile issue on older
JDKs (JDK 8?).
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -268,64 +282,192 @@ public void addSegment(ImmutableSegment
immutableSegment) {
String segmentName = immutableSegment.getSegmentName();
Preconditions.checkState(!_shutDown, "Table data manager is already shut
down, cannot add segment: %s to table: %s",
segmentName, _tableNameWithType);
- _logger.info("Adding immutable segment: {} to table: {}", segmentName,
_tableNameWithType);
+ _logger.info("Adding immutable segment: {}", segmentName);
_serverMetrics.addValueToTableGauge(_tableNameWithType,
ServerGauge.DOCUMENT_COUNT,
immutableSegment.getSegmentMetadata().getTotalDocs());
_serverMetrics.addValueToTableGauge(_tableNameWithType,
ServerGauge.SEGMENT_COUNT, 1L);
ImmutableSegmentDataManager newSegmentManager = new
ImmutableSegmentDataManager(immutableSegment);
SegmentDataManager oldSegmentManager = registerSegment(segmentName,
newSegmentManager);
if (oldSegmentManager == null) {
- _logger.info("Added new immutable segment: {} to table: {}",
segmentName, _tableNameWithType);
+ _logger.info("Added new immutable segment: {}", segmentName);
} else {
- _logger.info("Replaced immutable segment: {} of table: {}", segmentName,
_tableNameWithType);
+ _logger.info("Replaced immutable segment: {}", segmentName);
releaseSegment(oldSegmentManager);
}
}
@Override
- public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)
+ public void addOnlineSegment(String segmentName)
throws Exception {
- Preconditions.checkState(!_shutDown, "Table data manager is already shut
down, cannot add segment: %s to table: %s",
- indexDir.getName(), _tableNameWithType);
+ Preconditions.checkState(!_shutDown,
+ "Table data manager is already shut down, cannot add ONLINE segment:
%s to table: %s", segmentName,
+ _tableNameWithType);
+ _logger.info("Adding ONLINE segment: {} to table: {}", segmentName,
_tableNameWithType);
+ Lock segmentLock = getSegmentLock(segmentName);
+ segmentLock.lock();
+ try {
+ doAddOnlineSegment(segmentName);
+ } catch (Exception e) {
+ addSegmentError(segmentName,
+ new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception
while adding ONLINE segment", e));
+ throw e;
+ } finally {
+ segmentLock.unlock();
+ }
+ }
+
+ protected abstract void doAddOnlineSegment(String segmentName)
+ throws Exception;
+
+ protected SegmentZKMetadata getZKMetadata(String segmentName) {
+ SegmentZKMetadata zkMetadata =
+ ZKMetadataProvider.getSegmentZKMetadata(_propertyStore,
_tableNameWithType, segmentName);
+ Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata
for segment: %s of table: %s", segmentName,
+ _tableNameWithType);
+ return zkMetadata;
+ }
+
+ protected TableConfig getTableConfig() {
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
+ Preconditions.checkState(tableConfig != null, "Failed to find table config
for table: %s", _tableNameWithType);
+ return tableConfig;
+ }
+
+ @Nullable
+ protected Schema getSchema(TableConfig tableConfig) {
+ Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
tableConfig);
+ // NOTE: Schema is mandatory for REALTIME table.
+ if (tableConfig.getTableType() == TableType.REALTIME) {
+ Preconditions.checkState(schema != null, "Failed to find schema for
table: %s", _tableNameWithType);
+ }
+ return schema;
+ }
+
+ protected IndexLoadingConfig getIndexLoadingConfig(@Nullable
SegmentZKMetadata zkMetadata) {
+ TableConfig tableConfig = getTableConfig();
+ Schema schema = getSchema(tableConfig);
+ IndexLoadingConfig indexLoadingConfig = new
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema);
indexLoadingConfig.setTableDataDir(_tableDataDir);
indexLoadingConfig.setInstanceTierConfigs(_instanceDataManagerConfig.getTierConfigs());
- addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
indexLoadingConfig.getSchema()));
+ if (zkMetadata != null) {
+ indexLoadingConfig.setSegmentTier(zkMetadata.getTier());
+ }
+ return indexLoadingConfig;
}
- @Override
- public void addSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig, SegmentZKMetadata zkMetadata)
+ /**
+ * Adds a new ONLINE segment that is not already loaded.
+ */
+ protected void addNewOnlineSegment(SegmentZKMetadata zkMetadata,
IndexLoadingConfig indexLoadingConfig)
+ throws Exception {
+ String segmentName = zkMetadata.getSegmentName();
+ _logger.info("Adding new ONLINE segment: {}", segmentName);
+ if (!tryLoadExistingSegment(segmentName, indexLoadingConfig, zkMetadata)) {
+ downloadAndLoadSegment(zkMetadata, indexLoadingConfig);
+ }
+ }
+
+ /**
+ * Replaces an already loaded segment in a table if the segment has been
overridden in the deep store (CRC mismatch).
+ */
+ protected void replaceSegmentIfCrcMismatch(SegmentDataManager
segmentDataManager, SegmentZKMetadata zkMetadata,
+ IndexLoadingConfig indexLoadingConfig)
throws Exception {
- throw new UnsupportedOperationException();
+ String segmentName = segmentDataManager.getSegmentName();
+ Preconditions.checkState(segmentDataManager instanceof
ImmutableSegmentDataManager,
+ "Cannot replace CONSUMING segment: %s in table: %s", segmentName,
_tableNameWithType);
+ SegmentMetadata localMetadata =
segmentDataManager.getSegment().getSegmentMetadata();
+ if (hasSameCRC(zkMetadata, localMetadata)) {
+ _logger.info("Segment: {} has CRC: {} same as before, not replacing it",
segmentName, localMetadata.getCrc());
+ return;
+ }
+ _logger.info("Replacing segment: {} because its CRC has changed from: {}
to: {}", segmentName,
+ localMetadata.getCrc(), zkMetadata.getCrc());
+ downloadAndLoadSegment(zkMetadata, indexLoadingConfig);
+ _logger.info("Replaced segment: {} with new CRC: {}", segmentName,
zkMetadata.getCrc());
}
/**
- * Called when we get a helix transition to go to offline or dropped state.
- * We need to remove it safely, keeping in mind that there may be queries
that are
- * using the segment,
- * @param segmentName name of the segment to remove.
+ * Downloads a segment and loads it into the table.
*/
+ protected void downloadAndLoadSegment(SegmentZKMetadata zkMetadata,
IndexLoadingConfig indexLoadingConfig)
+ throws Exception {
+ String segmentName = zkMetadata.getSegmentName();
+ _logger.info("Downloading and loading segment: {}", segmentName);
+ File indexDir = downloadSegment(zkMetadata);
+ addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig));
+ _logger.info("Downloaded and loaded segment: {} with CRC: {} on tier: {}",
segmentName, zkMetadata.getCrc(),
+ TierConfigUtils.normalizeTierName(zkMetadata.getTier()));
+ }
+
@Override
- public void removeSegment(String segmentName) {
- // Allow removing segment after shutdown so that we can remove the segment
when the table is deleted
+ public void replaceSegment(String segmentName)
+ throws Exception {
+ Preconditions.checkState(!_shutDown,
+ "Table data manager is already shut down, cannot replace segment: %s
in table: %s", segmentName,
+ _tableNameWithType);
+ _logger.info("Replacing segment: {}", segmentName);
+ Lock segmentLock = getSegmentLock(segmentName);
+ segmentLock.lock();
+ try {
+ doReplaceSegment(segmentName);
+ } catch (Exception e) {
+ addSegmentError(segmentName,
+ new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception
while replacing segment", e));
+ throw e;
+ } finally {
+ segmentLock.unlock();
+ }
+ }
+
+ protected void doReplaceSegment(String segmentName)
+ throws Exception {
+ SegmentDataManager segmentDataManager =
_segmentDataManagerMap.get(segmentName);
+ if (segmentDataManager != null) {
+ SegmentZKMetadata zkMetadata = getZKMetadata(segmentName);
+ IndexLoadingConfig indexLoadingConfig =
getIndexLoadingConfig(zkMetadata);
+ replaceSegmentIfCrcMismatch(segmentDataManager, zkMetadata,
indexLoadingConfig);
+ } else {
+ _logger.warn("Failed to find segment: {}, skipping replacing it",
segmentName);
+ }
+ }
+
+ @Override
+ public void offloadSegment(String segmentName) {
+ // NOTE: Do not throw exception when data manager has been shut down. This
is regular flow when a table is deleted.
if (_shutDown) {
- _logger.info("Table data manager is already shut down, skip removing
segment: {} from table: {}", segmentName,
- _tableNameWithType);
+ _logger.info("Table data manager is already shut down, skipping
offloading segment: {}", segmentName);
return;
}
- _logger.info("Removing segment: {} from table: {}", segmentName,
_tableNameWithType);
+ _logger.info("Offloading segment: {}", segmentName);
+ Lock segmentLock = getSegmentLock(segmentName);
+ segmentLock.lock();
+ try {
+ doOffloadSegment(segmentName);
+ } catch (Exception e) {
+ addSegmentError(segmentName,
+ new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception
while offloading segment", e));
+ throw e;
+ } finally {
+ segmentLock.unlock();
+ }
+ }
+
+ protected void doOffloadSegment(String segmentName) {
SegmentDataManager segmentDataManager = unregisterSegment(segmentName);
if (segmentDataManager != null) {
+ segmentDataManager.offload();
releaseSegment(segmentDataManager);
- _logger.info("Removed segment: {} from table: {}", segmentName,
_tableNameWithType);
+ _logger.info("Offloaded segment: {}", segmentName);
} else {
- _logger.info("Failed to find segment: {} in table: {}", segmentName,
_tableNameWithType);
+ _logger.warn("Failed to find segment: {}, skipping offloading it",
_tableNameWithType);
Review Comment:
s/_tableNameWithType/segmentName
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -608,172 +721,153 @@ protected SegmentDataManager unregisterSegment(String
segmentName) {
}
}
- protected boolean allowDownload(String segmentName, SegmentZKMetadata
zkMetadata) {
- return true;
- }
-
- protected File downloadSegment(String segmentName, SegmentZKMetadata
zkMetadata)
- throws Exception {
- // TODO: may support download from peer servers for RealTime table.
- return downloadSegmentFromDeepStore(segmentName, zkMetadata);
- }
-
- private File downloadSegmentFromDeepStore(String segmentName,
SegmentZKMetadata zkMetadata)
+ /**
+ * Downloads an immutable segment into the index directory.
+ * Segment can be downloaded from deep store or from peer servers.
Downloaded segment might be compressed or
+ * encrypted, and this method takes care of decompressing and decrypting the
segment.
+ */
+ protected File downloadSegment(SegmentZKMetadata zkMetadata)
throws Exception {
- File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" +
UUID.randomUUID());
- if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) {
- try {
- File untaredSegDir = downloadAndStreamUntarWithRateLimit(segmentName,
zkMetadata, tempRootDir,
- _streamSegmentDownloadUntarRateLimitBytesPerSec);
- return moveSegment(segmentName, untaredSegDir);
- } finally {
- FileUtils.deleteQuietly(tempRootDir);
- }
- } else {
- try {
- File tarFile = downloadAndDecrypt(segmentName, zkMetadata,
tempRootDir);
- return untarAndMoveSegment(segmentName, tarFile, tempRootDir);
- } finally {
- FileUtils.deleteQuietly(tempRootDir);
- }
- }
- }
-
- private File moveSegment(String segmentName, File untaredSegDir)
- throws IOException {
+ String segmentName = zkMetadata.getSegmentName();
+ String downloadUrl = zkMetadata.getDownloadUrl();
+ Preconditions.checkState(downloadUrl != null,
+ "Failed to find download URL in ZK metadata for segment: %s of table:
%s", segmentName, _tableNameWithType);
try {
- File indexDir = getSegmentDataDir(segmentName);
- FileUtils.deleteDirectory(indexDir);
- FileUtils.moveDirectory(untaredSegDir, indexDir);
- return indexDir;
+ if
(!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(downloadUrl)) {
+ try {
+ return downloadSegmentFromDeepStore(zkMetadata);
+ } catch (Exception e) {
+ if (_peerDownloadScheme != null) {
+ return downloadSegmentFromPeers(zkMetadata);
+ } else {
+ throw e;
+ }
+ }
+ } else {
+ return downloadSegmentFromPeers(zkMetadata);
+ }
} catch (Exception e) {
- LOGGER.error("Failed to move segment: {} of table: {}", segmentName,
_tableNameWithType);
- _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.SEGMENT_DIR_MOVEMENT_FAILURES, 1L);
+ _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1);
throw e;
}
}
@VisibleForTesting
- File downloadAndDecrypt(String segmentName, SegmentZKMetadata zkMetadata,
File tempRootDir)
+ File downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata)
throws Exception {
- File tarFile = new File(tempRootDir, segmentName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- String uri = zkMetadata.getDownloadUrl();
- boolean downloadSuccess = false;
+ String segmentName = zkMetadata.getSegmentName();
+ String downloadUrl = zkMetadata.getDownloadUrl();
+ _logger.info("Downloading segment: {} from: {}", segmentName, downloadUrl);
+ File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" +
UUID.randomUUID());
+ if (_segmentDownloadSemaphore != null) {
+ long startTime = System.currentTimeMillis();
+ _logger.info("Acquiring segment download semaphore for segment: {},
queue-length: {} ", segmentName,
+ _segmentDownloadSemaphore.getQueueLength());
+ _segmentDownloadSemaphore.acquire();
+ _logger.info("Acquired segment download semaphore for segment: {}
(lock-time={}ms, queue-length={}).",
+ segmentName, System.currentTimeMillis() - startTime,
_segmentDownloadSemaphore.getQueueLength());
+ }
try {
- if (_segmentDownloadSemaphore != null) {
- long startTime = System.currentTimeMillis();
- LOGGER.info("Trying to acquire segment download semaphore for: {}.
queue-length: {} ", segmentName,
- _segmentDownloadSemaphore.getQueueLength());
- _segmentDownloadSemaphore.acquire();
- LOGGER.info("Acquired segment download semaphore for: {}
(lock-time={}ms, queue-length={}).", segmentName,
- System.currentTimeMillis() - startTime,
_segmentDownloadSemaphore.getQueueLength());
- }
- SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(uri, tarFile,
zkMetadata.getCrypterName());
- LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to:
{}, file length: {}", segmentName,
- _tableNameWithType, uri, tarFile, tarFile.length());
- downloadSuccess = true;
- return tarFile;
- } catch (AttemptsExceededException e) {
- LOGGER.error("Attempts exceeded when downloading segment: {} for table:
{} from: {} to: {}", segmentName,
- _tableNameWithType, uri, tarFile);
- _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES, 1L);
- if (_peerDownloadScheme == null) {
- throw e;
+ File untarredSegmentDir;
+ if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() ==
null) {
+ _logger.info("Downloading segment: {} using streamed download-untar
with maxStreamRateInByte: {}", segmentName,
+ _streamSegmentDownloadUntarRateLimitBytesPerSec);
+ AtomicInteger attempts = new AtomicInteger(0);
+ try {
+ untarredSegmentDir =
SegmentFetcherFactory.fetchAndStreamUntarToLocal(downloadUrl, tempRootDir,
+ _streamSegmentDownloadUntarRateLimitBytesPerSec, attempts);
+ _logger.info("Downloaded and untarred segment: {} from: {},
attempts: {}", segmentName, downloadUrl,
+ attempts.get());
+ } finally {
+ _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES,
Review Comment:
not always failures?
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -608,172 +721,153 @@ protected SegmentDataManager unregisterSegment(String
segmentName) {
}
}
- protected boolean allowDownload(String segmentName, SegmentZKMetadata
zkMetadata) {
- return true;
- }
-
- protected File downloadSegment(String segmentName, SegmentZKMetadata
zkMetadata)
- throws Exception {
- // TODO: may support download from peer servers for RealTime table.
- return downloadSegmentFromDeepStore(segmentName, zkMetadata);
- }
-
- private File downloadSegmentFromDeepStore(String segmentName,
SegmentZKMetadata zkMetadata)
+ /**
+ * Downloads an immutable segment into the index directory.
+ * Segment can be downloaded from deep store or from peer servers.
Downloaded segment might be compressed or
+ * encrypted, and this method takes care of decompressing and decrypting the
segment.
+ */
+ protected File downloadSegment(SegmentZKMetadata zkMetadata)
throws Exception {
- File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" +
UUID.randomUUID());
- if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) {
- try {
- File untaredSegDir = downloadAndStreamUntarWithRateLimit(segmentName,
zkMetadata, tempRootDir,
- _streamSegmentDownloadUntarRateLimitBytesPerSec);
- return moveSegment(segmentName, untaredSegDir);
- } finally {
- FileUtils.deleteQuietly(tempRootDir);
- }
- } else {
- try {
- File tarFile = downloadAndDecrypt(segmentName, zkMetadata,
tempRootDir);
- return untarAndMoveSegment(segmentName, tarFile, tempRootDir);
- } finally {
- FileUtils.deleteQuietly(tempRootDir);
- }
- }
- }
-
- private File moveSegment(String segmentName, File untaredSegDir)
- throws IOException {
+ String segmentName = zkMetadata.getSegmentName();
+ String downloadUrl = zkMetadata.getDownloadUrl();
+ Preconditions.checkState(downloadUrl != null,
+ "Failed to find download URL in ZK metadata for segment: %s of table:
%s", segmentName, _tableNameWithType);
try {
- File indexDir = getSegmentDataDir(segmentName);
- FileUtils.deleteDirectory(indexDir);
- FileUtils.moveDirectory(untaredSegDir, indexDir);
- return indexDir;
+ if
(!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(downloadUrl)) {
+ try {
+ return downloadSegmentFromDeepStore(zkMetadata);
+ } catch (Exception e) {
+ if (_peerDownloadScheme != null) {
+ return downloadSegmentFromPeers(zkMetadata);
+ } else {
+ throw e;
+ }
+ }
+ } else {
+ return downloadSegmentFromPeers(zkMetadata);
+ }
} catch (Exception e) {
- LOGGER.error("Failed to move segment: {} of table: {}", segmentName,
_tableNameWithType);
- _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.SEGMENT_DIR_MOVEMENT_FAILURES, 1L);
+ _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1);
throw e;
}
}
@VisibleForTesting
- File downloadAndDecrypt(String segmentName, SegmentZKMetadata zkMetadata,
File tempRootDir)
+ File downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata)
throws Exception {
- File tarFile = new File(tempRootDir, segmentName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- String uri = zkMetadata.getDownloadUrl();
- boolean downloadSuccess = false;
+ String segmentName = zkMetadata.getSegmentName();
+ String downloadUrl = zkMetadata.getDownloadUrl();
+ _logger.info("Downloading segment: {} from: {}", segmentName, downloadUrl);
+ File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" +
UUID.randomUUID());
+ if (_segmentDownloadSemaphore != null) {
+ long startTime = System.currentTimeMillis();
+ _logger.info("Acquiring segment download semaphore for segment: {},
queue-length: {} ", segmentName,
+ _segmentDownloadSemaphore.getQueueLength());
+ _segmentDownloadSemaphore.acquire();
+ _logger.info("Acquired segment download semaphore for segment: {}
(lock-time={}ms, queue-length={}).",
+ segmentName, System.currentTimeMillis() - startTime,
_segmentDownloadSemaphore.getQueueLength());
+ }
try {
- if (_segmentDownloadSemaphore != null) {
- long startTime = System.currentTimeMillis();
- LOGGER.info("Trying to acquire segment download semaphore for: {}.
queue-length: {} ", segmentName,
- _segmentDownloadSemaphore.getQueueLength());
- _segmentDownloadSemaphore.acquire();
- LOGGER.info("Acquired segment download semaphore for: {}
(lock-time={}ms, queue-length={}).", segmentName,
- System.currentTimeMillis() - startTime,
_segmentDownloadSemaphore.getQueueLength());
- }
- SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(uri, tarFile,
zkMetadata.getCrypterName());
- LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to:
{}, file length: {}", segmentName,
- _tableNameWithType, uri, tarFile, tarFile.length());
- downloadSuccess = true;
- return tarFile;
- } catch (AttemptsExceededException e) {
- LOGGER.error("Attempts exceeded when downloading segment: {} for table:
{} from: {} to: {}", segmentName,
- _tableNameWithType, uri, tarFile);
- _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES, 1L);
- if (_peerDownloadScheme == null) {
- throw e;
+ File untarredSegmentDir;
+ if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() ==
null) {
+ _logger.info("Downloading segment: {} using streamed download-untar
with maxStreamRateInByte: {}", segmentName,
+ _streamSegmentDownloadUntarRateLimitBytesPerSec);
+ AtomicInteger attempts = new AtomicInteger(0);
+ try {
+ untarredSegmentDir =
SegmentFetcherFactory.fetchAndStreamUntarToLocal(downloadUrl, tempRootDir,
+ _streamSegmentDownloadUntarRateLimitBytesPerSec, attempts);
+ _logger.info("Downloaded and untarred segment: {} from: {},
attempts: {}", segmentName, downloadUrl,
+ attempts.get());
+ } finally {
+ _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES,
+ attempts.get());
+ }
+ } else {
+ File segmentTarFile = new File(tempRootDir, segmentName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadUrl,
segmentTarFile, zkMetadata.getCrypterName());
+ _logger.info("Downloaded tarred segment: {} from: {} to: {}, file
length: {}", segmentName, downloadUrl,
+ segmentTarFile, segmentTarFile.length());
+ untarredSegmentDir = untarSegment(segmentName, segmentTarFile,
tempRootDir);
}
- downloadFromPeersWithoutStreaming(segmentName, zkMetadata, tarFile);
- downloadSuccess = true;
- return tarFile;
+ File indexDir = moveSegment(segmentName, untarredSegmentDir);
+ _logger.info("Downloaded segment: {} from: {} to: {}", segmentName,
downloadUrl, indexDir);
+ return indexDir;
+ } catch (Exception e) {
+ _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES, 1);
+ throw e;
} finally {
- if (!downloadSuccess) {
- _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1L);
- }
if (_segmentDownloadSemaphore != null) {
_segmentDownloadSemaphore.release();
}
+ FileUtils.deleteQuietly(tempRootDir);
}
}
- protected void downloadFromPeersWithoutStreaming(String segmentName,
SegmentZKMetadata zkMetadata, File destTarFile)
+ @VisibleForTesting
+ File downloadSegmentFromPeers(SegmentZKMetadata zkMetadata)
throws Exception {
+ String segmentName = zkMetadata.getSegmentName();
Preconditions.checkState(_peerDownloadScheme != null, "Peer download is
not enabled for table: %s",
_tableNameWithType);
+ _logger.info("Downloading segment: {} from peers", segmentName);
+ File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" +
UUID.randomUUID());
Review Comment:
pull `File tempRootDir...` and `FileUtils.deleteQuietly(tempRootDir);` up
into the caller `downloadSegment()` to save those few lines from
`downloadSegmentFromDeepStore()` and `downloadSegmentFromPeers()`
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -752,14 +752,20 @@ public void run() {
_state = State.DISCARDED;
break;
case DEFAULT:
- success = buildSegmentAndReplace();
- if (success) {
- _state = State.RETAINED;
- } else {
- // Could not build segment for some reason. We can only
download it.
- _state = State.ERROR;
- _segmentLogger.error("Could not build segment for {}",
_segmentNameStr);
+ // Lock the segment to avoid multiple threads touching the
same segment.
+ Lock segmentLock =
_realtimeTableDataManager.getSegmentLock(_segmentNameStr);
+ segmentLock.lock();
+ try {
+ if (buildSegmentAndReplace()) {
Review Comment:
move lock/unlock into buildSegmentAndReplace() method?
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -570,165 +563,57 @@ private void handleUpsert(ImmutableSegment
immutableSegment) {
_logger.info("Preloaded immutable segment: {} to upsert-enabled table:
{}", segmentName, _tableNameWithType);
return;
}
- // Replacing segment takes multiple steps, and particularly need to access
the oldSegment. Replace segment may
- // happen in two threads, i.e. the consuming thread that's committing the
mutable segment and a HelixTaskExecutor
- // thread that's bringing segment from ONLINE to CONSUMING when the server
finds consuming thread can't commit
- // the segment in time. The slower thread takes the reference of the
oldSegment here, but it may get closed by
- // the faster thread if not synchronized. In particular, the slower thread
may iterate the primary keys in the
- // oldSegment, causing seg fault. So we have to take a lock here.
- // However, we can't just reuse the existing segmentLocks. Because many
methods of partitionUpsertMetadataManager
- // takes this lock internally, but after taking snapshot RW lock. If we
take segmentLock here (before taking
- // snapshot RW lock), we can get into deadlock with threads calling
partitionUpsertMetadataManager's other
- // methods, like removeSegment.
- // Adding segment should be done by a single HelixTaskExecutor thread, but
do it with lock here for simplicity
- // otherwise, we'd need to double-check if oldSegmentManager is null.
- Lock segmentLock = SEGMENT_UPSERT_LOCKS.getLock(_tableNameWithType,
segmentName);
- segmentLock.lock();
- try {
- SegmentDataManager oldSegmentManager =
_segmentDataManagerMap.get(segmentName);
- if (oldSegmentManager == null) {
- // When adding a new segment, we should register it 'before' it is
fully initialized by
- // partitionUpsertMetadataManager. Because when processing docs in the
new segment, the docs in the other
- // segments may be invalidated, making the queries see less valid docs
than expected. We should let query
- // access the new segment asap even though its validDocId bitmap is
still being filled by
- // partitionUpsertMetadataManager.
- registerSegment(segmentName, newSegmentManager);
- partitionUpsertMetadataManager.addSegment(immutableSegment);
- _logger.info("Added new immutable segment: {} to upsert-enabled table:
{}", segmentName, _tableNameWithType);
- } else {
- // When replacing a segment, we should register the new segment
'after' it is fully initialized by
- // partitionUpsertMetadataManager to fill up its validDocId bitmap.
Otherwise, the queries will lose the access
- // to the valid docs in the old segment immediately, but the
validDocId bitmap of the new segment is still
- // being filled by partitionUpsertMetadataManager, making the queries
see less valid docs than expected.
- // When replacing a segment, the new and old segments are assumed to
have same set of valid docs for data
- // consistency, otherwise the new segment should be named differently
to go through the addSegment flow above.
- IndexSegment oldSegment = oldSegmentManager.getSegment();
- partitionUpsertMetadataManager.replaceSegment(immutableSegment,
oldSegment);
- registerSegment(segmentName, newSegmentManager);
- _logger.info("Replaced {} segment: {} of upsert-enabled table: {}",
- oldSegment instanceof ImmutableSegment ? "immutable" : "mutable",
segmentName, _tableNameWithType);
- releaseSegment(oldSegmentManager);
- }
- } finally {
- segmentLock.unlock();
- }
- }
-
- @Override
- protected boolean allowDownload(String segmentName, SegmentZKMetadata
zkMetadata) {
- // Cannot download consuming segment
- if (zkMetadata.getStatus() == Status.IN_PROGRESS) {
- return false;
- }
- // TODO: may support download from peer servers as well.
- return !METADATA_URI_FOR_PEER_DOWNLOAD.equals(zkMetadata.getDownloadUrl());
- }
-
- void downloadAndReplaceSegment(String segmentName, SegmentZKMetadata
segmentZKMetadata,
- IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
- String uri = segmentZKMetadata.getDownloadUrl();
- if (!METADATA_URI_FOR_PEER_DOWNLOAD.equals(uri)) {
- try {
- // TODO: cleanup and consolidate the segment loading logic a bit for
OFFLINE and REALTIME tables.
- // https://github.com/apache/pinot/issues/9752
- downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri);
- } catch (Exception e) {
- _logger.warn("Download segment {} from deepstore uri {} failed.",
segmentName, uri, e);
- // Download from deep store failed; try to download from peer if peer
download is setup for the table.
- if (_peerDownloadScheme != null) {
- downloadSegmentFromPeer(segmentName, indexLoadingConfig);
- } else {
- throw e;
- }
- }
+ SegmentDataManager oldSegmentManager =
_segmentDataManagerMap.get(segmentName);
+ if (oldSegmentManager == null) {
+ // When adding a new segment, we should register it 'before' it is fully
initialized by
+ // partitionUpsertMetadataManager. Because when processing docs in the
new segment, the docs in the other
+ // segments may be invalidated, making the queries see less valid docs
than expected. We should let query
+ // access the new segment asap even though its validDocId bitmap is
still being filled by
+ // partitionUpsertMetadataManager.
+ registerSegment(segmentName, newSegmentManager);
+ partitionUpsertMetadataManager.addSegment(immutableSegment);
+ _logger.info("Added new immutable segment: {} to upsert-enabled table:
{}", segmentName, _tableNameWithType);
} else {
- if (_peerDownloadScheme != null) {
- downloadSegmentFromPeer(segmentName, indexLoadingConfig);
- } else {
- throw new RuntimeException("Peer segment download not enabled for
segment " + segmentName);
- }
- }
- }
-
- private void downloadSegmentFromDeepStore(String segmentName,
IndexLoadingConfig indexLoadingConfig, String uri) {
- // This could leave temporary directories in _indexDir if JVM shuts down
before the temp directory is deleted.
- // This is fine since the temporary directories are deleted when the table
data manager calls init.
- File tempRootDir = null;
- try {
- tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." +
System.currentTimeMillis());
- File segmentTarFile = new File(tempRootDir, segmentName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- SegmentFetcherFactory.fetchSegmentToLocal(uri, segmentTarFile);
- _logger.info("Downloaded file from {} to {}; Length of downloaded file:
{}", uri, segmentTarFile,
- segmentTarFile.length());
- untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile,
tempRootDir);
- } catch (Exception e) {
- _logger.warn("Failed to download segment {} from deep store: ",
segmentName, e);
- throw new RuntimeException(e);
- } finally {
- FileUtils.deleteQuietly(tempRootDir);
+ // When replacing a segment, we should register the new segment 'after'
it is fully initialized by
+ // partitionUpsertMetadataManager to fill up its validDocId bitmap.
Otherwise, the queries will lose the access
+ // to the valid docs in the old segment immediately, but the validDocId
bitmap of the new segment is still
+ // being filled by partitionUpsertMetadataManager, making the queries
see less valid docs than expected.
+ // When replacing a segment, the new and old segments are assumed to
have same set of valid docs for data
+ // consistency, otherwise the new segment should be named differently to
go through the addSegment flow above.
+ IndexSegment oldSegment = oldSegmentManager.getSegment();
+ partitionUpsertMetadataManager.replaceSegment(immutableSegment,
oldSegment);
+ registerSegment(segmentName, newSegmentManager);
+ _logger.info("Replaced {} segment: {} of upsert-enabled table: {}",
+ oldSegment instanceof ImmutableSegment ? "immutable" : "mutable",
segmentName, _tableNameWithType);
+ releaseSegment(oldSegmentManager);
}
}
/**
- * Untars the new segment and replaces the existing segment.
+ * Replaces the CONSUMING segment with a downloaded sealed one.
*/
- private void untarAndMoveSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig, File segmentTarFile,
- File tempRootDir)
- throws IOException {
- File untarDir = new File(tempRootDir, segmentName);
- File untaredSegDir = TarGzCompressionUtils.untar(segmentTarFile,
untarDir).get(0);
- _logger.info("Uncompressed file {} into tmp dir {}", segmentTarFile,
untarDir);
- File indexDir = new File(_indexDir, segmentName);
- FileUtils.deleteQuietly(indexDir);
- FileUtils.moveDirectory(untaredSegDir, indexDir);
- _logger.info("Replacing LLC Segment {}", segmentName);
- replaceLLSegment(segmentName, indexLoadingConfig);
- }
-
- private void downloadSegmentFromPeer(String segmentName, IndexLoadingConfig
indexLoadingConfig) {
- File tempRootDir = null;
- try {
- tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." +
System.currentTimeMillis());
- File segmentTarFile = new File(tempRootDir, segmentName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- // Next download the segment from a randomly chosen server using
configured download scheme (http or https).
-
SegmentFetcherFactory.getSegmentFetcher(_peerDownloadScheme).fetchSegmentToLocal(segmentName,
() -> {
- List<URI> peerServerURIs =
- PeerServerSegmentFinder.getPeerServerURIs(_helixManager,
_tableNameWithType, segmentName,
- _peerDownloadScheme);
- Collections.shuffle(peerServerURIs);
- return peerServerURIs;
- }, segmentTarFile);
- _logger.info("Fetched segment {} successfully to {} of size {}",
segmentName, segmentTarFile,
- segmentTarFile.length());
- untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile,
tempRootDir);
- } catch (Exception e) {
- _logger.warn("Download and move segment {} from peer with scheme {}
failed.", segmentName, _peerDownloadScheme,
- e);
- throw new RuntimeException(e);
- } finally {
- FileUtils.deleteQuietly(tempRootDir);
- }
+ public void downloadAndReplaceConsumingSegment(SegmentZKMetadata zkMetadata)
+ throws Exception {
+ String segmentName = zkMetadata.getSegmentName();
+ _logger.info("Downloading and replacing CONSUMING segment: {} with sealed
one", segmentName);
+ File indexDir = downloadSegment(zkMetadata);
+ // Get a new index loading config with latest table config and schema to
load the segment
+ IndexLoadingConfig indexLoadingConfig = getIndexLoadingConfig(null);
+ addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig));
+ _logger.info("Downloaded and replaced CONSUMING segment: {}", segmentName);
}
/**
- * Replaces a committed LLC REALTIME segment.
+ * Replaces the CONSUMING segment with the sealed one.
Review Comment:
nit: `... with the one sealed locally` to be more precise?
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -377,111 +369,112 @@ private boolean isUpsertPreloadEnabled() {
&& upsertConfig.isEnableSnapshot() && upsertConfig.isEnablePreload();
}
- /*
- * This call comes in one of two ways:
- * For HL Segments:
- * - We are being directed by helix to own up all the segments that we
committed and are still in retention. In
- * this case we treat it exactly like how OfflineTableDataManager would --
wrap it into an
- * OfflineSegmentDataManager, and put it in the map.
- * - We are being asked to own up a new realtime segment. In this case, we
wrap the segment with a
- * RealTimeSegmentDataManager (that kicks off consumption). When the segment
is committed we get notified via the
- * notifySegmentCommitted call, at which time we replace the segment with
the OfflineSegmentDataManager
- *
- * For LL Segments:
- * - We are being asked to start consuming from a partition.
- * - We did not know about the segment and are being asked to download and
own the segment (re-balancing, or
- * replacing a realtime server with a fresh one, maybe). We need to look
at segment metadata and decide whether
- * to start consuming or download the segment.
+ /**
+ * Handles upsert preload, and returns whether the upsert preload is enabled.
*/
- @Override
- public void addSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig, SegmentZKMetadata segmentZKMetadata)
- throws Exception {
- Preconditions.checkState(!_shutDown, "Table data manager is already shut
down, cannot add segment: %s to table: %s",
- segmentName, _tableNameWithType);
- boolean upsertPreloadEnabled = isUpsertPreloadEnabled();
- if (upsertPreloadEnabled) {
- Integer partitionId =
SegmentUtils.getRealtimeSegmentPartitionId(segmentName, segmentZKMetadata,
null);
- Preconditions.checkNotNull(partitionId,
- String.format("Failed to get partition id for segment: %s
(upsert-enabled table: %s)", segmentName,
- _tableNameWithType));
- PartitionUpsertMetadataManager partitionUpsertMetadataManager =
- _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId);
- partitionUpsertMetadataManager.preloadSegments(indexLoadingConfig);
- // Continue to add segment after preloading, as the segment might not be
added by preloading.
+ private void handleUpsertPreload(SegmentZKMetadata zkMetadata,
IndexLoadingConfig indexLoadingConfig) {
+ if (!isUpsertPreloadEnabled()) {
+ return;
}
+ String segmentName = zkMetadata.getSegmentName();
+ Integer partitionId =
SegmentUtils.getRealtimeSegmentPartitionId(segmentName, zkMetadata, null);
+ Preconditions.checkState(partitionId != null,
+ String.format("Failed to get partition id for segment: %s in
upsert-enabled table: %s", segmentName,
+ _tableNameWithType));
+
_tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId).preloadSegments(indexLoadingConfig);
+ }
+
+ protected void doAddOnlineSegment(String segmentName)
+ throws Exception {
+ SegmentZKMetadata zkMetadata = getZKMetadata(segmentName);
+ Preconditions.checkState(zkMetadata.getStatus() != Status.IN_PROGRESS,
+ "Segment: %s of table: %s is not committed, cannot make it ONLINE",
segmentName, _tableNameWithType);
+ IndexLoadingConfig indexLoadingConfig = getIndexLoadingConfig(zkMetadata);
+ handleUpsertPreload(zkMetadata, indexLoadingConfig);
SegmentDataManager segmentDataManager =
_segmentDataManagerMap.get(segmentName);
- if (segmentDataManager != null) {
- if (upsertPreloadEnabled) {
- _logger.debug(
- "Skipping adding existing segment: {} for table: {} with data
manager class: {}, as it's preloaded",
- segmentName, _tableNameWithType,
segmentDataManager.getClass().getSimpleName());
+ if (segmentDataManager == null) {
+ addNewOnlineSegment(zkMetadata, indexLoadingConfig);
+ } else {
+ if (segmentDataManager instanceof RealtimeSegmentDataManager) {
+ _logger.info("Changing segment: {} from CONSUMING to ONLINE",
segmentName);
+ ((RealtimeSegmentDataManager)
segmentDataManager).goOnlineFromConsuming(zkMetadata);
+ onConsumingToOnline(segmentName);
} else {
- _logger.warn("Skipping adding existing segment: {} for table: {} with
data manager class: {}", segmentName,
- _tableNameWithType, segmentDataManager.getClass().getSimpleName());
+ replaceSegmentIfCrcMismatch(segmentDataManager, zkMetadata,
indexLoadingConfig);
}
+ }
+ }
+
+ @Override
+ public void addConsumingSegment(String segmentName) {
+ Preconditions.checkState(!_shutDown,
+ "Table data manager is already shut down, cannot add CONSUMING
segment: %s to table: %s", segmentName,
+ _tableNameWithType);
+ _logger.info("Adding CONSUMING segment: {}", segmentName);
+ Lock segmentLock = getSegmentLock(segmentName);
+ segmentLock.lock();
+ try {
+ doAddConsumingSegment(segmentName);
+ } catch (Exception e) {
+ addSegmentError(segmentName,
+ new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception
while adding CONSUMING segment", e));
+ throw e;
+ } finally {
+ segmentLock.unlock();
+ }
+ }
+
+ private void doAddConsumingSegment(String segmentName) {
+ SegmentZKMetadata zkMetadata = getZKMetadata(segmentName);
+ // NOTE: We do not throw exception here because the segment might have
just been committed before the state
+ // transition is processed. We can skip adding this segment, and
rely on the following CONSUMING -> ONLINE
+ // state transition to add it.
Review Comment:
How about adding this to the comment: We can skip ..., the segment
will enter the 'CONSUMING' state, and we can rely on ...
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java:
##########
@@ -76,22 +79,52 @@ void init(InstanceDataManagerConfig
instanceDataManagerConfig, TableConfig table
boolean isShutDown();
+ /**
+ * Returns the segment lock for a segment in the table.
+ */
+ Lock getSegmentLock(String segmentName);
+
+ /**
+ * Returns whether the segment is loaded in the table.
+ */
+ boolean hasSegment(String segmentName);
+
/**
* Adds a loaded immutable segment into the table.
*/
+ @VisibleForTesting
void addSegment(ImmutableSegment immutableSegment);
Review Comment:
How about moving this into the base class, as it's more of an internal helper
method now?
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -377,111 +369,112 @@ private boolean isUpsertPreloadEnabled() {
&& upsertConfig.isEnableSnapshot() && upsertConfig.isEnablePreload();
}
- /*
- * This call comes in one of two ways:
- * For HL Segments:
- * - We are being directed by helix to own up all the segments that we
committed and are still in retention. In
- * this case we treat it exactly like how OfflineTableDataManager would --
wrap it into an
- * OfflineSegmentDataManager, and put it in the map.
- * - We are being asked to own up a new realtime segment. In this case, we
wrap the segment with a
- * RealTimeSegmentDataManager (that kicks off consumption). When the segment
is committed we get notified via the
- * notifySegmentCommitted call, at which time we replace the segment with
the OfflineSegmentDataManager
- *
- * For LL Segments:
- * - We are being asked to start consuming from a partition.
- * - We did not know about the segment and are being asked to download and
own the segment (re-balancing, or
- * replacing a realtime server with a fresh one, maybe). We need to look
at segment metadata and decide whether
- * to start consuming or download the segment.
+ /**
+ * Handles upsert preload, and returns whether the upsert preload is enabled.
*/
- @Override
- public void addSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig, SegmentZKMetadata segmentZKMetadata)
- throws Exception {
- Preconditions.checkState(!_shutDown, "Table data manager is already shut
down, cannot add segment: %s to table: %s",
- segmentName, _tableNameWithType);
- boolean upsertPreloadEnabled = isUpsertPreloadEnabled();
- if (upsertPreloadEnabled) {
- Integer partitionId =
SegmentUtils.getRealtimeSegmentPartitionId(segmentName, segmentZKMetadata,
null);
- Preconditions.checkNotNull(partitionId,
- String.format("Failed to get partition id for segment: %s
(upsert-enabled table: %s)", segmentName,
- _tableNameWithType));
- PartitionUpsertMetadataManager partitionUpsertMetadataManager =
- _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId);
- partitionUpsertMetadataManager.preloadSegments(indexLoadingConfig);
- // Continue to add segment after preloading, as the segment might not be
added by preloading.
+ private void handleUpsertPreload(SegmentZKMetadata zkMetadata,
IndexLoadingConfig indexLoadingConfig) {
+ if (!isUpsertPreloadEnabled()) {
+ return;
}
+ String segmentName = zkMetadata.getSegmentName();
+ Integer partitionId =
SegmentUtils.getRealtimeSegmentPartitionId(segmentName, zkMetadata, null);
+ Preconditions.checkState(partitionId != null,
+ String.format("Failed to get partition id for segment: %s in
upsert-enabled table: %s", segmentName,
+ _tableNameWithType));
+
_tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId).preloadSegments(indexLoadingConfig);
+ }
+
+ protected void doAddOnlineSegment(String segmentName)
+ throws Exception {
+ SegmentZKMetadata zkMetadata = getZKMetadata(segmentName);
+ Preconditions.checkState(zkMetadata.getStatus() != Status.IN_PROGRESS,
+ "Segment: %s of table: %s is not committed, cannot make it ONLINE",
segmentName, _tableNameWithType);
+ IndexLoadingConfig indexLoadingConfig = getIndexLoadingConfig(zkMetadata);
+ handleUpsertPreload(zkMetadata, indexLoadingConfig);
SegmentDataManager segmentDataManager =
_segmentDataManagerMap.get(segmentName);
- if (segmentDataManager != null) {
- if (upsertPreloadEnabled) {
- _logger.debug(
- "Skipping adding existing segment: {} for table: {} with data
manager class: {}, as it's preloaded",
- segmentName, _tableNameWithType,
segmentDataManager.getClass().getSimpleName());
+ if (segmentDataManager == null) {
+ addNewOnlineSegment(zkMetadata, indexLoadingConfig);
+ } else {
+ if (segmentDataManager instanceof RealtimeSegmentDataManager) {
+ _logger.info("Changing segment: {} from CONSUMING to ONLINE",
segmentName);
+ ((RealtimeSegmentDataManager)
segmentDataManager).goOnlineFromConsuming(zkMetadata);
+ onConsumingToOnline(segmentName);
} else {
- _logger.warn("Skipping adding existing segment: {} for table: {} with
data manager class: {}", segmentName,
- _tableNameWithType, segmentDataManager.getClass().getSimpleName());
+ replaceSegmentIfCrcMismatch(segmentDataManager, zkMetadata,
indexLoadingConfig);
}
+ }
+ }
+
+ @Override
+ public void addConsumingSegment(String segmentName) {
+ Preconditions.checkState(!_shutDown,
+ "Table data manager is already shut down, cannot add CONSUMING
segment: %s to table: %s", segmentName,
+ _tableNameWithType);
+ _logger.info("Adding CONSUMING segment: {}", segmentName);
+ Lock segmentLock = getSegmentLock(segmentName);
+ segmentLock.lock();
+ try {
+ doAddConsumingSegment(segmentName);
+ } catch (Exception e) {
+ addSegmentError(segmentName,
+ new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception
while adding CONSUMING segment", e));
+ throw e;
+ } finally {
+ segmentLock.unlock();
+ }
+ }
+
+ private void doAddConsumingSegment(String segmentName) {
+ SegmentZKMetadata zkMetadata = getZKMetadata(segmentName);
+ // NOTE: We do not throw exception here because the segment might have
just been committed before the state
+ // transition is processed. We can skip adding this segment, and
rely on the following CONSUMING -> ONLINE
+ // state transition to add it.
+ if (zkMetadata.getStatus() != Status.IN_PROGRESS) {
+ _logger.warn("Segment: {} is already committed, skipping adding it as
CONSUMING segment", segmentName);
+ return;
+ }
+ IndexLoadingConfig indexLoadingConfig = getIndexLoadingConfig(zkMetadata);
+ handleUpsertPreload(zkMetadata, indexLoadingConfig);
+ SegmentDataManager segmentDataManager =
_segmentDataManagerMap.get(segmentName);
+ if (segmentDataManager != null) {
+ _logger.warn("Segment: {} already exists, skipping adding it as
CONSUMING segment", segmentName);
Review Comment:
nit: check whether segmentDataManager holds a mutable or immutable segment and
log it for debugging
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java:
##########
@@ -67,6 +67,8 @@ public interface PartitionUpsertMetadataManager extends
Closeable {
/**
* Preload segments for the table partition. Segments can be added
differently during preloading.
+ * TODO: Revisit this and see if we can use the same IndexLoadingConfig for
all segments. Tier info might be different
+ * for different segments.
Review Comment:
good catch, I think we could fix it like below:
```
void doPreloadSegmentWithSnapshot(...) {
// need to make a copy of indexLoadingConfig;
setSegmentTier(segmentZKMetadata.getTier());
// pass the copy of indexLoadingConfig to method below
tableDataManager.tryLoadExistingSegment(segmentName, indexLoadingConfig,
segmentZKMetadata);
}
```
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -570,165 +563,57 @@ private void handleUpsert(ImmutableSegment
immutableSegment) {
_logger.info("Preloaded immutable segment: {} to upsert-enabled table:
{}", segmentName, _tableNameWithType);
return;
}
- // Replacing segment takes multiple steps, and particularly need to access
the oldSegment. Replace segment may
- // happen in two threads, i.e. the consuming thread that's committing the
mutable segment and a HelixTaskExecutor
- // thread that's bringing segment from ONLINE to CONSUMING when the server
finds consuming thread can't commit
- // the segment in time. The slower thread takes the reference of the
oldSegment here, but it may get closed by
- // the faster thread if not synchronized. In particular, the slower thread
may iterate the primary keys in the
- // oldSegment, causing seg fault. So we have to take a lock here.
- // However, we can't just reuse the existing segmentLocks. Because many
methods of partitionUpsertMetadataManager
- // takes this lock internally, but after taking snapshot RW lock. If we
take segmentLock here (before taking
- // snapshot RW lock), we can get into deadlock with threads calling
partitionUpsertMetadataManager's other
- // methods, like removeSegment.
- // Adding segment should be done by a single HelixTaskExecutor thread, but
do it with lock here for simplicity
- // otherwise, we'd need to double-check if oldSegmentManager is null.
- Lock segmentLock = SEGMENT_UPSERT_LOCKS.getLock(_tableNameWithType,
segmentName);
- segmentLock.lock();
- try {
- SegmentDataManager oldSegmentManager =
_segmentDataManagerMap.get(segmentName);
- if (oldSegmentManager == null) {
- // When adding a new segment, we should register it 'before' it is
fully initialized by
- // partitionUpsertMetadataManager. Because when processing docs in the
new segment, the docs in the other
- // segments may be invalidated, making the queries see less valid docs
than expected. We should let query
- // access the new segment asap even though its validDocId bitmap is
still being filled by
- // partitionUpsertMetadataManager.
- registerSegment(segmentName, newSegmentManager);
- partitionUpsertMetadataManager.addSegment(immutableSegment);
- _logger.info("Added new immutable segment: {} to upsert-enabled table:
{}", segmentName, _tableNameWithType);
- } else {
- // When replacing a segment, we should register the new segment
'after' it is fully initialized by
- // partitionUpsertMetadataManager to fill up its validDocId bitmap.
Otherwise, the queries will lose the access
- // to the valid docs in the old segment immediately, but the
validDocId bitmap of the new segment is still
- // being filled by partitionUpsertMetadataManager, making the queries
see less valid docs than expected.
- // When replacing a segment, the new and old segments are assumed to
have same set of valid docs for data
- // consistency, otherwise the new segment should be named differently
to go through the addSegment flow above.
- IndexSegment oldSegment = oldSegmentManager.getSegment();
- partitionUpsertMetadataManager.replaceSegment(immutableSegment,
oldSegment);
- registerSegment(segmentName, newSegmentManager);
- _logger.info("Replaced {} segment: {} of upsert-enabled table: {}",
- oldSegment instanceof ImmutableSegment ? "immutable" : "mutable",
segmentName, _tableNameWithType);
- releaseSegment(oldSegmentManager);
- }
- } finally {
- segmentLock.unlock();
- }
- }
-
- @Override
- protected boolean allowDownload(String segmentName, SegmentZKMetadata
zkMetadata) {
- // Cannot download consuming segment
- if (zkMetadata.getStatus() == Status.IN_PROGRESS) {
- return false;
- }
- // TODO: may support download from peer servers as well.
- return !METADATA_URI_FOR_PEER_DOWNLOAD.equals(zkMetadata.getDownloadUrl());
- }
-
- void downloadAndReplaceSegment(String segmentName, SegmentZKMetadata
segmentZKMetadata,
- IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
- String uri = segmentZKMetadata.getDownloadUrl();
- if (!METADATA_URI_FOR_PEER_DOWNLOAD.equals(uri)) {
- try {
- // TODO: cleanup and consolidate the segment loading logic a bit for
OFFLINE and REALTIME tables.
- // https://github.com/apache/pinot/issues/9752
- downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, uri);
- } catch (Exception e) {
- _logger.warn("Download segment {} from deepstore uri {} failed.",
segmentName, uri, e);
- // Download from deep store failed; try to download from peer if peer
download is setup for the table.
- if (_peerDownloadScheme != null) {
- downloadSegmentFromPeer(segmentName, indexLoadingConfig);
- } else {
- throw e;
- }
- }
+ SegmentDataManager oldSegmentManager =
_segmentDataManagerMap.get(segmentName);
+ if (oldSegmentManager == null) {
+ // When adding a new segment, we should register it 'before' it is fully
initialized by
+ // partitionUpsertMetadataManager. Because when processing docs in the
new segment, the docs in the other
+ // segments may be invalidated, making the queries see less valid docs
than expected. We should let query
+ // access the new segment asap even though its validDocId bitmap is
still being filled by
+ // partitionUpsertMetadataManager.
+ registerSegment(segmentName, newSegmentManager);
+ partitionUpsertMetadataManager.addSegment(immutableSegment);
+ _logger.info("Added new immutable segment: {} to upsert-enabled table:
{}", segmentName, _tableNameWithType);
} else {
- if (_peerDownloadScheme != null) {
- downloadSegmentFromPeer(segmentName, indexLoadingConfig);
- } else {
- throw new RuntimeException("Peer segment download not enabled for
segment " + segmentName);
- }
- }
- }
-
- private void downloadSegmentFromDeepStore(String segmentName,
IndexLoadingConfig indexLoadingConfig, String uri) {
- // This could leave temporary directories in _indexDir if JVM shuts down
before the temp directory is deleted.
- // This is fine since the temporary directories are deleted when the table
data manager calls init.
- File tempRootDir = null;
- try {
- tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." +
System.currentTimeMillis());
- File segmentTarFile = new File(tempRootDir, segmentName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- SegmentFetcherFactory.fetchSegmentToLocal(uri, segmentTarFile);
- _logger.info("Downloaded file from {} to {}; Length of downloaded file:
{}", uri, segmentTarFile,
- segmentTarFile.length());
- untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile,
tempRootDir);
- } catch (Exception e) {
- _logger.warn("Failed to download segment {} from deep store: ",
segmentName, e);
- throw new RuntimeException(e);
- } finally {
- FileUtils.deleteQuietly(tempRootDir);
+ // When replacing a segment, we should register the new segment 'after'
it is fully initialized by
+ // partitionUpsertMetadataManager to fill up its validDocId bitmap.
Otherwise, the queries will lose the access
+ // to the valid docs in the old segment immediately, but the validDocId
bitmap of the new segment is still
+ // being filled by partitionUpsertMetadataManager, making the queries
see less valid docs than expected.
+ // When replacing a segment, the new and old segments are assumed to
have same set of valid docs for data
+ // consistency, otherwise the new segment should be named differently to
go through the addSegment flow above.
+ IndexSegment oldSegment = oldSegmentManager.getSegment();
+ partitionUpsertMetadataManager.replaceSegment(immutableSegment,
oldSegment);
+ registerSegment(segmentName, newSegmentManager);
+ _logger.info("Replaced {} segment: {} of upsert-enabled table: {}",
+ oldSegment instanceof ImmutableSegment ? "immutable" : "mutable",
segmentName, _tableNameWithType);
+ releaseSegment(oldSegmentManager);
}
}
/**
- * Untars the new segment and replaces the existing segment.
+ * Replaces the CONSUMING segment with a downloaded sealed one.
*/
- private void untarAndMoveSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig, File segmentTarFile,
- File tempRootDir)
- throws IOException {
- File untarDir = new File(tempRootDir, segmentName);
- File untaredSegDir = TarGzCompressionUtils.untar(segmentTarFile,
untarDir).get(0);
- _logger.info("Uncompressed file {} into tmp dir {}", segmentTarFile,
untarDir);
- File indexDir = new File(_indexDir, segmentName);
- FileUtils.deleteQuietly(indexDir);
- FileUtils.moveDirectory(untaredSegDir, indexDir);
- _logger.info("Replacing LLC Segment {}", segmentName);
- replaceLLSegment(segmentName, indexLoadingConfig);
- }
-
- private void downloadSegmentFromPeer(String segmentName, IndexLoadingConfig
indexLoadingConfig) {
- File tempRootDir = null;
- try {
- tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." +
System.currentTimeMillis());
- File segmentTarFile = new File(tempRootDir, segmentName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- // Next download the segment from a randomly chosen server using
configured download scheme (http or https).
-
SegmentFetcherFactory.getSegmentFetcher(_peerDownloadScheme).fetchSegmentToLocal(segmentName,
() -> {
- List<URI> peerServerURIs =
- PeerServerSegmentFinder.getPeerServerURIs(_helixManager,
_tableNameWithType, segmentName,
- _peerDownloadScheme);
- Collections.shuffle(peerServerURIs);
- return peerServerURIs;
- }, segmentTarFile);
- _logger.info("Fetched segment {} successfully to {} of size {}",
segmentName, segmentTarFile,
- segmentTarFile.length());
- untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile,
tempRootDir);
- } catch (Exception e) {
- _logger.warn("Download and move segment {} from peer with scheme {}
failed.", segmentName, _peerDownloadScheme,
- e);
- throw new RuntimeException(e);
- } finally {
- FileUtils.deleteQuietly(tempRootDir);
- }
+ public void downloadAndReplaceConsumingSegment(SegmentZKMetadata zkMetadata)
+ throws Exception {
+ String segmentName = zkMetadata.getSegmentName();
+ _logger.info("Downloading and replacing CONSUMING segment: {} with sealed
one", segmentName);
Review Comment:
nit: s/sealed/committed/ (i.e. "with committed one"), to be a bit more precise?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]