mcvsubbu commented on a change in pull request #7969:
URL: https://github.com/apache/pinot/pull/7969#discussion_r780490610
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -277,49 +282,36 @@ public void addSegmentError(String segmentName,
SegmentErrorInfo segmentErrorInf
public void reloadSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig, SegmentZKMetadata zkMetadata,
SegmentMetadata localMetadata, @Nullable Schema schema, boolean
forceDownload)
throws Exception {
- File indexDir = localMetadata.getIndexDir();
- Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is
not a directory", indexDir);
-
- File parentFile = indexDir.getParentFile();
- File segmentBackupDir =
- new File(parentFile, indexDir.getName() +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
-
+ File indexDir = getSegmentDataDir(segmentName);
try {
- // First rename index directory to segment backup directory so that
original segment have all file descriptors
- // point to the segment backup directory to ensure original segment
serves queries properly
+ // Create backup directory to handle failure of segment reloading.
+ createBackup(indexDir);
- // Rename index directory to segment backup directory (atomic)
- Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
- "Failed to rename index directory: %s to segment backup directory:
%s", indexDir, segmentBackupDir);
-
- // Download from remote or copy from local backup directory into index
directory,
- // and then continue to load the segment from index directory.
+ // Download segment from deep store if CRC changes or forced to download;
+ // otherwise, copy backup directory back to the original index directory.
+ // And then continue to load the segment from the index directory.
boolean shouldDownload = forceDownload || !hasSameCRC(zkMetadata,
localMetadata);
if (shouldDownload && allowDownload(segmentName, zkMetadata)) {
if (forceDownload) {
LOGGER.info("Segment: {} of table: {} is forced to download",
segmentName, _tableNameWithType);
} else {
- LOGGER.info("Download segment:{} of table: {} as local crc: {}
mismatches remote crc: {}", segmentName,
+ LOGGER.info("Download segment:{} of table: {} as crc changes from:
{} to: {}", segmentName,
_tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
}
indexDir = downloadSegment(segmentName, zkMetadata);
} else {
- LOGGER.info("Reload the local copy of segment: {} of table: {}",
segmentName, _tableNameWithType);
- FileUtils.copyDirectory(segmentBackupDir, indexDir);
+ LOGGER.info("Reload existing segment: {} of table: {}", segmentName,
_tableNameWithType);
+ try (SegmentDirectory segmentDirectory =
getSegmentDirectory(segmentName, indexLoadingConfig)) {
Review comment:
Why can't we use the same method as in line 285
(`getSegmentDataDir(segmentName)`)?
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -277,49 +282,36 @@ public void addSegmentError(String segmentName,
SegmentErrorInfo segmentErrorInf
public void reloadSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig, SegmentZKMetadata zkMetadata,
SegmentMetadata localMetadata, @Nullable Schema schema, boolean
forceDownload)
throws Exception {
- File indexDir = localMetadata.getIndexDir();
- Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is
not a directory", indexDir);
-
- File parentFile = indexDir.getParentFile();
- File segmentBackupDir =
- new File(parentFile, indexDir.getName() +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
-
+ File indexDir = getSegmentDataDir(segmentName);
try {
- // First rename index directory to segment backup directory so that
original segment have all file descriptors
- // point to the segment backup directory to ensure original segment
serves queries properly
+ // Create backup directory to handle failure of segment reloading.
+ createBackup(indexDir);
- // Rename index directory to segment backup directory (atomic)
- Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
- "Failed to rename index directory: %s to segment backup directory:
%s", indexDir, segmentBackupDir);
-
- // Download from remote or copy from local backup directory into index
directory,
- // and then continue to load the segment from index directory.
+ // Download segment from deep store if CRC changes or forced to download;
+ // otherwise, copy backup directory back to the original index directory.
+ // And then continue to load the segment from the index directory.
boolean shouldDownload = forceDownload || !hasSameCRC(zkMetadata,
localMetadata);
if (shouldDownload && allowDownload(segmentName, zkMetadata)) {
if (forceDownload) {
LOGGER.info("Segment: {} of table: {} is forced to download",
segmentName, _tableNameWithType);
} else {
- LOGGER.info("Download segment:{} of table: {} as local crc: {}
mismatches remote crc: {}", segmentName,
+ LOGGER.info("Download segment:{} of table: {} as crc changes from:
{} to: {}", segmentName,
_tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
}
indexDir = downloadSegment(segmentName, zkMetadata);
} else {
- LOGGER.info("Reload the local copy of segment: {} of table: {}",
segmentName, _tableNameWithType);
- FileUtils.copyDirectory(segmentBackupDir, indexDir);
+ LOGGER.info("Reload existing segment: {} of table: {}", segmentName,
_tableNameWithType);
+ try (SegmentDirectory segmentDirectory =
getSegmentDirectory(segmentName, indexLoadingConfig)) {
Review comment:
Why can't we use the same method as in line 285
(`getSegmentDataDir(segmentName)`)?
And why not also use `FileUtils.copyDirectory` to copy the directory, as before?
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -335,26 +327,17 @@ public void reloadSegment(String segmentName,
IndexLoadingConfig indexLoadingCon
public void addOrReplaceSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig,
SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata)
throws Exception {
- if (!isNewSegment(zkMetadata, localMetadata)) {
+ if (localMetadata != null && hasSameCRC(zkMetadata, localMetadata)) {
LOGGER.info("Segment: {} of table: {} has crc: {} same as before,
already loaded, do nothing", segmentName,
_tableNameWithType, localMetadata.getCrc());
return;
}
- // Try to recover if no local metadata is provided.
- if (localMetadata == null) {
- LOGGER.info("Segment: {} of table: {} is not loaded, checking disk",
segmentName, _tableNameWithType);
- localMetadata = recoverSegmentQuietly(segmentName);
- if (!isNewSegment(zkMetadata, localMetadata)) {
- LOGGER.info("Segment: {} of table {} has crc: {} same as before,
loading", segmentName, _tableNameWithType,
- localMetadata.getCrc());
- if (loadSegmentQuietly(segmentName, indexLoadingConfig)) {
- return;
- }
- // Set local metadata to null to indicate that the local segment fails
to load,
- // although it exists and has same crc with the remote one.
- localMetadata = null;
- }
+ // The segment is not loaded by the server yet, but it may still kept by
the server.
Review comment:
Can you add some comments on how you see this happening (a local segment
being present but not loaded)?
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -480,12 +426,138 @@ File getSegmentDataDir(String segmentName) {
return new File(_indexDir, segmentName);
}
- @VisibleForTesting
- static boolean isNewSegment(SegmentZKMetadata zkMetadata, @Nullable
SegmentMetadata localMetadata) {
- return localMetadata == null || !hasSameCRC(zkMetadata, localMetadata);
+ /**
+ * Create a backup directory to handle failure of segment reloading.
+ * First rename index directory to segment backup directory so that original
segment have all file
+ * descriptors point to the segment backup directory to ensure original
segment serves queries properly.
+ * The original index directory is restored lazily, as depending on the
conditions,
+ * it may be restored from the backup directory or segment downloaded from
deep store.
+ */
+ private void createBackup(File indexDir) {
+ if (!indexDir.exists()) {
+ return;
+ }
+ File parentDir = indexDir.getParentFile();
+ File segmentBackupDir = new File(parentDir, indexDir.getName() +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+ // Rename index directory to segment backup directory (atomic).
+ Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
+ "Failed to rename index directory: %s to segment backup directory:
%s", indexDir, segmentBackupDir);
+ }
+
+ /**
+ * Remove the backup directory to mark the completion of segment reloading.
+ * First rename then delete is as renaming is an atomic operation, but
deleting is not.
+ * When we rename the segment backup directory to segment temporary
directory, we know the reload
+ * already succeeded, so that we can safely delete the segment temporary
directory.
+ */
+ private void removeBackup(File indexDir)
+ throws IOException {
+ File parentDir = indexDir.getParentFile();
+ File segmentBackupDir = new File(parentDir, indexDir.getName() +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+ if (!segmentBackupDir.exists()) {
+ return;
+ }
+ File segmentTempDir = new File(parentDir, indexDir.getName() +
CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX);
+ // Rename segment backup directory to segment temporary directory (atomic).
+ Preconditions.checkState(segmentBackupDir.renameTo(segmentTempDir),
+ "Failed to rename segment backup directory: %s to segment temporary
directory: %s", segmentBackupDir,
+ segmentTempDir);
+ FileUtils.deleteDirectory(segmentTempDir);
+ }
+
+ private boolean tryLoadExistingSegment(String segmentName,
IndexLoadingConfig indexLoadingConfig,
+ SegmentZKMetadata zkMetadata) {
+ // Try to recover the segment from potential segment reloading failure.
+ File indexDir = getSegmentDataDir(segmentName);
+ recoverReloadFailureQuietly(_tableNameWithType, segmentName, indexDir);
+
+ // Creates the SegmentDirectory object to access the segment metadata.
+ // The metadata is null if the segment doesn't exist yet.
+ SegmentDirectory segmentDirectory = tryGetSegmentDirectory(segmentName,
indexLoadingConfig);
+ SegmentMetadataImpl segmentMetadata = (segmentDirectory == null) ? null :
segmentDirectory.getSegmentMetadata();
+
+ // If the segment doesn't exist on server or its CRC has changed, then we
+ // need to fall back to download the segment from deep store to load it.
+ if (segmentMetadata == null) {
+ LOGGER.info("Segment: {} of table: {} does not exist", segmentName,
_tableNameWithType);
+ return false;
+ }
+ if (!hasSameCRC(zkMetadata, segmentMetadata)) {
+ LOGGER.info("Segment: {} of table: {} has crc change from: {} to: {}",
segmentName, _tableNameWithType,
+ segmentMetadata.getCrc(), zkMetadata.getCrc());
+ return false;
+ }
+
+ try {
+ // If the segment is still kept by the server, then we can
+ // either load it directly if it's still consistent with latest table
config and schema;
+ // or reprocess it to reflect latest table config and schema before
loading.
+ Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType);
+ if (!ImmutableSegmentLoader.needPreprocess(segmentDirectory,
indexLoadingConfig, schema)) {
+ LOGGER.info("Segment: {} of table: {} is consistent with latest table
config and schema", segmentName,
+ _tableNameWithType);
+ } else {
+ LOGGER.info("Segment: {} of table: {} needs reprocess to reflect
latest table config and schema", segmentName,
+ _tableNameWithType);
+ segmentDirectory.copyTo(indexDir);
+ // Close the stale SegmentDirectory object and recreate it with
reprocessed segment.
+ closeSegmentDirectoryQuietly(segmentDirectory);
+ ImmutableSegmentLoader.preprocess(indexDir, indexLoadingConfig,
schema);
+ segmentDirectory = getSegmentDirectory(segmentName,
indexLoadingConfig);
+ }
+ ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory,
indexLoadingConfig, schema);
+ addSegment(segment);
+ LOGGER.info("Loaded existing segment: {} of table: {} with crc: {}",
segmentName, _tableNameWithType,
+ zkMetadata.getCrc());
+ return true;
+ } catch (Exception e) {
+ LOGGER.error("Failed to load existing segment: {} of table: {} with crc:
{}", segmentName, _tableNameWithType, e);
+ closeSegmentDirectoryQuietly(segmentDirectory);
+ return false;
+ }
+ }
+
+ private SegmentDirectory tryGetSegmentDirectory(String segmentName,
IndexLoadingConfig indexLoadingConfig) {
+ try {
+ return getSegmentDirectory(segmentName, indexLoadingConfig);
+ } catch (Exception e) {
+ LOGGER.warn("Attempt to get SegmentDirectory for segment: {} of table:
{} failed with error: {}", segmentName,
+ _tableNameWithType, e.getMessage());
+ return null;
+ }
+ }
+
+ private SegmentDirectory getSegmentDirectory(String segmentName,
IndexLoadingConfig indexLoadingConfig)
Review comment:
This method seems redundant; we already have a `getSegmentDataDir` method.
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -480,12 +426,138 @@ File getSegmentDataDir(String segmentName) {
return new File(_indexDir, segmentName);
}
- @VisibleForTesting
- static boolean isNewSegment(SegmentZKMetadata zkMetadata, @Nullable
SegmentMetadata localMetadata) {
- return localMetadata == null || !hasSameCRC(zkMetadata, localMetadata);
+ /**
+ * Create a backup directory to handle failure of segment reloading.
+ * First rename index directory to segment backup directory so that original
segment have all file
+ * descriptors point to the segment backup directory to ensure original
segment serves queries properly.
+ * The original index directory is restored lazily, as depending on the
conditions,
+ * it may be restored from the backup directory or segment downloaded from
deep store.
+ */
+ private void createBackup(File indexDir) {
+ if (!indexDir.exists()) {
+ return;
+ }
+ File parentDir = indexDir.getParentFile();
+ File segmentBackupDir = new File(parentDir, indexDir.getName() +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+ // Rename index directory to segment backup directory (atomic).
+ Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
+ "Failed to rename index directory: %s to segment backup directory:
%s", indexDir, segmentBackupDir);
+ }
+
+ /**
+ * Remove the backup directory to mark the completion of segment reloading.
+ * First rename then delete is as renaming is an atomic operation, but
deleting is not.
+ * When we rename the segment backup directory to segment temporary
directory, we know the reload
+ * already succeeded, so that we can safely delete the segment temporary
directory.
+ */
+ private void removeBackup(File indexDir)
+ throws IOException {
+ File parentDir = indexDir.getParentFile();
+ File segmentBackupDir = new File(parentDir, indexDir.getName() +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+ if (!segmentBackupDir.exists()) {
+ return;
+ }
+ File segmentTempDir = new File(parentDir, indexDir.getName() +
CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX);
Review comment:
Should we add a timestamp here to the temporary segment directory name to be safe?
If you agree, you can add a TODO or do it in this PR.
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -480,12 +426,138 @@ File getSegmentDataDir(String segmentName) {
return new File(_indexDir, segmentName);
}
- @VisibleForTesting
- static boolean isNewSegment(SegmentZKMetadata zkMetadata, @Nullable
SegmentMetadata localMetadata) {
- return localMetadata == null || !hasSameCRC(zkMetadata, localMetadata);
+ /**
+ * Create a backup directory to handle failure of segment reloading.
+ * First rename index directory to segment backup directory so that original
segment have all file
+ * descriptors point to the segment backup directory to ensure original
segment serves queries properly.
+ * The original index directory is restored lazily, as depending on the
conditions,
+ * it may be restored from the backup directory or segment downloaded from
deep store.
+ */
+ private void createBackup(File indexDir) {
+ if (!indexDir.exists()) {
+ return;
+ }
+ File parentDir = indexDir.getParentFile();
+ File segmentBackupDir = new File(parentDir, indexDir.getName() +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+ // Rename index directory to segment backup directory (atomic).
+ Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
+ "Failed to rename index directory: %s to segment backup directory:
%s", indexDir, segmentBackupDir);
+ }
+
+ /**
+ * Remove the backup directory to mark the completion of segment reloading.
+ * First rename then delete is as renaming is an atomic operation, but
deleting is not.
+ * When we rename the segment backup directory to segment temporary
directory, we know the reload
+ * already succeeded, so that we can safely delete the segment temporary
directory.
+ */
+ private void removeBackup(File indexDir)
+ throws IOException {
+ File parentDir = indexDir.getParentFile();
+ File segmentBackupDir = new File(parentDir, indexDir.getName() +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+ if (!segmentBackupDir.exists()) {
+ return;
+ }
+ File segmentTempDir = new File(parentDir, indexDir.getName() +
CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX);
+ // Rename segment backup directory to segment temporary directory (atomic).
+ Preconditions.checkState(segmentBackupDir.renameTo(segmentTempDir),
+ "Failed to rename segment backup directory: %s to segment temporary
directory: %s", segmentBackupDir,
+ segmentTempDir);
+ FileUtils.deleteDirectory(segmentTempDir);
+ }
+
+ private boolean tryLoadExistingSegment(String segmentName,
IndexLoadingConfig indexLoadingConfig,
+ SegmentZKMetadata zkMetadata) {
+ // Try to recover the segment from potential segment reloading failure.
+ File indexDir = getSegmentDataDir(segmentName);
+ recoverReloadFailureQuietly(_tableNameWithType, segmentName, indexDir);
+
+ // Creates the SegmentDirectory object to access the segment metadata.
+ // The metadata is null if the segment doesn't exist yet.
+ SegmentDirectory segmentDirectory = tryGetSegmentDirectory(segmentName,
indexLoadingConfig);
+ SegmentMetadataImpl segmentMetadata = (segmentDirectory == null) ? null :
segmentDirectory.getSegmentMetadata();
+
+ // If the segment doesn't exist on server or its CRC has changed, then we
+ // need to fall back to download the segment from deep store to load it.
+ if (segmentMetadata == null) {
+ LOGGER.info("Segment: {} of table: {} does not exist", segmentName,
_tableNameWithType);
+ return false;
+ }
+ if (!hasSameCRC(zkMetadata, segmentMetadata)) {
+ LOGGER.info("Segment: {} of table: {} has crc change from: {} to: {}",
segmentName, _tableNameWithType,
+ segmentMetadata.getCrc(), zkMetadata.getCrc());
+ return false;
+ }
+
+ try {
+ // If the segment is still kept by the server, then we can
+ // either load it directly if it's still consistent with latest table
config and schema;
+ // or reprocess it to reflect latest table config and schema before
loading.
+ Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType);
+ if (!ImmutableSegmentLoader.needPreprocess(segmentDirectory,
indexLoadingConfig, schema)) {
+ LOGGER.info("Segment: {} of table: {} is consistent with latest table
config and schema", segmentName,
+ _tableNameWithType);
+ } else {
+ LOGGER.info("Segment: {} of table: {} needs reprocess to reflect
latest table config and schema", segmentName,
+ _tableNameWithType);
+ segmentDirectory.copyTo(indexDir);
+ // Close the stale SegmentDirectory object and recreate it with
reprocessed segment.
+ closeSegmentDirectoryQuietly(segmentDirectory);
+ ImmutableSegmentLoader.preprocess(indexDir, indexLoadingConfig,
schema);
+ segmentDirectory = getSegmentDirectory(segmentName,
indexLoadingConfig);
+ }
+ ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory,
indexLoadingConfig, schema);
+ addSegment(segment);
+ LOGGER.info("Loaded existing segment: {} of table: {} with crc: {}",
segmentName, _tableNameWithType,
+ zkMetadata.getCrc());
+ return true;
+ } catch (Exception e) {
+ LOGGER.error("Failed to load existing segment: {} of table: {} with crc:
{}", segmentName, _tableNameWithType, e);
+ closeSegmentDirectoryQuietly(segmentDirectory);
+ return false;
+ }
+ }
+
+ private SegmentDirectory tryGetSegmentDirectory(String segmentName,
IndexLoadingConfig indexLoadingConfig) {
+ try {
+ return getSegmentDirectory(segmentName, indexLoadingConfig);
Review comment:
Can we use a single call to get the segment directory?
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -480,12 +426,138 @@ File getSegmentDataDir(String segmentName) {
return new File(_indexDir, segmentName);
}
- @VisibleForTesting
- static boolean isNewSegment(SegmentZKMetadata zkMetadata, @Nullable
SegmentMetadata localMetadata) {
- return localMetadata == null || !hasSameCRC(zkMetadata, localMetadata);
+ /**
+ * Create a backup directory to handle failure of segment reloading.
+ * First rename index directory to segment backup directory so that original
segment have all file
+ * descriptors point to the segment backup directory to ensure original
segment serves queries properly.
+ * The original index directory is restored lazily, as depending on the
conditions,
+ * it may be restored from the backup directory or segment downloaded from
deep store.
+ */
+ private void createBackup(File indexDir) {
+ if (!indexDir.exists()) {
+ return;
+ }
+ File parentDir = indexDir.getParentFile();
+ File segmentBackupDir = new File(parentDir, indexDir.getName() +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+ // Rename index directory to segment backup directory (atomic).
+ Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
+ "Failed to rename index directory: %s to segment backup directory:
%s", indexDir, segmentBackupDir);
+ }
+
+ /**
+ * Remove the backup directory to mark the completion of segment reloading.
+ * First rename then delete is as renaming is an atomic operation, but
deleting is not.
+ * When we rename the segment backup directory to segment temporary
directory, we know the reload
+ * already succeeded, so that we can safely delete the segment temporary
directory.
+ */
+ private void removeBackup(File indexDir)
+ throws IOException {
+ File parentDir = indexDir.getParentFile();
+ File segmentBackupDir = new File(parentDir, indexDir.getName() +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+ if (!segmentBackupDir.exists()) {
+ return;
+ }
+ File segmentTempDir = new File(parentDir, indexDir.getName() +
CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX);
Review comment:
OK, agreed.
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -335,26 +327,19 @@ public void reloadSegment(String segmentName,
IndexLoadingConfig indexLoadingCon
public void addOrReplaceSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig,
SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata)
throws Exception {
- if (!isNewSegment(zkMetadata, localMetadata)) {
+ if (localMetadata != null && hasSameCRC(zkMetadata, localMetadata)) {
LOGGER.info("Segment: {} of table: {} has crc: {} same as before,
already loaded, do nothing", segmentName,
_tableNameWithType, localMetadata.getCrc());
return;
}
- // Try to recover if no local metadata is provided.
- if (localMetadata == null) {
- LOGGER.info("Segment: {} of table: {} is not loaded, checking disk",
segmentName, _tableNameWithType);
- localMetadata = recoverSegmentQuietly(segmentName);
- if (!isNewSegment(zkMetadata, localMetadata)) {
- LOGGER.info("Segment: {} of table {} has crc: {} same as before,
loading", segmentName, _tableNameWithType,
- localMetadata.getCrc());
- if (loadSegmentQuietly(segmentName, indexLoadingConfig)) {
- return;
- }
- // Set local metadata to null to indicate that the local segment fails
to load,
- // although it exists and has same crc with the remote one.
- localMetadata = null;
- }
+ // The segment is not loaded by the server if the metadata object is null.
But the segment
+ // may still be kept on the server. For example when server gets
restarted, the segment is
+ // still on the server but the metadata object has not been initialized
yet. In this case,
+ // we should check if the segment exists on server and try to load it. If
the segment does
+ // not exist or fails to get loaded, we download segment from deep store
to load it again.
+ if (localMetadata == null && tryLoadExistingSegment(segmentName,
indexLoadingConfig, zkMetadata)) {
Review comment:
`tryLoadExistingSegment` calls `tryInitSegmentDirectory`, which ends up
loading the segment if it is present locally.
Yet in line 510 we call load again, which seems to be a bug.
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -277,49 +282,36 @@ public void addSegmentError(String segmentName,
SegmentErrorInfo segmentErrorInf
public void reloadSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig, SegmentZKMetadata zkMetadata,
SegmentMetadata localMetadata, @Nullable Schema schema, boolean
forceDownload)
throws Exception {
- File indexDir = localMetadata.getIndexDir();
- Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is
not a directory", indexDir);
-
- File parentFile = indexDir.getParentFile();
- File segmentBackupDir =
- new File(parentFile, indexDir.getName() +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
-
+ File indexDir = getSegmentDataDir(segmentName);
try {
- // First rename index directory to segment backup directory so that
original segment have all file descriptors
- // point to the segment backup directory to ensure original segment
serves queries properly
+ // Create backup directory to handle failure of segment reloading.
+ createBackup(indexDir);
- // Rename index directory to segment backup directory (atomic)
- Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
- "Failed to rename index directory: %s to segment backup directory:
%s", indexDir, segmentBackupDir);
-
- // Download from remote or copy from local backup directory into index
directory,
- // and then continue to load the segment from index directory.
+ // Download segment from deep store if CRC changes or forced to download;
+ // otherwise, copy backup directory back to the original index directory.
+ // And then continue to load the segment from the index directory.
boolean shouldDownload = forceDownload || !hasSameCRC(zkMetadata,
localMetadata);
if (shouldDownload && allowDownload(segmentName, zkMetadata)) {
if (forceDownload) {
LOGGER.info("Segment: {} of table: {} is forced to download",
segmentName, _tableNameWithType);
} else {
- LOGGER.info("Download segment:{} of table: {} as local crc: {}
mismatches remote crc: {}", segmentName,
+ LOGGER.info("Download segment:{} of table: {} as crc changes from:
{} to: {}", segmentName,
_tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
}
indexDir = downloadSegment(segmentName, zkMetadata);
} else {
- LOGGER.info("Reload the local copy of segment: {} of table: {}",
segmentName, _tableNameWithType);
- FileUtils.copyDirectory(segmentBackupDir, indexDir);
+ LOGGER.info("Reload existing segment: {} of table: {}", segmentName,
_tableNameWithType);
+ try (SegmentDirectory segmentDirectory =
initSegmentDirectory(segmentName, indexLoadingConfig)) {
Review comment:
This seems buggy. As I understand it, we are copying from
an empty directory to an empty directory, whereas we need to copy from the backup directory to the
segment directory. Correct me if I am wrong.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]