klsince commented on a change in pull request #7969:
URL: https://github.com/apache/pinot/pull/7969#discussion_r779041798
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -333,94 +306,162 @@ public void reloadSegment(String segmentName,
IndexLoadingConfig indexLoadingCon
@Override
public void addOrReplaceSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig,
- SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata)
+ SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata segmentMetadata)
throws Exception {
- if (!isNewSegment(zkMetadata, localMetadata)) {
- LOGGER.info("Segment: {} of table: {} has crc: {} same as before,
already loaded, do nothing", segmentName,
- _tableNameWithType, localMetadata.getCrc());
+ // Non-null segment metadata means the segment has already been loaded.
+ if (segmentMetadata != null) {
+ if (hasSameCRC(zkMetadata, segmentMetadata)) {
+ // Simply returns if the CRC hasn't changed. The table config may have
changed
+ // since segment is loaded, but that is handled by reloadSegment()
method.
+ LOGGER.info("Segment: {} of table: {} has crc: {} same as before,
already loaded, do nothing", segmentName,
+ _tableNameWithType, segmentMetadata.getCrc());
+ } else {
+ // Download the raw segment, reprocess and load it if the CRC has
changed.
+ LOGGER.info("Segment: {} of table: {} already loaded but its crc: {}
differs from new crc: {}", segmentName,
+ _tableNameWithType, segmentMetadata.getCrc(), zkMetadata.getCrc());
+ downloadRawSegmentAndProcess(segmentName, indexLoadingConfig,
zkMetadata,
+ ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType));
+ }
return;
}
- // Try to recover if no local metadata is provided.
- if (localMetadata == null) {
- LOGGER.info("Segment: {} of table: {} is not loaded, checking disk",
segmentName, _tableNameWithType);
- localMetadata = recoverSegmentQuietly(segmentName);
- if (!isNewSegment(zkMetadata, localMetadata)) {
- LOGGER.info("Segment: {} of table {} has crc: {} same as before,
loading", segmentName, _tableNameWithType,
- localMetadata.getCrc());
- if (loadSegmentQuietly(segmentName, indexLoadingConfig)) {
- return;
- }
- // Set local metadata to null to indicate that the local segment fails
to load,
- // although it exists and has same crc with the remote one.
- localMetadata = null;
- }
- }
+ // For local tier backend, try to recover the segment from potential
+ // reload failure. Continue upon any failure.
+ File indexDir = getSegmentDataDir(segmentName);
+ recoverReloadFailureQuietly(_tableNameWithType, segmentName, indexDir);
- Preconditions.checkState(allowDownload(segmentName, zkMetadata), "Segment:
%s of table: %s does not allow download",
- segmentName, _tableNameWithType);
+ // Creates the SegmentDirectory object to access the segment metadata that
+ // may be from local tier backend or remote tier backend.
+ SegmentDirectory segmentDirectory = null;
+ try {
+ SegmentDirectoryLoaderContext loaderContext =
+ new
SegmentDirectoryLoaderContext(indexLoadingConfig.getTableConfig(),
indexLoadingConfig.getInstanceId(),
+ segmentName, indexLoadingConfig.getSegmentDirectoryConfigs());
+ SegmentDirectoryLoader segmentDirectoryLoader =
+
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
+ segmentDirectory = segmentDirectoryLoader.load(indexDir.toURI(),
loaderContext);
+ } catch (Exception e) {
+ LOGGER.warn("Failed to create SegmentDirectory for segment: {} of table:
{} due to error: {}", segmentName,
+ _tableNameWithType, e.getMessage());
+ }
- // Download segment and replace the local one, either due to failure to
recover local segment,
- // or the segment data is updated and has new CRC now.
- if (localMetadata == null) {
- LOGGER.info("Download segment: {} of table: {} as no good one exists
locally", segmentName, _tableNameWithType);
- } else {
- LOGGER.info("Download segment: {} of table: {} as local crc: {}
mismatches remote crc: {}.", segmentName,
- _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+ // Download the raw segment, reprocess and load it if it is never loaded
or its CRC has changed.
+ if (segmentDirectory == null) {
+ LOGGER.info("Segment: {} of table: {} does not exist", segmentName,
_tableNameWithType);
+ downloadRawSegmentAndProcess(segmentName, indexLoadingConfig, zkMetadata,
+ ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType));
+ return;
+ }
+ if (!hasSameCRC(zkMetadata, segmentDirectory.getSegmentMetadata())) {
+ LOGGER.info("Segment: {} of table: {} has crc: {} different from new
crc: {}", segmentName,
+ _tableNameWithType, segmentDirectory.getSegmentMetadata().getCrc(),
zkMetadata.getCrc());
+ closeSegmentDirectoryQuietly(segmentDirectory);
+ downloadRawSegmentAndProcess(segmentName, indexLoadingConfig, zkMetadata,
+ ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType));
+ return;
}
- File indexDir = downloadSegment(segmentName, zkMetadata);
- addSegment(indexDir, indexLoadingConfig);
- LOGGER.info("Downloaded and loaded segment: {} of table: {} with crc: {}",
segmentName, _tableNameWithType,
- zkMetadata.getCrc());
- }
- protected boolean allowDownload(String segmentName, SegmentZKMetadata
zkMetadata) {
- return true;
+ try {
+ Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType);
+ if (!ImmutableSegmentLoader.needPreprocess(segmentDirectory,
indexLoadingConfig, schema)) {
+ // The loaded segment is still consistent with current table config or
schema.
+ LOGGER.info("Segment: {} of table: {} is consistent with table config
and schema", segmentName,
+ _tableNameWithType);
+ loadSegment(segmentDirectory, indexLoadingConfig, schema);
+ return;
+ }
+ // If any discrepancy is found, get the segment from tier backend,
reprocess and load it.
+ LOGGER.info("Segment: {} of table: {} needs reprocess to reflect latest
table config and schema", segmentName,
+ _tableNameWithType);
+ // Close the stale SegmentDirectory object before loading the newly
processed segment.
+ closeSegmentDirectoryQuietly(segmentDirectory);
+ downloadTierSegmentAndProcess(segmentName, indexLoadingConfig, schema);
+ } catch (Exception e) {
+ closeSegmentDirectoryQuietly(segmentDirectory);
+ LOGGER.error("Failed to reprocess and load segment: {} of table: {}",
segmentName, _tableNameWithType, e);
+ downloadRawSegmentAndProcess(segmentName, indexLoadingConfig, zkMetadata,
+ ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType));
+ }
}
- protected File downloadSegment(String segmentName, SegmentZKMetadata
zkMetadata)
+ /**
+ * Get the segment from the configured tier backend, reprocess it with the
latest table
+ * config and schema and then load it. Please note that the segment is from
tier backend,
+ * not deep store, for incremental processing.
+ */
+ private void downloadTierSegmentAndProcess(String segmentName,
IndexLoadingConfig indexLoadingConfig,
+ @Nullable Schema schema)
throws Exception {
- // TODO: may support download from peer servers for RealTime table.
- return downloadSegmentFromDeepStore(segmentName, zkMetadata);
+ SegmentDirectoryLoaderContext loaderContext =
+ new SegmentDirectoryLoaderContext(indexLoadingConfig.getTableConfig(),
indexLoadingConfig.getInstanceId(),
+ segmentName, indexLoadingConfig.getSegmentDirectoryConfigs());
+ SegmentDirectoryLoader segmentDirectoryLoader =
+
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
+ File indexDir = getSegmentDataDir(segmentName);
+ segmentDirectoryLoader.download(indexDir, loaderContext);
Review comment:
The download and upload methods were introduced to the SegmentDirectoryLoader
interface because, today, the SegmentPreprocessor and pretty much all kinds of
IndexCreators used by it require a `File indexDir` as an input param. If
they were able to work with the SegmentDirectory interface instead of `File
indexDir`, the download/upload methods wouldn't be needed in the
SegmentDirectoryLoader interface.
Hope this adds more clarity on why downloading the raw segment from deep store
is not part of the SegmentDirectoryLoader interface: there is a path forward
to remove the download/upload methods from SegmentDirectoryLoader, if the
segment preprocessing logic could work with the SegmentDirectory interface
instead of requiring a `File indexDir` explicitly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]