klsince commented on a change in pull request #7969:
URL: https://github.com/apache/pinot/pull/7969#discussion_r780565301



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -480,12 +426,138 @@ File getSegmentDataDir(String segmentName) {
     return new File(_indexDir, segmentName);
   }
 
-  @VisibleForTesting
-  static boolean isNewSegment(SegmentZKMetadata zkMetadata, @Nullable 
SegmentMetadata localMetadata) {
-    return localMetadata == null || !hasSameCRC(zkMetadata, localMetadata);
+  /**
+   * Create a backup directory to handle failure of segment reloading.
+   * First rename the index directory to the segment backup directory so that all file descriptors held by the
+   * original segment point to the segment backup directory, ensuring the original segment keeps serving queries
+   * properly. The original index directory is restored lazily because, depending on the conditions, it may be
+   * restored either from the backup directory or from a segment downloaded from deep store.
+   */
+  private void createBackup(File indexDir) {
+    if (!indexDir.exists()) {
+      return;
+    }
+    File parentDir = indexDir.getParentFile();
+    File segmentBackupDir = new File(parentDir, indexDir.getName() + 
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+    // Rename index directory to segment backup directory (atomic).
+    Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
+        "Failed to rename index directory: %s to segment backup directory: 
%s", indexDir, segmentBackupDir);
+  }
+
+  /**
+   * Remove the backup directory to mark the completion of segment reloading.
+   * Rename first, then delete, because renaming is an atomic operation but deleting is not.
+   * Once the segment backup directory has been renamed to the segment temporary directory, we know the reload
+   * already succeeded, so we can safely delete the segment temporary directory.
+   */
+  private void removeBackup(File indexDir)
+      throws IOException {
+    File parentDir = indexDir.getParentFile();
+    File segmentBackupDir = new File(parentDir, indexDir.getName() + 
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+    if (!segmentBackupDir.exists()) {
+      return;
+    }
+    File segmentTempDir = new File(parentDir, indexDir.getName() + 
CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX);
+    // Rename segment backup directory to segment temporary directory (atomic).
+    Preconditions.checkState(segmentBackupDir.renameTo(segmentTempDir),
+        "Failed to rename segment backup directory: %s to segment temporary 
directory: %s", segmentBackupDir,
+        segmentTempDir);
+    FileUtils.deleteDirectory(segmentTempDir);
+  }
+
+  private boolean tryLoadExistingSegment(String segmentName, 
IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata) {
+    // Try to recover the segment from potential segment reloading failure.
+    File indexDir = getSegmentDataDir(segmentName);
+    recoverReloadFailureQuietly(_tableNameWithType, segmentName, indexDir);
+
+    // Creates the SegmentDirectory object to access the segment metadata.
+    // The metadata is null if the segment doesn't exist yet.
+    SegmentDirectory segmentDirectory = tryGetSegmentDirectory(segmentName, 
indexLoadingConfig);
+    SegmentMetadataImpl segmentMetadata = (segmentDirectory == null) ? null : 
segmentDirectory.getSegmentMetadata();
+
+    // If the segment doesn't exist on server or its CRC has changed, then we
+    // need to fall back to download the segment from deep store to load it.
+    if (segmentMetadata == null) {
+      LOGGER.info("Segment: {} of table: {} does not exist", segmentName, 
_tableNameWithType);
+      return false;
+    }
+    if (!hasSameCRC(zkMetadata, segmentMetadata)) {
+      LOGGER.info("Segment: {} of table: {} has crc change from: {} to: {}", 
segmentName, _tableNameWithType,
+          segmentMetadata.getCrc(), zkMetadata.getCrc());
+      return false;
+    }
+
+    try {
+      // If the segment is still kept by the server, then we can
+      // either load it directly if it's still consistent with latest table 
config and schema;
+      // or reprocess it to reflect latest table config and schema before 
loading.
+      Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
+      if (!ImmutableSegmentLoader.needPreprocess(segmentDirectory, 
indexLoadingConfig, schema)) {
+        LOGGER.info("Segment: {} of table: {} is consistent with latest table 
config and schema", segmentName,
+            _tableNameWithType);
+      } else {
+        LOGGER.info("Segment: {} of table: {} needs reprocess to reflect 
latest table config and schema", segmentName,
+            _tableNameWithType);
+        segmentDirectory.copyTo(indexDir);
+        // Close the stale SegmentDirectory object and recreate it with 
reprocessed segment.
+        closeSegmentDirectoryQuietly(segmentDirectory);
+        ImmutableSegmentLoader.preprocess(indexDir, indexLoadingConfig, 
schema);
+        segmentDirectory = getSegmentDirectory(segmentName, 
indexLoadingConfig);
+      }
+      ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory, 
indexLoadingConfig, schema);
+      addSegment(segment);
+      LOGGER.info("Loaded existing segment: {} of table: {} with crc: {}", 
segmentName, _tableNameWithType,
+          zkMetadata.getCrc());
+      return true;
+    } catch (Exception e) {
+      LOGGER.error("Failed to load existing segment: {} of table: {} with crc: 
{}", segmentName, _tableNameWithType, e);
+      closeSegmentDirectoryQuietly(segmentDirectory);
+      return false;
+    }
+  }
+
+  private SegmentDirectory tryGetSegmentDirectory(String segmentName, 
IndexLoadingConfig indexLoadingConfig) {
+    try {
+      return getSegmentDirectory(segmentName, indexLoadingConfig);

Review comment:
       renamed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to