This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e31e6f9  refactor segment loading logic in BaseTableDataManager to 
decouple it with local segment directory  (#7969)
e31e6f9 is described below

commit e31e6f90cdf9b425c0584484aaac0db4a79b2c15
Author: Xiaobing <[email protected]>
AuthorDate: Thu Jan 13 18:13:34 2022 -0800

    refactor segment loading logic in BaseTableDataManager to decouple it with 
local segment directory  (#7969)
    
    * extend SegmentDirectoryLoader interface and refactor BaseTableDataManager 
so that it
    depends on SegmentDirectory interface instead of local seg dir to be more 
extensible
    
    * add license header
    
    * address comments
    
    * rebase and fix UT
    
    * cr - refine comments, namings and logs, and refactor a few method
    
    * refactor BaseTableDataMgr with SegmentDirectory.copyTo
    
    * refine naming and comments
    
    * close segdir before early return
    
    * rebase master
    
    * comments for clarity
    
    * refine comments
    
    * add more context in comments
    
    Co-authored-by: Xiaobing Li <[email protected]>
---
 .../core/data/manager/BaseTableDataManager.java    | 262 ++++++----
 .../data/manager/BaseTableDataManagerTest.java     | 534 ++++++++++++++-------
 .../local/data/manager/TableDataManager.java       |  16 +
 .../immutable/ImmutableSegmentLoader.java          |  85 +++-
 .../loader/DefaultSegmentDirectoryLoader.java      |   6 +-
 .../index/column/PhysicalColumnIndexContainer.java |   2 +-
 .../segment/index/loader/SegmentPreProcessor.java  |  34 +-
 .../ColumnMinMaxValueGenerator.java                |   2 +-
 .../defaultcolumn/BaseDefaultColumnHandler.java    |   2 +-
 .../segment/store/SegmentLocalFSDirectory.java     |  24 +
 .../spi/index/metadata/SegmentMetadataImpl.java    |  18 +-
 .../pinot/segment/spi/store/SegmentDirectory.java  |   9 +
 .../starter/helix/HelixInstanceDataManager.java    |  47 +-
 .../helix/HelixInstanceDataManagerTest.java        |  70 +++
 14 files changed, 792 insertions(+), 319 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 1a5c013..02aab34 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -36,6 +36,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
@@ -53,6 +54,10 @@ import 
org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -285,49 +290,40 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
   public void reloadSegment(String segmentName, IndexLoadingConfig 
indexLoadingConfig, SegmentZKMetadata zkMetadata,
       SegmentMetadata localMetadata, @Nullable Schema schema, boolean 
forceDownload)
       throws Exception {
-    File indexDir = localMetadata.getIndexDir();
-    Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is 
not a directory", indexDir);
-
-    File parentFile = indexDir.getParentFile();
-    File segmentBackupDir =
-        new File(parentFile, indexDir.getName() + 
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
-
+    File indexDir = getSegmentDataDir(segmentName);
     try {
-      // First rename index directory to segment backup directory so that 
original segment have all file descriptors
-      // point to the segment backup directory to ensure original segment 
serves queries properly
+      // Create backup directory to handle failure of segment reloading.
+      createBackup(indexDir);
 
-      // Rename index directory to segment backup directory (atomic)
-      Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
-          "Failed to rename index directory: %s to segment backup directory: 
%s", indexDir, segmentBackupDir);
-
-      // Download from remote or copy from local backup directory into index 
directory,
-      // and then continue to load the segment from index directory.
+      // Download segment from deep store if CRC changes or forced to download;
+      // otherwise, copy backup directory back to the original index directory.
+      // And then continue to load the segment from the index directory.
       boolean shouldDownload = forceDownload || !hasSameCRC(zkMetadata, 
localMetadata);
       if (shouldDownload && allowDownload(segmentName, zkMetadata)) {
         if (forceDownload) {
           LOGGER.info("Segment: {} of table: {} is forced to download", 
segmentName, _tableNameWithType);
         } else {
-          LOGGER.info("Download segment:{} of table: {} as local crc: {} 
mismatches remote crc: {}", segmentName,
+          LOGGER.info("Download segment:{} of table: {} as crc changes from: 
{} to: {}", segmentName,
               _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
         }
         indexDir = downloadSegment(segmentName, zkMetadata);
       } else {
-        LOGGER.info("Reload the local copy of segment: {} of table: {}", 
segmentName, _tableNameWithType);
-        FileUtils.copyDirectory(segmentBackupDir, indexDir);
+        LOGGER.info("Reload existing segment: {} of table: {}", segmentName, 
_tableNameWithType);
+        // The indexDir is empty after calling createBackup, as it's renamed 
to a backup directory.
+        // The SegmentDirectory should initialize accordingly. Like for 
SegmentLocalFSDirectory, it
+        // doesn't load anything from an empty indexDir, but gets the info to 
complete the copyTo.
+        try (SegmentDirectory segmentDirectory = 
initSegmentDirectory(segmentName, indexLoadingConfig)) {
+          segmentDirectory.copyTo(indexDir);
+        }
       }
 
-      // Load from index directory and replace the old segment in memory.
-      addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, 
schema));
-
-      // Rename segment backup directory to segment temporary directory 
(atomic)
-      // The reason to first rename then delete is that, renaming is an atomic 
operation, but deleting is not. When we
-      // rename the segment backup directory to segment temporary directory, 
we know the reload already succeeded, so
-      // that we can safely delete the segment temporary directory
-      File segmentTempDir = new File(parentFile, indexDir.getName() + 
CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX);
-      Preconditions.checkState(segmentBackupDir.renameTo(segmentTempDir),
-          "Failed to rename segment backup directory: %s to segment temporary 
directory: %s", segmentBackupDir,
-          segmentTempDir);
-      FileUtils.deleteDirectory(segmentTempDir);
+      // Load from indexDir and replace the old segment in memory. What's 
inside indexDir
+      // may come from SegmentDirectory.copyTo() or the segment downloaded 
from deep store.
+      ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, 
indexLoadingConfig, schema);
+      addSegment(segment);
+
+      // Remove backup directory to mark the completion of segment reloading.
+      removeBackup(indexDir);
     } catch (Exception reloadFailureException) {
       try {
         LoaderUtils.reloadFailureRecovery(indexDir);
@@ -343,26 +339,19 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
   public void addOrReplaceSegment(String segmentName, IndexLoadingConfig 
indexLoadingConfig,
       SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata)
       throws Exception {
-    if (!isNewSegment(zkMetadata, localMetadata)) {
+    if (localMetadata != null && hasSameCRC(zkMetadata, localMetadata)) {
       LOGGER.info("Segment: {} of table: {} has crc: {} same as before, 
already loaded, do nothing", segmentName,
           _tableNameWithType, localMetadata.getCrc());
       return;
     }
 
-    // Try to recover if no local metadata is provided.
-    if (localMetadata == null) {
-      LOGGER.info("Segment: {} of table: {} is not loaded, checking disk", 
segmentName, _tableNameWithType);
-      localMetadata = recoverSegmentQuietly(segmentName);
-      if (!isNewSegment(zkMetadata, localMetadata)) {
-        LOGGER.info("Segment: {} of table {} has crc: {} same as before, 
loading", segmentName, _tableNameWithType,
-            localMetadata.getCrc());
-        if (loadSegmentQuietly(segmentName, indexLoadingConfig)) {
-          return;
-        }
-        // Set local metadata to null to indicate that the local segment fails 
to load,
-        // although it exists and has same crc with the remote one.
-        localMetadata = null;
-      }
+    // The segment is not loaded by the server if the metadata object is null. 
But the segment
+    // may still be kept on the server. For example when server gets 
restarted, the segment is
+    // still on the server but the metadata object has not been initialized 
yet. In this case,
+    // we should check if the segment exists on server and try to load it. If 
the segment does
+    // not exist or fails to get loaded, we download segment from deep store 
to load it again.
+    if (localMetadata == null && tryLoadExistingSegment(segmentName, 
indexLoadingConfig, zkMetadata)) {
+      return;
     }
 
     Preconditions.checkState(allowDownload(segmentName, zkMetadata), "Segment: 
%s of table: %s does not allow download",
@@ -371,9 +360,9 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     // Download segment and replace the local one, either due to failure to 
recover local segment,
     // or the segment data is updated and has new CRC now.
     if (localMetadata == null) {
-      LOGGER.info("Download segment: {} of table: {} as no good one exists 
locally", segmentName, _tableNameWithType);
+      LOGGER.info("Download segment: {} of table: {} as it doesn't exist", 
segmentName, _tableNameWithType);
     } else {
-      LOGGER.info("Download segment: {} of table: {} as local crc: {} 
mismatches remote crc: {}.", segmentName,
+      LOGGER.info("Download segment: {} of table: {} as crc changes from: {} 
to: {}", segmentName,
           _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
     }
     File indexDir = downloadSegment(segmentName, zkMetadata);
@@ -392,43 +381,6 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     return downloadSegmentFromDeepStore(segmentName, zkMetadata);
   }
 
-  /**
-   * Server restart during segment reload might leave segment directory in 
inconsistent state, like the index
-   * directory might not exist but segment backup directory existed. This 
method tries to recover from reload
-   * failure before checking the existence of the index directory and loading 
segment metadata from it.
-   */
-  private SegmentMetadata recoverSegmentQuietly(String segmentName) {
-    File indexDir = getSegmentDataDir(segmentName);
-    try {
-      LoaderUtils.reloadFailureRecovery(indexDir);
-      if (!indexDir.exists()) {
-        LOGGER.info("Segment: {} of table: {} is not found on disk", 
segmentName, _tableNameWithType);
-        return null;
-      }
-      SegmentMetadataImpl localMetadata = new SegmentMetadataImpl(indexDir);
-      LOGGER.info("Segment: {} of table: {} with crc: {} from disk is ready 
for loading", segmentName,
-          _tableNameWithType, localMetadata.getCrc());
-      return localMetadata;
-    } catch (Exception e) {
-      LOGGER.error("Failed to recover segment: {} of table: {} from disk", 
segmentName, _tableNameWithType, e);
-      FileUtils.deleteQuietly(indexDir);
-      return null;
-    }
-  }
-
-  private boolean loadSegmentQuietly(String segmentName, IndexLoadingConfig 
indexLoadingConfig) {
-    File indexDir = getSegmentDataDir(segmentName);
-    try {
-      addSegment(indexDir, indexLoadingConfig);
-      LOGGER.info("Loaded segment: {} of table: {} from disk", segmentName, 
_tableNameWithType);
-      return true;
-    } catch (Exception e) {
-      FileUtils.deleteQuietly(indexDir);
-      LOGGER.error("Failed to load segment: {} of table: {} from disk", 
segmentName, _tableNameWithType, e);
-      return false;
-    }
-  }
-
   private File downloadSegmentFromDeepStore(String segmentName, 
SegmentZKMetadata zkMetadata)
       throws Exception {
     File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + 
UUID.randomUUID());
@@ -493,12 +445,148 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     return new File(_resourceTmpDir, segmentName);
   }
 
-  @VisibleForTesting
-  static boolean isNewSegment(SegmentZKMetadata zkMetadata, @Nullable 
SegmentMetadata localMetadata) {
-    return localMetadata == null || !hasSameCRC(zkMetadata, localMetadata);
+  /**
+   * Create a backup directory to handle failure of segment reloading.
+   * First rename index directory to segment backup directory so that original 
segment have all file
+   * descriptors point to the segment backup directory to ensure original 
segment serves queries properly.
+   * The original index directory is restored lazily, as depending on the 
conditions,
+   * it may be restored from the backup directory or segment downloaded from 
deep store.
+   */
+  private void createBackup(File indexDir) {
+    if (!indexDir.exists()) {
+      return;
+    }
+    File parentDir = indexDir.getParentFile();
+    File segmentBackupDir = new File(parentDir, indexDir.getName() + 
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+    // Rename index directory to segment backup directory (atomic).
+    Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
+        "Failed to rename index directory: %s to segment backup directory: 
%s", indexDir, segmentBackupDir);
+  }
+
+  /**
+   * Remove the backup directory to mark the completion of segment reloading.
+   * First rename then delete is as renaming is an atomic operation, but 
deleting is not.
+   * When we rename the segment backup directory to segment temporary 
directory, we know the reload
+   * already succeeded, so that we can safely delete the segment temporary 
directory.
+   */
+  private void removeBackup(File indexDir)
+      throws IOException {
+    File parentDir = indexDir.getParentFile();
+    File segmentBackupDir = new File(parentDir, indexDir.getName() + 
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+    if (!segmentBackupDir.exists()) {
+      return;
+    }
+    // Rename segment backup directory to segment temporary directory (atomic).
+    File segmentTempDir = new File(parentDir, indexDir.getName() + 
CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX);
+    Preconditions.checkState(segmentBackupDir.renameTo(segmentTempDir),
+        "Failed to rename segment backup directory: %s to segment temporary 
directory: %s", segmentBackupDir,
+        segmentTempDir);
+    FileUtils.deleteDirectory(segmentTempDir);
+  }
+
+  /**
+   * Try to load the segment potentially still existing on the server.
+   *
+   * @return true if the segment still exists on server, its CRC is still same 
with the
+   * one in SegmentZKMetadata and is loaded into memory successfully; false if 
it doesn't
+   * exist on the server, its CRC has changed, or it fails to be loaded. 
SegmentDirectory
+   * object may be created when trying to load the segment, but it's closed if 
the method
+   * returns false; otherwise it's opened and to be referred by 
ImmutableSegment object.
+   */
+  private boolean tryLoadExistingSegment(String segmentName, 
IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata) {
+    // Try to recover the segment from potential segment reloading failure.
+    File indexDir = getSegmentDataDir(segmentName);
+    recoverReloadFailureQuietly(_tableNameWithType, segmentName, indexDir);
+
+    // Creates the SegmentDirectory object to access the segment metadata.
+    // The metadata is null if the segment doesn't exist yet.
+    SegmentDirectory segmentDirectory = tryInitSegmentDirectory(segmentName, 
indexLoadingConfig);
+    SegmentMetadataImpl segmentMetadata = (segmentDirectory == null) ? null : 
segmentDirectory.getSegmentMetadata();
+
+    // If the segment doesn't exist on server or its CRC has changed, then we
+    // need to fall back to download the segment from deep store to load it.
+    if (segmentMetadata == null || !hasSameCRC(zkMetadata, segmentMetadata)) {
+      if (segmentMetadata == null) {
+        LOGGER.info("Segment: {} of table: {} does not exist", segmentName, 
_tableNameWithType);
+      } else if (!hasSameCRC(zkMetadata, segmentMetadata)) {
+        LOGGER.info("Segment: {} of table: {} has crc change from: {} to: {}", 
segmentName, _tableNameWithType,
+            segmentMetadata.getCrc(), zkMetadata.getCrc());
+      }
+      closeSegmentDirectoryQuietly(segmentDirectory);
+      return false;
+    }
+
+    try {
+      // If the segment is still kept by the server, then we can
+      // either load it directly if it's still consistent with latest table 
config and schema;
+      // or reprocess it to reflect latest table config and schema before 
loading.
+      Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
+      if (!ImmutableSegmentLoader.needPreprocess(segmentDirectory, 
indexLoadingConfig, schema)) {
+        LOGGER.info("Segment: {} of table: {} is consistent with latest table 
config and schema", segmentName,
+            _tableNameWithType);
+      } else {
+        LOGGER.info("Segment: {} of table: {} needs reprocess to reflect 
latest table config and schema", segmentName,
+            _tableNameWithType);
+        segmentDirectory.copyTo(indexDir);
+        // Close the stale SegmentDirectory object and recreate it with 
reprocessed segment.
+        closeSegmentDirectoryQuietly(segmentDirectory);
+        ImmutableSegmentLoader.preprocess(indexDir, indexLoadingConfig, 
schema);
+        segmentDirectory = initSegmentDirectory(segmentName, 
indexLoadingConfig);
+      }
+      ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory, 
indexLoadingConfig, schema);
+      addSegment(segment);
+      LOGGER.info("Loaded existing segment: {} of table: {} with crc: {}", 
segmentName, _tableNameWithType,
+          zkMetadata.getCrc());
+      return true;
+    } catch (Exception e) {
+      LOGGER.error("Failed to load existing segment: {} of table: {} with crc: 
{}", segmentName, _tableNameWithType, e);
+      closeSegmentDirectoryQuietly(segmentDirectory);
+      return false;
+    }
+  }
+
+  private SegmentDirectory tryInitSegmentDirectory(String segmentName, 
IndexLoadingConfig indexLoadingConfig) {
+    try {
+      return initSegmentDirectory(segmentName, indexLoadingConfig);
+    } catch (Exception e) {
+      LOGGER.warn("Failed to initialize SegmentDirectory for segment: {} of 
table: {} with error: {}", segmentName,
+          _tableNameWithType, e.getMessage());
+      return null;
+    }
+  }
+
+  private SegmentDirectory initSegmentDirectory(String segmentName, 
IndexLoadingConfig indexLoadingConfig)
+      throws Exception {
+    SegmentDirectoryLoaderContext loaderContext =
+        new SegmentDirectoryLoaderContext(indexLoadingConfig.getTableConfig(), 
indexLoadingConfig.getInstanceId(),
+            segmentName, indexLoadingConfig.getSegmentDirectoryConfigs());
+    SegmentDirectoryLoader segmentDirectoryLoader =
+        
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
+    File indexDir = getSegmentDataDir(segmentName);
+    return segmentDirectoryLoader.load(indexDir.toURI(), loaderContext);
   }
 
   private static boolean hasSameCRC(SegmentZKMetadata zkMetadata, 
SegmentMetadata localMetadata) {
     return zkMetadata.getCrc() == Long.parseLong(localMetadata.getCrc());
   }
+
+  private static void recoverReloadFailureQuietly(String tableNameWithType, 
String segmentName, File indexDir) {
+    try {
+      LoaderUtils.reloadFailureRecovery(indexDir);
+    } catch (Exception e) {
+      LOGGER.warn("Failed to recover segment: {} of table: {} due to error: 
{}", segmentName, tableNameWithType,
+          e.getMessage());
+    }
+  }
+
+  private static void closeSegmentDirectoryQuietly(SegmentDirectory 
segmentDirectory) {
+    if (segmentDirectory != null) {
+      try {
+        segmentDirectory.close();
+      } catch (Exception e) {
+        LOGGER.warn("Failed to close SegmentDirectory due to error: {}", 
e.getMessage());
+      }
+    }
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index 78af028..fac8977 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -18,9 +18,15 @@
  */
 package org.apache.pinot.core.data.manager;
 
+import java.io.DataInputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
@@ -32,14 +38,25 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
 import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.crypt.PinotCrypter;
 import org.apache.pinot.spi.crypt.PinotCrypterFactory;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterMethod;
@@ -58,9 +75,13 @@ import static org.testng.Assert.fail;
 
 
 public class BaseTableDataManagerTest {
-  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), 
"OfflineTableDataManagerTest");
-
-  private static final String TABLE_NAME = "__table01__";
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), 
"BaseTableDataManagerTest");
+  private static final String TABLE_NAME = "table01";
+  private static final File TABLE_DATA_DIR = new File(TEMP_DIR, TABLE_NAME);
+  private static final String STRING_COLUMN = "col1";
+  private static final String[] STRING_VALUES = {"A", "D", "E", "B", "C"};
+  private static final String LONG_COLUMN = "col2";
+  private static final long[] LONG_VALUES = {10000L, 20000L, 50000L, 40000L, 
30000L};
 
   @BeforeMethod
   public void setUp()
@@ -75,210 +96,328 @@ public class BaseTableDataManagerTest {
     FileUtils.deleteDirectory(TEMP_DIR);
   }
 
-  private BaseTableDataManager makeTestableManager() {
-    TableDataManagerConfig config = mock(TableDataManagerConfig.class);
-    when(config.getTableName()).thenReturn(TABLE_NAME);
-    when(config.getDataDir()).thenReturn(new File(TEMP_DIR, 
TABLE_NAME).getAbsolutePath());
+  @Test
+  public void testReloadSegmentNewData()
+      throws Exception {
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    String segName = "seg01";
+    SegmentZKMetadata zkmd = createRawSegment(tableConfig, segName, 
SegmentVersion.v3, 5);
 
-    OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
-    tableDataManager.init(config, "dummyInstance", 
mock(ZkHelixPropertyStore.class),
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), 
mock(HelixManager.class), null);
-    tableDataManager.start();
-    return tableDataManager;
+    // Mock the case where segment is loaded but its CRC is different from
+    // the one in zk, thus raw segment is downloaded and loaded.
+    SegmentMetadata llmd = mock(SegmentMetadata.class);
+    when(llmd.getCrc()).thenReturn("0");
+
+    BaseTableDataManager tmgr = createTableManager();
+    assertFalse(tmgr.getSegmentDataDir(segName).exists());
+    tmgr.reloadSegment(segName, createIndexLoadingConfig(), zkmd, llmd, null, 
false);
+    assertTrue(tmgr.getSegmentDataDir(segName).exists());
+    llmd = new SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+    assertEquals(llmd.getTotalDocs(), 5);
   }
 
   @Test
-  public void testReloadSegmentNewData()
+  public void testReloadSegmentUseLocalCopy()
       throws Exception {
-    BaseTableDataManager tmgr = makeTestableManager();
-    File tempRootDir = tmgr.getTmpSegmentDataDir("test-new-data");
-
-    // Create an empty segment and compress it to tar.gz as the one in deep 
store.
-    // All input and intermediate files are put in the tempRootDir.
-    File tempTar = new File(tempRootDir, "seg01" + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
-    File tempInputDir = new File(tempRootDir, "seg01_input");
-    FileUtils
-        .write(new File(tempInputDir, "metadata.properties"), 
"segment.total.docs=0\nsegment.name=seg01\nk=remove");
-    TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
-    FileUtils.deleteQuietly(tempInputDir);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    String segName = "seg01";
+    File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v1, 
5);
+    String segCrc = getCRC(localSegDir, SegmentVersion.v1);
 
+    // Same CRCs so load the local segment directory directly.
     SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
-    when(zkmd.getDownloadUrl()).thenReturn("file://" + 
tempTar.getAbsolutePath());
-    when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
-
-    File indexDir = tmgr.getSegmentDataDir("seg01");
-    FileUtils.write(new File(indexDir, "metadata.properties"), 
"segment.total.docs=0\nsegment.name=seg01\nk=local");
-
-    // Different CRCs leading to segment download.
+    when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
     SegmentMetadata llmd = mock(SegmentMetadata.class);
-    when(llmd.getCrc()).thenReturn("10240");
-    when(llmd.getIndexDir()).thenReturn(indexDir);
+    when(llmd.getCrc()).thenReturn(segCrc);
+
+    BaseTableDataManager tmgr = createTableManager();
+    tmgr.reloadSegment(segName, createIndexLoadingConfig(), zkmd, llmd, null, 
false);
+    assertTrue(tmgr.getSegmentDataDir(segName).exists());
+    llmd = new SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+    assertEquals(llmd.getTotalDocs(), 5);
 
-    tmgr.reloadSegment("seg01", newDummyIndexLoadingConfig(), zkmd, llmd, 
null, false);
-    assertTrue(tmgr.getSegmentDataDir("seg01").exists());
-    assertTrue(FileUtils.readFileToString(new 
File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
-        .contains("k=remove"));
+    FileUtils.deleteQuietly(localSegDir);
+    try {
+      tmgr.reloadSegment(segName, createIndexLoadingConfig(), zkmd, llmd, 
null, false);
+      fail();
+    } catch (Exception e) {
+      // As expected, segment reloading fails due to missing the local segment 
dir.
+      assertTrue(e.getMessage().contains("does not exist or is not a 
directory"));
+    }
   }
 
   @Test
-  public void testReloadSegmentLocalCopy()
+  public void testReloadSegmentConvertVersion()
       throws Exception {
-    BaseTableDataManager tmgr = makeTestableManager();
-    File tempRootDir = tmgr.getTmpSegmentDataDir("test-local-copy");
-
-    // Create an empty segment and compress it to tar.gz as the one in deep 
store.
-    // All input and intermediate files are put in the tempRootDir.
-    File tempTar = new File(tempRootDir, "seg01" + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
-    File tempInputDir = new File(tempRootDir, "seg01_input");
-    FileUtils
-        .write(new File(tempInputDir, "metadata.properties"), 
"segment.total.docs=0\nsegment.name=seg01\nk=remote");
-    TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
-    FileUtils.deleteQuietly(tempInputDir);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    String segName = "seg01";
+    File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v1, 
5);
+    String segCrc = getCRC(localSegDir, SegmentVersion.v1);
 
+    // Same CRCs so load the local segment directory directly.
     SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
-    when(zkmd.getDownloadUrl()).thenReturn("file://" + 
tempTar.getAbsolutePath());
-    when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
-
-    File indexDir = tmgr.getSegmentDataDir("seg01");
-    FileUtils.write(new File(indexDir, "metadata.properties"), 
"segment.total.docs=0\nsegment.name=seg01\nk=local");
-
-    // Same CRCs so load the local copy.
+    when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
     SegmentMetadata llmd = mock(SegmentMetadata.class);
-    when(llmd.getCrc()).thenReturn("1024");
-    when(llmd.getIndexDir()).thenReturn(indexDir);
-
-    tmgr.reloadSegment("seg01", newDummyIndexLoadingConfig(), zkmd, llmd, 
null, false);
-    assertTrue(tmgr.getSegmentDataDir("seg01").exists());
-    assertTrue(FileUtils.readFileToString(new 
File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
-        .contains("k=local"));
+    when(llmd.getCrc()).thenReturn(segCrc);
+
+    // Require to use v3 format.
+    IndexLoadingConfig idxCfg = createIndexLoadingConfig();
+    idxCfg.setSegmentVersion(SegmentVersion.v3);
+
+    BaseTableDataManager tmgr = createTableManager();
+    tmgr.reloadSegment(segName, idxCfg, zkmd, llmd, null, false);
+    assertTrue(tmgr.getSegmentDataDir(segName).exists());
+    llmd = new SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+    assertEquals(llmd.getVersion(), SegmentVersion.v3);
+    assertEquals(llmd.getTotalDocs(), 5);
   }
 
   @Test
-  public void testReloadSegmentForceDownload()
+  public void testReloadSegmentAddIndex()
       throws Exception {
-    BaseTableDataManager tmgr = makeTestableManager();
-    File tempRootDir = tmgr.getTmpSegmentDataDir("test-force-download");
-
-    // Create an empty segment and compress it to tar.gz as the one in deep 
store.
-    // All input and intermediate files are put in the tempRootDir.
-    File tempTar = new File(tempRootDir, "seg01" + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
-    File tempInputDir = new File(tempRootDir, "seg01_input");
-    FileUtils
-        .write(new File(tempInputDir, "metadata.properties"), 
"segment.total.docs=0\nsegment.name=seg01\nk=remote");
-    TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
-    FileUtils.deleteQuietly(tempInputDir);
-
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    String segName = "seg01";
+    File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v3, 
5);
+    String segCrc = getCRC(localSegDir, SegmentVersion.v3);
+    assertFalse(hasInvertedIndex(localSegDir, STRING_COLUMN, 
SegmentVersion.v3));
+    assertFalse(hasInvertedIndex(localSegDir, LONG_COLUMN, SegmentVersion.v3));
+
+    // Same CRCs so load the local segment directory directly.
     SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
-    when(zkmd.getDownloadUrl()).thenReturn("file://" + 
tempTar.getAbsolutePath());
-    when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
+    when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+    SegmentMetadata llmd = mock(SegmentMetadata.class);
+    when(llmd.getCrc()).thenReturn(segCrc);
+
+    // Require to add indices.
+    IndexLoadingConfig idxCfg = createIndexLoadingConfig();
+    idxCfg.setSegmentVersion(SegmentVersion.v3);
+    idxCfg.setInvertedIndexColumns(new HashSet<>(Arrays.asList(STRING_COLUMN, 
LONG_COLUMN)));
+
+    BaseTableDataManager tmgr = createTableManager();
+    tmgr.reloadSegment(segName, idxCfg, zkmd, llmd, null, false);
+    assertTrue(tmgr.getSegmentDataDir(segName).exists());
+    llmd = new SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+    assertEquals(llmd.getTotalDocs(), 5);
+    assertTrue(hasInvertedIndex(tmgr.getSegmentDataDir(segName), 
STRING_COLUMN, SegmentVersion.v3));
+    assertTrue(hasInvertedIndex(tmgr.getSegmentDataDir(segName), LONG_COLUMN, 
SegmentVersion.v3));
+  }
 
-    File indexDir = tmgr.getSegmentDataDir("seg01");
-    FileUtils.write(new File(indexDir, "metadata.properties"), 
"segment.total.docs=0\nsegment.name=seg01\nk=local");
+  @Test
+  public void testReloadSegmentForceDownload()
+      throws Exception {
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    String segName = "seg01";
+    SegmentZKMetadata zkmd = createRawSegment(tableConfig, segName, 
SegmentVersion.v3, 5);
+    File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v3, 
5);
+
+    // Same CRC but force to download.
+    BaseTableDataManager tmgr = createTableManager();
+    SegmentMetadataImpl llmd = new 
SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+    assertEquals(llmd.getCrc(), zkmd.getCrc() + "");
+
+    // Remove the local segment dir. Segment reloading fails unless force to 
download.
+    FileUtils.deleteQuietly(localSegDir);
+    try {
+      tmgr.reloadSegment(segName, createIndexLoadingConfig(), zkmd, llmd, 
null, false);
+      fail();
+    } catch (Exception e) {
+      // As expected, segment reloading fails due to missing the local segment 
dir.
+      assertTrue(e.getMessage().contains("does not exist or is not a 
directory"));
+    }
 
-    // Same CRC but force to download
-    SegmentMetadata llmd = mock(SegmentMetadata.class);
-    when(llmd.getCrc()).thenReturn("1024");
-    when(llmd.getIndexDir()).thenReturn(indexDir);
+    tmgr.reloadSegment(segName, createIndexLoadingConfig(), zkmd, llmd, null, 
true);
+    assertTrue(tmgr.getSegmentDataDir(segName).exists());
 
-    tmgr.reloadSegment("seg01", newDummyIndexLoadingConfig(), zkmd, llmd, 
null, true);
-    assertTrue(tmgr.getSegmentDataDir("seg01").exists());
-    assertTrue(FileUtils.readFileToString(new 
File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
-        .contains("k=remote"));
+    llmd = new SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+    assertEquals(llmd.getCrc(), zkmd.getCrc() + "");
+    assertEquals(llmd.getTotalDocs(), 5);
   }
 
   @Test
   public void testAddOrReplaceSegmentNewData()
       throws Exception {
-    BaseTableDataManager tmgr = makeTestableManager();
-    File tempRootDir = tmgr.getTmpSegmentDataDir("test-new-data");
-
-    // Create an empty segment and compress it to tar.gz as the one in deep 
store.
-    // All input and intermediate files are put in the tempRootDir.
-    File tempTar = new File(tempRootDir, "seg01" + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
-    File tempInputDir = new File(tempRootDir, "seg01_input");
-    FileUtils.write(new File(tempInputDir, "metadata.properties"), 
"segment.total.docs=0\nsegment.name=seg01");
-    TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
-    FileUtils.deleteQuietly(tempInputDir);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    String segName = "seg01";
+    SegmentZKMetadata zkmd = createRawSegment(tableConfig, segName, 
SegmentVersion.v3, 5);
 
-    SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
-    when(zkmd.getDownloadUrl()).thenReturn("file://" + 
tempTar.getAbsolutePath());
-    when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
-
-    // Different CRCs leading to segment download.
+    // Mock the case where segment is loaded but its CRC is different from
+    // the one in zk, thus raw segment is downloaded and loaded.
     SegmentMetadata llmd = mock(SegmentMetadata.class);
-    when(llmd.getCrc()).thenReturn("10240");
-
-    assertFalse(tmgr.getSegmentDataDir("seg01").exists());
-    tmgr.addOrReplaceSegment("seg01", newDummyIndexLoadingConfig(), zkmd, 
llmd);
-    assertTrue(tmgr.getSegmentDataDir("seg01").exists());
-    assertTrue(FileUtils.readFileToString(new 
File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
-        .contains("docs=0"));
+    when(llmd.getCrc()).thenReturn("0");
+
+    BaseTableDataManager tmgr = createTableManager();
+    assertFalse(tmgr.getSegmentDataDir(segName).exists());
+    tmgr.addOrReplaceSegment(segName, createIndexLoadingConfig(), zkmd, llmd);
+    assertTrue(tmgr.getSegmentDataDir(segName).exists());
+    llmd = new SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+    assertEquals(llmd.getTotalDocs(), 5);
   }
 
   @Test
   public void testAddOrReplaceSegmentNoop()
       throws Exception {
-    BaseTableDataManager tmgr = makeTestableManager();
-
     SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
     when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
 
     SegmentMetadata llmd = mock(SegmentMetadata.class);
     when(llmd.getCrc()).thenReturn("1024");
 
+    BaseTableDataManager tmgr = createTableManager();
     assertFalse(tmgr.getSegmentDataDir("seg01").exists());
-    tmgr.addOrReplaceSegment("seg01", newDummyIndexLoadingConfig(), zkmd, 
llmd);
+    tmgr.addOrReplaceSegment("seg01", createIndexLoadingConfig(), zkmd, llmd);
     // As CRC is same, the index dir is left as is, so not get created by the 
test.
     assertFalse(tmgr.getSegmentDataDir("seg01").exists());
   }
 
   @Test
-  public void testAddOrReplaceSegmentRecovered()
+  public void testAddOrReplaceSegmentUseLocalCopy()
       throws Exception {
-    BaseTableDataManager tmgr = makeTestableManager();
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    String segName = "seg01";
+    File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v3, 
5);
+    String segCrc = getCRC(localSegDir, SegmentVersion.v3);
 
+    // Make local and remote CRC same to skip downloading raw segment.
     SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
-    // Make this equal to the default crc value, so no need to make a dummy 
creation.meta file.
-    when(zkmd.getCrc()).thenReturn(Long.MIN_VALUE);
+    when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+    when(zkmd.getDownloadUrl()).thenReturn("file://somewhere");
 
-    File backup = tmgr.getSegmentDataDir("seg01" + 
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
-    FileUtils.write(new File(backup, "metadata.properties"), 
"segment.total.docs=0\nsegment.name=seg01");
+    BaseTableDataManager tmgr = createTableManager();
+    tmgr.addOrReplaceSegment(segName, createIndexLoadingConfig(), zkmd, null);
+    assertTrue(tmgr.getSegmentDataDir(segName).exists());
+    SegmentMetadataImpl llmd = new 
SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+    assertEquals(llmd.getTotalDocs(), 5);
 
-    assertFalse(tmgr.getSegmentDataDir("seg01").exists());
-    tmgr.addOrReplaceSegment("seg01", newDummyIndexLoadingConfig(), zkmd, 
null);
-    assertTrue(tmgr.getSegmentDataDir("seg01").exists());
-    assertTrue(FileUtils.readFileToString(new 
File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
-        .contains("docs=0"));
+    FileUtils.deleteQuietly(localSegDir);
+    try {
+      tmgr.addOrReplaceSegment(segName, createIndexLoadingConfig(), zkmd, 
null);
+      fail();
+    } catch (Exception e) {
+      // As expected, when local segment dir is missing, it tries to download
+      // raw segment from deep store, but it would fail with bad download uri.
+      assertEquals(e.getMessage(), "Operation failed after 3 attempts");
+    }
   }
 
   @Test
-  public void testAddOrReplaceSegmentNotRecovered()
+  public void testAddOrReplaceSegmentUseBackupCopy()
       throws Exception {
-    BaseTableDataManager tmgr = makeTestableManager();
-    File tempRootDir = tmgr.getTmpSegmentDataDir("test-force-download");
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    String segName = "seg01";
+    File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v3, 
5);
+    String segCrc = getCRC(localSegDir, SegmentVersion.v3);
 
-    // Create an empty segment and compress it to tar.gz as the one in deep 
store.
-    // All input and intermediate files are put in the tempRootDir.
-    File tempTar = new File(tempRootDir, "seg01" + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
-    File tempInputDir = new File(tempRootDir, "seg01_input");
-    FileUtils
-        .write(new File(tempInputDir, "metadata.properties"), 
"segment.total.docs=0\nsegment.name=seg01\nk=remote");
-    TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
-    FileUtils.deleteQuietly(tempInputDir);
+    // Make local and remote CRC same to skip downloading raw segment.
+    SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+    when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+
+    BaseTableDataManager tmgr = createTableManager();
+    File backup = tmgr.getSegmentDataDir(segName + 
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+    localSegDir.renameTo(backup);
 
+    assertFalse(tmgr.getSegmentDataDir(segName).exists());
+    tmgr.addOrReplaceSegment(segName, createIndexLoadingConfig(), zkmd, null);
+    assertTrue(tmgr.getSegmentDataDir(segName).exists());
+    SegmentMetadataImpl llmd = new 
SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+    assertEquals(llmd.getTotalDocs(), 5);
+  }
+
+  @Test
+  public void testAddOrReplaceSegmentStaleBackupCopy()
+      throws Exception {
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    String segName = "seg01";
+    SegmentZKMetadata zkmd = createRawSegment(tableConfig, segName, 
SegmentVersion.v3, 5);
+
+    BaseTableDataManager tmgr = createTableManager();
+    // Create a local segment with fewer rows, making its CRC different from 
the raw segment.
+    // So that the raw segment is downloaded and loaded in the end.
+    File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v3, 
3);
+    File backup = tmgr.getSegmentDataDir(segName + 
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+    localSegDir.renameTo(backup);
+
+    assertFalse(tmgr.getSegmentDataDir(segName).exists());
+    tmgr.addOrReplaceSegment(segName, createIndexLoadingConfig(), zkmd, null);
+    assertTrue(tmgr.getSegmentDataDir(segName).exists());
+    SegmentMetadataImpl llmd = new 
SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+    assertEquals(llmd.getTotalDocs(), 5);
+  }
+
+  @Test
+  public void testAddOrReplaceSegmentUpConvertVersion()
+      throws Exception {
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    String segName = "seg01";
+    File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v1, 
5);
+    String segCrc = getCRC(localSegDir, SegmentVersion.v1);
+
+    // Make local and remote CRC same to skip downloading raw segment.
     SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
-    when(zkmd.getDownloadUrl()).thenReturn("file://" + 
tempTar.getAbsolutePath());
-    when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
+    when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+
+    // Require to use v3 format.
+    IndexLoadingConfig idxCfg = createIndexLoadingConfig();
+    idxCfg.setSegmentVersion(SegmentVersion.v3);
+
+    BaseTableDataManager tmgr = createTableManager();
+    tmgr.addOrReplaceSegment(segName, idxCfg, zkmd, null);
+    assertTrue(tmgr.getSegmentDataDir(segName).exists());
+    SegmentMetadataImpl llmd = new 
SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+    assertEquals(llmd.getVersion(), SegmentVersion.v3);
+    assertEquals(llmd.getTotalDocs(), 5);
+  }
 
-    // Though can recover from backup, but CRC is different. Local CRC is 
Long.MIN_VALUE.
-    File backup = tmgr.getSegmentDataDir("seg01" + 
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
-    FileUtils.write(new File(backup, "metadata.properties"), 
"segment.total.docs=0\nsegment.name=seg01\nk=local");
+  @Test
+  public void testAddOrReplaceSegmentDownConvertVersion()
+      throws Exception {
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    String segName = "seg01";
+    File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v3, 
5);
+    String segCrc = getCRC(localSegDir, SegmentVersion.v3);
 
-    assertFalse(tmgr.getSegmentDataDir("seg01").exists());
-    tmgr.addOrReplaceSegment("seg01", newDummyIndexLoadingConfig(), zkmd, 
null);
-    assertTrue(tmgr.getSegmentDataDir("seg01").exists());
-    assertTrue(FileUtils.readFileToString(new 
File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
-        .contains("k=remote"));
+    // Make local and remote CRC same to skip downloading raw segment.
+    SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+    when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+
+    // Require to use v1 format.
+    IndexLoadingConfig idxCfg = createIndexLoadingConfig();
+    idxCfg.setSegmentVersion(SegmentVersion.v1);
+
+    BaseTableDataManager tmgr = createTableManager();
+    tmgr.addOrReplaceSegment(segName, idxCfg, zkmd, null);
+    assertTrue(tmgr.getSegmentDataDir(segName).exists());
+    SegmentMetadataImpl llmd = new 
SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+    // The existing segment preprocessing logic doesn't down convert segment 
format.
+    assertEquals(llmd.getVersion(), SegmentVersion.v3);
+    assertEquals(llmd.getTotalDocs(), 5);
+  }
+
+  @Test
+  public void testAddOrReplaceSegmentAddIndex()
+      throws Exception {
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    String segName = "seg01";
+    File localSegDir = createSegment(tableConfig, segName, SegmentVersion.v3, 
5);
+    String segCrc = getCRC(localSegDir, SegmentVersion.v3);
+    assertFalse(hasInvertedIndex(localSegDir, STRING_COLUMN, 
SegmentVersion.v3));
+    assertFalse(hasInvertedIndex(localSegDir, LONG_COLUMN, SegmentVersion.v3));
+
+    // Make local and remote CRC same to skip downloading raw segment.
+    SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+    when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+
+    // Require to add indices.
+    IndexLoadingConfig idxCfg = createIndexLoadingConfig();
+    idxCfg.setSegmentVersion(SegmentVersion.v3);
+    idxCfg.setInvertedIndexColumns(new HashSet<>(Arrays.asList(STRING_COLUMN, 
LONG_COLUMN)));
+
+    BaseTableDataManager tmgr = createTableManager();
+    tmgr.addOrReplaceSegment(segName, idxCfg, zkmd, null);
+    assertTrue(tmgr.getSegmentDataDir(segName).exists());
+    SegmentMetadataImpl llmd = new 
SegmentMetadataImpl(tmgr.getSegmentDataDir(segName));
+    assertEquals(llmd.getTotalDocs(), 5);
+    assertTrue(hasInvertedIndex(tmgr.getSegmentDataDir(segName), 
STRING_COLUMN, SegmentVersion.v3));
+    assertTrue(hasInvertedIndex(tmgr.getSegmentDataDir(segName), LONG_COLUMN, 
SegmentVersion.v3));
   }
 
   @Test
@@ -290,7 +429,7 @@ public class BaseTableDataManagerTest {
     SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
     when(zkmd.getDownloadUrl()).thenReturn("file://" + 
tempInput.getAbsolutePath());
 
-    BaseTableDataManager tmgr = makeTestableManager();
+    BaseTableDataManager tmgr = createTableManager();
     File tempRootDir = tmgr.getTmpSegmentDataDir("test-download-decrypt");
 
     File tarFile = tmgr.downloadAndDecrypt("seg01", zkmd, tempRootDir);
@@ -301,9 +440,9 @@ public class BaseTableDataManagerTest {
     assertEquals(FileUtils.readFileToString(tarFile), "this is from somewhere 
remote");
 
     FakePinotCrypter fakeCrypter = (FakePinotCrypter) 
PinotCrypterFactory.create("fakePinotCrypter");
-    assertTrue(
-        
fakeCrypter._origFile.getAbsolutePath().endsWith("__table01__/tmp/test-download-decrypt/seg01.tar.gz.enc"));
-    
assertTrue(fakeCrypter._decFile.getAbsolutePath().endsWith("__table01__/tmp/test-download-decrypt/seg01.tar.gz"));
+    String parentDir = TABLE_NAME + "/tmp/test-download-decrypt/";
+    assertTrue(fakeCrypter._origFile.getAbsolutePath().endsWith(parentDir + 
"seg01.tar.gz.enc"));
+    assertTrue(fakeCrypter._decFile.getAbsolutePath().endsWith(parentDir + 
"seg01.tar.gz"));
 
     try {
       // Set maxRetry to 0 to cause retry failure immediately.
@@ -320,7 +459,7 @@ public class BaseTableDataManagerTest {
   @Test
   public void testUntarAndMoveSegment()
       throws IOException {
-    BaseTableDataManager tmgr = makeTestableManager();
+    BaseTableDataManager tmgr = createTableManager();
     File tempRootDir = tmgr.getTmpSegmentDataDir("test-untar-move");
 
     // All input and intermediate files are put in the tempRootDir.
@@ -343,22 +482,6 @@ public class BaseTableDataManagerTest {
     }
   }
 
-  @Test
-  public void testIsNewSegmentMetadata()
-      throws IOException {
-    SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
-    when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
-    assertTrue(BaseTableDataManager.isNewSegment(zkmd, null));
-
-    SegmentMetadata llmd = mock(SegmentMetadata.class);
-    when(llmd.getCrc()).thenReturn("1024");
-    assertFalse(BaseTableDataManager.isNewSegment(zkmd, llmd));
-
-    llmd = mock(SegmentMetadata.class);
-    when(llmd.getCrc()).thenReturn("10245");
-    assertTrue(BaseTableDataManager.isNewSegment(zkmd, llmd));
-  }
-
   // Has to be public class for the class loader to work.
   public static class FakePinotCrypter implements PinotCrypter {
     private File _origFile;
@@ -392,10 +515,81 @@ public class BaseTableDataManagerTest {
     PinotCrypterFactory.init(new PinotConfiguration(properties));
   }
 
-  private static IndexLoadingConfig newDummyIndexLoadingConfig() {
-    IndexLoadingConfig indexLoadingConfig = mock(IndexLoadingConfig.class);
-    when(indexLoadingConfig.getReadMode()).thenReturn(ReadMode.mmap);
-    when(indexLoadingConfig.getSegmentVersion()).thenReturn(SegmentVersion.v3);
+  private static IndexLoadingConfig createIndexLoadingConfig() {
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
+    indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
+    indexLoadingConfig.setReadMode(ReadMode.mmap);
     return indexLoadingConfig;
   }
+
+  private static BaseTableDataManager createTableManager() {
+    TableDataManagerConfig config = mock(TableDataManagerConfig.class);
+    when(config.getTableName()).thenReturn(TABLE_NAME);
+    when(config.getDataDir()).thenReturn(TABLE_DATA_DIR.getAbsolutePath());
+
+    OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
+    tableDataManager.init(config, "dummyInstance", 
mock(ZkHelixPropertyStore.class),
+        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), 
mock(HelixManager.class), null);
+    tableDataManager.start();
+    return tableDataManager;
+  }
+
+  private static SegmentZKMetadata createRawSegment(TableConfig tableConfig, 
String segName, SegmentVersion segVer,
+      int rowCnt)
+      throws Exception {
+    File segDir = createSegment(tableConfig, segName, segVer, rowCnt);
+    String segCrc = getCRC(segDir, SegmentVersion.v3);
+
+    SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+    File tempTar = new File(TEMP_DIR, segName + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(segDir, tempTar);
+    when(zkmd.getDownloadUrl()).thenReturn("file://" + 
tempTar.getAbsolutePath());
+    when(zkmd.getCrc()).thenReturn(Long.valueOf(segCrc));
+
+    FileUtils.deleteQuietly(segDir);
+    return zkmd;
+  }
+
+  private static File createSegment(TableConfig tableConfig, String segName, 
SegmentVersion segVer, int rowCnt)
+      throws Exception {
+    Schema schema = new 
Schema.SchemaBuilder().addSingleValueDimension(STRING_COLUMN, 
FieldSpec.DataType.STRING)
+        .addMetric(LONG_COLUMN, FieldSpec.DataType.LONG).build();
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, 
schema);
+    config.setOutDir(TABLE_DATA_DIR.getAbsolutePath());
+    config.setSegmentName(segName);
+    config.setSegmentVersion(segVer);
+    List<GenericRow> rows = new ArrayList<>(3);
+    for (int i = 0; i < rowCnt; i++) {
+      GenericRow row = new GenericRow();
+      row.putValue(STRING_COLUMN, STRING_VALUES[i]);
+      row.putValue(LONG_COLUMN, LONG_VALUES[i]);
+      rows.add(row);
+    }
+    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+    driver.init(config, new GenericRowRecordReader(rows));
+    driver.build();
+    return new File(TABLE_DATA_DIR, segName);
+  }
+
+  private static String getCRC(File segDir, SegmentVersion segVer)
+      throws IOException {
+    File parentDir = segDir;
+    if (segVer == SegmentVersion.v3) {
+      parentDir = new File(segDir, "v3");
+    }
+    File crcFile = new File(parentDir, V1Constants.SEGMENT_CREATION_META);
+    try (DataInputStream ds = new DataInputStream(new 
FileInputStream(crcFile))) {
+      return String.valueOf(ds.readLong());
+    }
+  }
+
+  private static boolean hasInvertedIndex(File segDir, String colName, 
SegmentVersion segVer)
+      throws IOException {
+    File parentDir = segDir;
+    if (segVer == SegmentVersion.v3) {
+      parentDir = new File(segDir, "v3");
+    }
+    File idxMapFile = new File(parentDir, V1Constants.INDEX_MAP_FILE_NAME);
+    return FileUtils.readFileToString(idxMapFile).contains(colName + 
".inverted_index");
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 22853ed..3ae95a6 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -86,6 +86,15 @@ public interface TableDataManager {
    * A new segment may be downloaded if the local one has a different CRC; or 
can be forced to download
    * if forceDownload flag is true. This operation is conducted within a 
failure handling framework
    * and made transparent to ongoing queries, because the segment is in online 
serving state.
+   *
+   * @param segmentName the segment to reload
+   * @param indexLoadingConfig the latest table config to load segment
+   * @param zkMetadata the segment metadata from zookeeper
+   * @param localMetadata the segment metadata object held by server right now,
+   *                      which must not be null to reload the segment
+   * @param schema the latest table schema to load segment
+   * @param forceDownload whether to force to download raw segment to reload
+   * @throws Exception thrown upon failure when to reload the segment
    */
   void reloadSegment(String segmentName, IndexLoadingConfig 
indexLoadingConfig, SegmentZKMetadata zkMetadata,
       SegmentMetadata localMetadata, @Nullable Schema schema, boolean 
forceDownload)
@@ -97,6 +106,13 @@ public interface TableDataManager {
    * This operation is conducted outside the failure handling framework as 
used in segment reloading,
    * because the segment is not yet online serving queries, e.g. this method 
is used to add a new segment,
    * or transition a segment to online serving state.
+   *
+   * @param segmentName the segment to add or replace
+   * @param indexLoadingConfig the latest table config to load segment
+   * @param zkMetadata the segment metadata from zookeeper
+   * @param localMetadata the segment metadata object held by server, which 
can be null when
+   *                      the server is restarted or the segment is newly 
added to the table
+   * @throws Exception thrown upon failure when to add or replace the segment
    */
   void addOrReplaceSegment(String segmentName, IndexLoadingConfig 
indexLoadingConfig, SegmentZKMetadata zkMetadata,
       @Nullable SegmentMetadata localMetadata)
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
index 164d86a..3ad16ee 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.indexsegment.immutable;
 
 import com.google.common.base.Preconditions;
 import java.io.File;
+import java.net.URI;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -95,35 +96,59 @@ public class ImmutableSegmentLoader {
    * modify the segment like to convert segment format, add or remove indices.
    */
   public static ImmutableSegment load(File indexDir, IndexLoadingConfig 
indexLoadingConfig, @Nullable Schema schema,
-      boolean shouldModifySegment)
+      boolean needPreprocess)
       throws Exception {
     Preconditions.checkArgument(indexDir.isDirectory(), "Index directory: %s 
does not exist or is not a directory",
         indexDir);
 
-    SegmentMetadataImpl localSegmentMetadata = new 
SegmentMetadataImpl(indexDir);
-    if (localSegmentMetadata.getTotalDocs() == 0) {
-      return new EmptyIndexSegment(localSegmentMetadata);
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+    if (segmentMetadata.getTotalDocs() == 0) {
+      return new EmptyIndexSegment(segmentMetadata);
     }
-
-    // This step will modify the segment data on disk.
-    if (shouldModifySegment) {
-      // Convert segment version as needed.
-      convertSegmentFormat(indexDir, indexLoadingConfig, localSegmentMetadata);
-      // Preprocess the segment on local using local SegmentDirectory.
-      preprocessSegment(indexDir, localSegmentMetadata.getName(), 
indexLoadingConfig, schema);
+    if (needPreprocess) {
+      preprocess(indexDir, indexLoadingConfig, schema);
     }
-
-    // Load the segment again using the configured segmentDirectoryLoader
-    PinotConfiguration segmentDirectoryConfigs = 
indexLoadingConfig.getSegmentDirectoryConfigs();
+    String segmentName = segmentMetadata.getName();
     SegmentDirectoryLoaderContext segmentLoaderContext =
         new SegmentDirectoryLoaderContext(indexLoadingConfig.getTableConfig(), 
indexLoadingConfig.getInstanceId(),
-            localSegmentMetadata.getName(), segmentDirectoryConfigs);
-
-    SegmentDirectoryLoader segmentDirectoryLoader =
+            segmentName, indexLoadingConfig.getSegmentDirectoryConfigs());
+    SegmentDirectoryLoader segmentLoader =
         
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
-    SegmentDirectory actualSegmentDirectory = 
segmentDirectoryLoader.load(indexDir.toURI(), segmentLoaderContext);
-    SegmentDirectory.Reader segmentReader = 
actualSegmentDirectory.createReader();
-    SegmentMetadataImpl segmentMetadata = 
actualSegmentDirectory.getSegmentMetadata();
+    SegmentDirectory segmentDirectory = segmentLoader.load(indexDir.toURI(), 
segmentLoaderContext);
+    try {
+      return load(segmentDirectory, indexLoadingConfig, schema);
+    } catch (Exception e) {
+      LOGGER.error("Failed to load segment: {} with SegmentDirectory", 
segmentName, e);
+      segmentDirectory.close();
+      throw e;
+    }
+  }
+
+  /**
+   * Preprocess the local segment directory according to the current table 
config and schema.
+   */
+  public static void preprocess(File indexDir, IndexLoadingConfig 
indexLoadingConfig, @Nullable Schema schema)
+      throws Exception {
+    Preconditions.checkArgument(indexDir.isDirectory(), "Index directory: %s 
does not exist or is not a directory",
+        indexDir);
+
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+    if (segmentMetadata.getTotalDocs() > 0) {
+      convertSegmentFormat(indexDir, indexLoadingConfig, segmentMetadata);
+      preprocessSegment(indexDir, segmentMetadata.getName(), 
indexLoadingConfig, schema);
+    }
+  }
+
+  /**
+   * Load the segment represented by the SegmentDirectory object to serve 
queries.
+   */
+  public static ImmutableSegment load(SegmentDirectory segmentDirectory, 
IndexLoadingConfig indexLoadingConfig,
+      @Nullable Schema schema)
+      throws Exception {
+    SegmentMetadataImpl segmentMetadata = 
segmentDirectory.getSegmentMetadata();
+    if (segmentMetadata.getTotalDocs() == 0) {
+      return new EmptyIndexSegment(segmentMetadata);
+    }
 
     // Remove columns not in schema from the metadata
     Map<String, ColumnMetadata> columnMetadataMap = 
segmentMetadata.getColumnMetadataMap();
@@ -138,16 +163,24 @@ public class ImmutableSegmentLoader {
       }
     }
 
+    URI indexDirURI = segmentDirectory.getIndexDir();
+    String scheme = indexDirURI.getScheme();
+    File localIndexDir = null;
+    if (scheme != null && scheme.equalsIgnoreCase("file")) {
+      localIndexDir = new File(indexDirURI);
+    }
+
+    SegmentDirectory.Reader segmentReader = segmentDirectory.createReader();
     Map<String, ColumnIndexContainer> indexContainerMap = new HashMap<>();
     for (Map.Entry<String, ColumnMetadata> entry : 
columnMetadataMap.entrySet()) {
       // FIXME: text-index only works with local SegmentDirectory
       indexContainerMap.put(entry.getKey(),
-          new PhysicalColumnIndexContainer(segmentReader, entry.getValue(), 
indexLoadingConfig, indexDir,
+          new PhysicalColumnIndexContainer(segmentReader, entry.getValue(), 
indexLoadingConfig, localIndexDir,
               IndexingOverrides.getIndexReaderProvider()));
     }
 
     // Instantiate virtual columns
-    String segmentName = indexDir.getName();
+    String segmentName = segmentMetadata.getName();
     Schema segmentSchema = segmentMetadata.getSchema();
     
VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(segmentSchema,
 segmentName);
     for (FieldSpec fieldSpec : segmentSchema.getAllFieldSpecs()) {
@@ -163,15 +196,15 @@ public class ImmutableSegmentLoader {
     // FIXME: star tree only works with local SegmentDirectory
     // Load star-tree index if it exists
     StarTreeIndexContainer starTreeIndexContainer = null;
-    if (segmentMetadata.getStarTreeV2MetadataList() != null) {
+    if (segmentMetadata.getStarTreeV2MetadataList() != null && localIndexDir 
!= null) {
       starTreeIndexContainer =
-          new 
StarTreeIndexContainer(SegmentDirectoryPaths.findSegmentDirectory(indexDir), 
segmentMetadata,
+          new 
StarTreeIndexContainer(SegmentDirectoryPaths.findSegmentDirectory(localIndexDir),
 segmentMetadata,
               indexContainerMap, indexLoadingConfig.getReadMode());
     }
 
     ImmutableSegmentImpl segment =
-        new ImmutableSegmentImpl(actualSegmentDirectory, segmentMetadata, 
indexContainerMap, starTreeIndexContainer);
-    LOGGER.info("Successfully loaded segment {} with config: {}", segmentName, 
segmentDirectoryConfigs);
+        new ImmutableSegmentImpl(segmentDirectory, segmentMetadata, 
indexContainerMap, starTreeIndexContainer);
+    LOGGER.info("Successfully loaded segment: {} with SegmentDirectory", 
segmentName);
     return segment;
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/DefaultSegmentDirectoryLoader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/DefaultSegmentDirectoryLoader.java
index 87bed35..ab836df 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/DefaultSegmentDirectoryLoader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/DefaultSegmentDirectoryLoader.java
@@ -47,7 +47,11 @@ public class DefaultSegmentDirectoryLoader implements 
SegmentDirectoryLoader {
   public SegmentDirectory load(URI indexDir, SegmentDirectoryLoaderContext 
segmentLoaderContext)
       throws Exception {
     PinotConfiguration segmentDirectoryConfigs = 
segmentLoaderContext.getSegmentDirectoryConfigs();
-    return new SegmentLocalFSDirectory(new File(indexDir),
+    File directory = new File(indexDir);
+    if (!directory.exists()) {
+      return new SegmentLocalFSDirectory(directory);
+    }
+    return new SegmentLocalFSDirectory(directory,
         
ReadMode.valueOf(segmentDirectoryConfigs.getProperty(IndexLoadingConfig.READ_MODE_KEY)));
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
index f233ace..591d9ac 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
@@ -92,7 +92,7 @@ public final class PhysicalColumnIndexContainer implements 
ColumnIndexContainer
       _nullValueVectorReader = null;
     }
 
-    if (loadTextIndex) {
+    if (loadTextIndex && segmentIndexDir != null) {
       Preconditions.checkState(segmentReader.hasIndexFor(columnName, 
ColumnIndexType.TEXT_INDEX));
       Map<String, Map<String, String>> columnProperties = 
indexLoadingConfig.getColumnProperties();
       _textIndex = indexReaderProvider.newTextIndexReader(segmentIndexDir, 
metadata, columnProperties.get(columnName));
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
index 47b01ce..18e5ae3 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.segment.index.loader;
 
 import java.io.File;
+import java.net.URI;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
@@ -54,7 +55,7 @@ import org.slf4j.LoggerFactory;
 public class SegmentPreProcessor implements AutoCloseable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentPreProcessor.class);
 
-  private final File _indexDir;
+  private final URI _indexDirURI;
   private final IndexLoadingConfig _indexLoadingConfig;
   private final Schema _schema;
   private final SegmentDirectory _segmentDirectory;
@@ -63,7 +64,7 @@ public class SegmentPreProcessor implements AutoCloseable {
   public SegmentPreProcessor(SegmentDirectory segmentDirectory, 
IndexLoadingConfig indexLoadingConfig,
       @Nullable Schema schema) {
     _segmentDirectory = segmentDirectory;
-    _indexDir = new File(segmentDirectory.getIndexDir());
+    _indexDirURI = segmentDirectory.getIndexDir();
     _indexLoadingConfig = indexLoadingConfig;
     _schema = schema;
     _segmentMetadata = segmentDirectory.getSegmentMetadata();
@@ -82,16 +83,19 @@ public class SegmentPreProcessor implements AutoCloseable {
       return;
     }
 
+    // Segment processing has to be done with a local directory.
+    File indexDir = new File(_indexDirURI);
+
     // This fixes the issue of temporary files not getting deleted after 
creating new inverted indexes.
-    removeInvertedIndexTempFiles();
+    removeInvertedIndexTempFiles(indexDir);
 
     try (SegmentDirectory.Writer segmentWriter = 
_segmentDirectory.createWriter()) {
       // Update default columns according to the schema.
       if (_schema != null) {
         DefaultColumnHandler defaultColumnHandler = DefaultColumnHandlerFactory
-            .getDefaultColumnHandler(_indexDir, _segmentMetadata, 
_indexLoadingConfig, _schema, segmentWriter);
+            .getDefaultColumnHandler(indexDir, _segmentMetadata, 
_indexLoadingConfig, _schema, segmentWriter);
         defaultColumnHandler.updateDefaultColumns();
-        _segmentMetadata = new SegmentMetadataImpl(_indexDir);
+        _segmentMetadata = new SegmentMetadataImpl(indexDir);
         _segmentDirectory.reloadMetadata();
       } else {
         LOGGER.warn("Skip creating default columns for segment: {} without 
schema", _segmentMetadata.getName());
@@ -105,7 +109,7 @@ public class SegmentPreProcessor implements AutoCloseable {
       }
 
       // Create/modify/remove star-trees if required.
-      processStarTrees();
+      processStarTrees(indexDir);
 
       // Add min/max value to column metadata according to the prune mode.
       // For star-tree index, because it can only increase the range, so 
min/max value can still be used in pruner.
@@ -116,7 +120,7 @@ public class SegmentPreProcessor implements AutoCloseable {
             new ColumnMinMaxValueGenerator(_segmentMetadata, segmentWriter, 
columnMinMaxValueGeneratorMode);
         columnMinMaxValueGenerator.addColumnMinMaxValue();
         // NOTE: This step may modify the segment metadata. When adding new 
steps after this, un-comment the next line.
-        // _segmentMetadata = new SegmentMetadataImpl(_indexDir);
+        // _segmentMetadata = new SegmentMetadataImpl(indexDir);
       }
 
       segmentWriter.save();
@@ -137,7 +141,7 @@ public class SegmentPreProcessor implements AutoCloseable {
       // Check if there is need to update default columns according to the 
schema.
       if (_schema != null) {
         DefaultColumnHandler defaultColumnHandler = DefaultColumnHandlerFactory
-            .getDefaultColumnHandler(_indexDir, _segmentMetadata, 
_indexLoadingConfig, _schema, null);
+            .getDefaultColumnHandler(null, _segmentMetadata, 
_indexLoadingConfig, _schema, null);
         if (defaultColumnHandler.needUpdateDefaultColumns()) {
           return true;
         }
@@ -190,7 +194,7 @@ public class SegmentPreProcessor implements AutoCloseable {
     return !starTreeBuilderConfigs.isEmpty();
   }
 
-  private void processStarTrees()
+  private void processStarTrees(File indexDir)
       throws Exception {
     // Create/modify/remove star-trees if required
     if (_indexLoadingConfig.isEnableDynamicStarTreeCreation()) {
@@ -204,8 +208,8 @@ public class SegmentPreProcessor implements AutoCloseable {
         if 
(StarTreeBuilderUtils.shouldRemoveExistingStarTrees(starTreeBuilderConfigs, 
starTreeMetadataList)) {
           // Remove the existing star-trees
           LOGGER.info("Removing star-trees from segment: {}", 
_segmentMetadata.getName());
-          StarTreeBuilderUtils.removeStarTrees(_indexDir);
-          _segmentMetadata = new SegmentMetadataImpl(_indexDir);
+          StarTreeBuilderUtils.removeStarTrees(indexDir);
+          _segmentMetadata = new SegmentMetadataImpl(indexDir);
         } else {
           // Existing star-trees match the builder configs, no need to 
generate the star-trees
           shouldGenerateStarTree = false;
@@ -214,11 +218,11 @@ public class SegmentPreProcessor implements AutoCloseable 
{
       // Generate the star-trees if needed
       if (shouldGenerateStarTree) {
         // NOTE: Always use OFF_HEAP mode on server side.
-        try (MultipleTreesBuilder builder = new 
MultipleTreesBuilder(starTreeBuilderConfigs, _indexDir,
+        try (MultipleTreesBuilder builder = new 
MultipleTreesBuilder(starTreeBuilderConfigs, indexDir,
             MultipleTreesBuilder.BuildMode.OFF_HEAP)) {
           builder.build();
         }
-        _segmentMetadata = new SegmentMetadataImpl(_indexDir);
+        _segmentMetadata = new SegmentMetadataImpl(indexDir);
       }
     }
   }
@@ -227,8 +231,8 @@ public class SegmentPreProcessor implements AutoCloseable {
    * Remove all the existing inverted index temp files before loading 
segments, by looking
    * for all files in the directory and remove the ones with  
'.bitmap.inv.tmp' extension.
    */
-  private void removeInvertedIndexTempFiles() {
-    File[] directoryListing = _indexDir.listFiles();
+  private void removeInvertedIndexTempFiles(File indexDir) {
+    File[] directoryListing = indexDir.listFiles();
     if (directoryListing == null) {
       return;
     }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
index 36cf3fd..7735189 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
@@ -51,8 +51,8 @@ public class ColumnMinMaxValueGenerator {
   public ColumnMinMaxValueGenerator(SegmentMetadataImpl segmentMetadata, 
SegmentDirectory.Writer segmentWriter,
       ColumnMinMaxValueGeneratorMode columnMinMaxValueGeneratorMode) {
     _segmentMetadata = segmentMetadata;
-    _segmentProperties = 
SegmentMetadataImpl.getPropertiesConfiguration(_segmentMetadata.getIndexDir());
     _segmentWriter = segmentWriter;
+    _segmentProperties = segmentMetadata.getPropertiesConfiguration();
     _columnMinMaxValueGeneratorMode = columnMinMaxValueGeneratorMode;
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 45c0554..892ad8f 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -123,7 +123,7 @@ public abstract class BaseDefaultColumnHandler implements 
DefaultColumnHandler {
     _indexLoadingConfig = indexLoadingConfig;
     _schema = schema;
     _segmentWriter = segmentWriter;
-    _segmentProperties = 
SegmentMetadataImpl.getPropertiesConfiguration(indexDir);
+    _segmentProperties = _segmentMetadata.getPropertiesConfiguration();
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
index d2dd7c6..9cff2ea 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
@@ -36,6 +36,7 @@ import 
org.apache.pinot.segment.spi.store.ColumnIndexDirectory;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +64,15 @@ public class SegmentLocalFSDirectory extends 
SegmentDirectory {
 
   private ColumnIndexDirectory _columnIndexDirectory;
 
+  // Create an empty SegmentLocalFSDirectory object mainly used to
+  // prepare env for subsequent processing on the segment.
+  public SegmentLocalFSDirectory(File directory) {
+    _indexDir = directory;
+    _segmentDirectory = null;
+    _segmentLock = new SegmentLock();
+    _readMode = null;
+  }
+
   public SegmentLocalFSDirectory(File directory, ReadMode readMode)
       throws IOException {
     this(directory, new SegmentMetadataImpl(directory), readMode);
@@ -95,6 +105,20 @@ public class SegmentLocalFSDirectory extends 
SegmentDirectory {
   }
 
   @Override
+  public void copyTo(File dest)
+      throws Exception {
+    File src = _indexDir;
+    if (!src.exists()) {
+      // If the original one doesn't exist, then try the backup directory.
+      File parentDir = _indexDir.getParentFile();
+      src = new File(parentDir, _indexDir.getName() + 
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+    }
+    if (src.exists() && !src.equals(dest)) {
+      FileUtils.copyDirectory(src, dest);
+    }
+  }
+
+  @Override
   public SegmentMetadataImpl getSegmentMetadata() {
     return _segmentMetadata;
   }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
index f0847ab..3fed351 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
@@ -80,6 +80,9 @@ public class SegmentMetadataImpl implements SegmentMetadata {
 
   private SegmentVersion _segmentVersion;
   private List<StarTreeV2Metadata> _starTreeV2MetadataList;
+  // Caching properties around can be costly when the number of segments is 
high according to the
+  // finding in PR #2996. So for now, caching is used only when initializing 
from input streams.
+  private PropertiesConfiguration _segmentMetadataPropertiesConfiguration = 
null;
   private String _creatorName;
   private int _totalDocs;
   private final Map<String, String> _customMap = new HashMap<>();
@@ -96,13 +99,13 @@ public class SegmentMetadataImpl implements SegmentMetadata 
{
     _columnMetadataMap = new HashMap<>();
     _schema = new Schema();
 
-    PropertiesConfiguration segmentMetadataPropertiesConfiguration =
-        
CommonsConfigurationUtils.fromInputStream(metadataPropertiesInputStream);
-    init(segmentMetadataPropertiesConfiguration);
+    // Caching properties when initializing from input streams.
+    _segmentMetadataPropertiesConfiguration = 
CommonsConfigurationUtils.fromInputStream(metadataPropertiesInputStream);
+    init(_segmentMetadataPropertiesConfiguration);
     loadCreationMeta(creationMetaInputStream);
 
-    setTimeInfo(segmentMetadataPropertiesConfiguration);
-    _totalDocs = 
segmentMetadataPropertiesConfiguration.getInt(Segment.SEGMENT_TOTAL_DOCS);
+    setTimeInfo(_segmentMetadataPropertiesConfiguration);
+    _totalDocs = 
_segmentMetadataPropertiesConfiguration.getInt(Segment.SEGMENT_TOTAL_DOCS);
   }
 
   /**
@@ -139,6 +142,11 @@ public class SegmentMetadataImpl implements 
SegmentMetadata {
     _creationTime = creationTime;
   }
 
+  public PropertiesConfiguration getPropertiesConfiguration() {
+    return (_segmentMetadataPropertiesConfiguration != null) ? 
_segmentMetadataPropertiesConfiguration
+        : SegmentMetadataImpl.getPropertiesConfiguration(_indexDir);
+  }
+
   public static PropertiesConfiguration getPropertiesConfiguration(File 
indexDir) {
     File metadataFile = SegmentDirectoryPaths.findMetadataFile(indexDir);
     Preconditions.checkNotNull(metadataFile, "Cannot find segment metadata 
file under directory: %s", indexDir);
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
index 7f0dc76..cb207e0 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.spi.store;
 
 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.file.Path;
@@ -141,6 +142,14 @@ public abstract class SegmentDirectory implements 
Closeable {
   }
 
   /**
+   * Copy segment directory to a local directory.
+   * @param dest the destination directory
+   */
+  public void copyTo(File dest)
+      throws Exception {
+  }
+
+  /**
    * Reader for columnar index buffers from segment directory
    */
   public abstract class Reader implements Closeable {
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 9d1af9c..21d3c90 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.server.starter.helix;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -54,6 +55,8 @@ import 
org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
 import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -271,22 +274,18 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
 
     File indexDir = segmentMetadata.getIndexDir();
     if (indexDir == null) {
-      if (!_instanceDataManagerConfig.shouldReloadConsumingSegment()) {
-        LOGGER.info("Skip reloading REALTIME consuming segment: {} in table: 
{}", segmentName, tableNameWithType);
+      SegmentDataManager segmentDataManager = 
tableDataManager.acquireSegment(segmentName);
+      if (segmentDataManager == null) {
         return;
       }
-      Preconditions.checkState(schema != null, "Failed to find schema for 
table: {}", tableNameWithType);
-      LOGGER.info("Try reloading REALTIME consuming segment: {} in table: {}", 
segmentName, tableNameWithType);
-      SegmentDataManager segmentDataManager = 
tableDataManager.acquireSegment(segmentName);
-      if (segmentDataManager != null) {
-        try {
-          MutableSegmentImpl mutableSegment = (MutableSegmentImpl) 
segmentDataManager.getSegment();
-          mutableSegment.addExtraColumns(schema);
-        } finally {
-          tableDataManager.releaseSegment(segmentDataManager);
+      try {
+        if (reloadMutableSegment(tableNameWithType, segmentName, 
segmentDataManager, schema)) {
+          // A mutable segment has been found and reloaded.
+          return;
         }
+      } finally {
+        tableDataManager.releaseSegment(segmentDataManager);
       }
-      return;
     }
 
     SegmentZKMetadata zkMetadata =
@@ -307,6 +306,30 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
     }
   }
 
+  /**
+   * Try to reload a mutable segment.
+   * @return true if the segment is mutable and loaded; false if the segment 
is immutable.
+   */
+  @VisibleForTesting
+  boolean reloadMutableSegment(String tableNameWithType, String segmentName,
+      SegmentDataManager segmentDataManager, @Nullable Schema schema) {
+    IndexSegment segment = segmentDataManager.getSegment();
+    if (segment instanceof ImmutableSegment) {
+      LOGGER.info("Found an immutable segment: {} in table: {}", segmentName, 
tableNameWithType);
+      return false;
+    }
+    // Found a mutable/consuming segment from REALTIME table.
+    if (!_instanceDataManagerConfig.shouldReloadConsumingSegment()) {
+      LOGGER.info("Skip reloading REALTIME consuming segment: {} in table: 
{}", segmentName, tableNameWithType);
+      return true;
+    }
+    LOGGER.info("Reloading REALTIME consuming segment: {} in table: {}", 
segmentName, tableNameWithType);
+    Preconditions.checkState(schema != null, "Failed to find schema for table: 
{}", tableNameWithType);
+    MutableSegmentImpl mutableSegment = (MutableSegmentImpl) segment;
+    mutableSegment.addExtraColumns(schema);
+    return true;
+  }
+
   @Override
   public void addOrReplaceSegment(String tableNameWithType, String segmentName)
       throws Exception {
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java
 
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java
new file mode 100644
index 0000000..a8809bf
--- /dev/null
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import java.io.File;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class HelixInstanceDataManagerTest {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), 
"HelixInstanceDataManagerTest");
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    FileUtils.deleteDirectory(TEMP_DIR);
+  }
+
+  @Test
+  public void testReloadMutableSegment()
+      throws ConfigurationException {
+    // Provides required configs to init the instance data manager.
+    PinotConfiguration config = new PinotConfiguration();
+    config.setProperty("id", "id01");
+    config.setProperty("dataDir", TEMP_DIR.getAbsolutePath());
+    config.setProperty("segmentTarDir", TEMP_DIR.getAbsolutePath());
+    config.setProperty("readMode", "mmap");
+    config.setProperty("reload.consumingSegment", "false");
+
+    HelixInstanceDataManager mgr = new HelixInstanceDataManager();
+    mgr.init(config, mock(HelixManager.class), mock(ServerMetrics.class));
+
+    SegmentDataManager segMgr = mock(SegmentDataManager.class);
+
+    when(segMgr.getSegment()).thenReturn(mock(ImmutableSegment.class));
+    assertFalse(mgr.reloadMutableSegment("table01", "seg01", segMgr, null));
+
+    when(segMgr.getSegment()).thenReturn(mock(MutableSegment.class));
+    assertTrue(mgr.reloadMutableSegment("table01", "seg01", segMgr, null));
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to