This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ad36c3482 Remove TableDataManagerConfig and simplify TableDataManager construction (#12189)
4ad36c3482 is described below

commit 4ad36c3482740386264478717343316b53cabeba
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Jan 17 16:54:08 2024 -0800

    Remove TableDataManagerConfig and simplify TableDataManager construction (#12189)
---
 .../pinot/common/auth/AuthProviderUtils.java       |   3 +
 .../core/data/manager/BaseTableDataManager.java    |  86 +++++++-------
 .../manager/offline/TableDataManagerProvider.java  |  52 +++-----
 .../realtime/RealtimeSegmentDataManager.java       |   2 +-
 .../manager/realtime/RealtimeTableDataManager.java |   8 +-
 .../BaseTableDataManagerAcquireSegmentTest.java    |  40 +++----
 .../data/manager/BaseTableDataManagerTest.java     |  90 ++++++--------
 .../offline/DimensionTableDataManagerTest.java     |  24 ++--
 .../realtime/RealtimeSegmentDataManagerTest.java   |  73 +++++-------
 .../realtime/RealtimeTableDataManagerTest.java     |  77 ++++++------
 .../executor/QueryExecutorExceptionsTest.java      |  26 ++--
 .../core/query/executor/QueryExecutorTest.java     |  50 +++-----
 .../pinot/queries/ExplainPlanQueriesTest.java      |  39 +++---
 .../queries/SegmentWithNullValueVectorTest.java    |  56 +++------
 .../tests/UpsertTableIntegrationTest.java          |   2 +-
 .../UpsertTableSegmentPreloadIntegrationTest.java  |   4 +-
 .../local/data/manager/TableDataManager.java       |  29 ++---
 .../local/data/manager/TableDataManagerConfig.java | 113 ------------------
 .../local/data/manager/TableDataManagerParams.java |  66 -----------
 .../upsert/BaseTableUpsertMetadataManager.java     |   9 +-
 .../upsert/TableUpsertMetadataManagerFactory.java  |  13 +-
 .../starter/helix/HelixInstanceDataManager.java    |  21 ++--
 .../helix/HelixInstanceDataManagerConfig.java      | 132 ++++++++++++---------
 .../apache/pinot/server/api/BaseResourceTest.java  |  65 +++++-----
 .../config/instance/InstanceDataManagerConfig.java |   8 +-
 25 files changed, 410 insertions(+), 678 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/auth/AuthProviderUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/auth/AuthProviderUtils.java
index 513bbf0929..cc02e01fa3 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/auth/AuthProviderUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/auth/AuthProviderUtils.java
@@ -63,6 +63,9 @@ public final class AuthProviderUtils {
    * @return auth provider
    */
   public static AuthProvider extractAuthProvider(PinotConfiguration 
pinotConfig, String namespace) {
+    if (pinotConfig == null) {
+      return new NullAuthProvider();
+    }
     return makeAuthProvider(extractAuthConfig(pinotConfig, namespace));
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index f13f04c3b2..119bede805 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -60,8 +60,6 @@ import 
org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.util.PeerServerSegmentFinder;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
@@ -73,6 +71,7 @@ import 
org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -90,8 +89,10 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
   // Semaphore to restrict the maximum number of parallel segment downloads 
for a table.
   private Semaphore _segmentDownloadSemaphore;
 
-  protected TableDataManagerConfig _tableDataManagerConfig;
+  protected InstanceDataManagerConfig _instanceDataManagerConfig;
   protected String _instanceId;
+  protected TableConfig _tableConfig;
+  protected HelixManager _helixManager;
   protected ZkHelixPropertyStore<ZNRecord> _propertyStore;
   protected ServerMetrics _serverMetrics;
   protected String _tableNameWithType;
@@ -99,9 +100,9 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
   protected File _indexDir;
   protected File _resourceTmpDir;
   protected Logger _logger;
-  protected HelixManager _helixManager;
   protected ExecutorService _segmentPreloadExecutor;
   protected AuthProvider _authProvider;
+  protected String _peerDownloadScheme;
   protected long _streamSegmentDownloadUntarRateLimitBytesPerSec;
   protected boolean _isStreamSegmentDownloadUntar;
 
@@ -114,25 +115,22 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
   protected volatile boolean _shutDown;
 
   @Override
-  public void init(TableDataManagerConfig tableDataManagerConfig, String 
instanceId,
-      ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics 
serverMetrics, HelixManager helixManager,
-      @Nullable ExecutorService segmentPreloadExecutor,
-      @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> 
errorCache,
-      TableDataManagerParams tableDataManagerParams) {
-    LOGGER.info("Initializing table data manager for table: {}", 
tableDataManagerConfig.getTableName());
-
-    _tableDataManagerConfig = tableDataManagerConfig;
-    _instanceId = instanceId;
-    _propertyStore = propertyStore;
-    _serverMetrics = serverMetrics;
+  public void init(InstanceDataManagerConfig instanceDataManagerConfig, 
TableConfig tableConfig,
+      HelixManager helixManager, @Nullable ExecutorService 
segmentPreloadExecutor,
+      @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> 
errorCache) {
+    LOGGER.info("Initializing table data manager for table: {}", 
tableConfig.getTableName());
+
+    _instanceDataManagerConfig = instanceDataManagerConfig;
+    _instanceId = instanceDataManagerConfig.getInstanceId();
+    _tableConfig = tableConfig;
     _helixManager = helixManager;
+    _propertyStore = helixManager.getHelixPropertyStore();
+    _serverMetrics = ServerMetrics.get();
     _segmentPreloadExecutor = segmentPreloadExecutor;
+    _authProvider = 
AuthProviderUtils.extractAuthProvider(_instanceDataManagerConfig.getAuthConfig(),
 null);
 
-    _authProvider =
-        
AuthProviderUtils.extractAuthProvider(toPinotConfiguration(_tableDataManagerConfig.getAuthConfig()),
 null);
-
-    _tableNameWithType = tableDataManagerConfig.getTableName();
-    _tableDataDir = tableDataManagerConfig.getDataDir();
+    _tableNameWithType = tableConfig.getTableName();
+    _tableDataDir = _instanceDataManagerConfig.getInstanceDataDir() + 
File.separator + _tableNameWithType;
     _indexDir = new File(_tableDataDir);
     if (!_indexDir.exists()) {
       Preconditions.checkState(_indexDir.mkdirs(), "Unable to create index 
directory at %s. "
@@ -148,18 +146,23 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     }
     _errorCache = errorCache;
     _recentlyDeletedSegments =
-        
CacheBuilder.newBuilder().maximumSize(tableDataManagerConfig.getTableDeletedSegmentsCacheSize())
-            
.expireAfterWrite(tableDataManagerConfig.getTableDeletedSegmentsCacheTtlMinutes(),
 TimeUnit.MINUTES)
-            .build();
+        
CacheBuilder.newBuilder().maximumSize(instanceDataManagerConfig.getDeletedSegmentsCacheSize())
+            
.expireAfterWrite(instanceDataManagerConfig.getDeletedSegmentsCacheTtlMinutes(),
 TimeUnit.MINUTES).build();
+
+    _peerDownloadScheme = 
tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
+    if (_peerDownloadScheme == null) {
+      _peerDownloadScheme = 
instanceDataManagerConfig.getSegmentPeerDownloadScheme();
+    }
+
     _streamSegmentDownloadUntarRateLimitBytesPerSec =
-        
tableDataManagerParams.getStreamSegmentDownloadUntarRateLimitBytesPerSec();
-    _isStreamSegmentDownloadUntar = 
tableDataManagerParams.isStreamSegmentDownloadUntar();
+        instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit();
+    _isStreamSegmentDownloadUntar = 
instanceDataManagerConfig.isStreamSegmentDownloadUntar();
     if (_isStreamSegmentDownloadUntar) {
       LOGGER.info("Using streamed download-untar for segment download! "
               + "The rate limit interval for streamed download-untar is {} 
bytes/s",
           _streamSegmentDownloadUntarRateLimitBytesPerSec);
     }
-    int maxParallelSegmentDownloads = 
tableDataManagerParams.getMaxParallelSegmentDownloads();
+    int maxParallelSegmentDownloads = 
instanceDataManagerConfig.getMaxParallelSegmentDownloads();
     if (maxParallelSegmentDownloads > 0) {
       LOGGER.info(
           "Construct segment download semaphore for Table: {}. Maximum number 
of parallel segment downloads: {}",
@@ -178,6 +181,16 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
 
   protected abstract void doInit();
 
+  @Override
+  public String getInstanceId() {
+    return _instanceId;
+  }
+
+  @Override
+  public InstanceDataManagerConfig getInstanceDataManagerConfig() {
+    return _instanceDataManagerConfig;
+  }
+
   @Override
   public synchronized void start() {
     _logger.info("Starting table data manager for table: {}", 
_tableNameWithType);
@@ -255,7 +268,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     Preconditions.checkState(!_shutDown, "Table data manager is already shut 
down, cannot add segment: %s to table: %s",
         indexDir.getName(), _tableNameWithType);
     indexLoadingConfig.setTableDataDir(_tableDataDir);
-    
indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs());
+    
indexLoadingConfig.setInstanceTierConfigs(_instanceDataManagerConfig.getTierConfigs());
     addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, 
indexLoadingConfig.getSchema()));
   }
 
@@ -382,11 +395,6 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     return _indexDir;
   }
 
-  @Override
-  public TableDataManagerConfig getTableDataManagerConfig() {
-    return _tableDataManagerConfig;
-  }
-
   @Override
   public void addSegmentError(String segmentName, SegmentErrorInfo 
segmentErrorInfo) {
     _errorCache.put(Pair.of(_tableNameWithType, segmentName), 
segmentErrorInfo);
@@ -413,7 +421,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     String segmentTier = getSegmentCurrentTier(segmentName);
     indexLoadingConfig.setSegmentTier(segmentTier);
     indexLoadingConfig.setTableDataDir(_tableDataDir);
-    
indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs());
+    
indexLoadingConfig.setInstanceTierConfigs(_instanceDataManagerConfig.getTierConfigs());
     File indexDir = getSegmentDataDir(segmentName, segmentTier, 
indexLoadingConfig.getTableConfig());
     try {
       // Download segment from deep store if CRC changes or forced to download;
@@ -507,7 +515,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     String segmentTier = zkMetadata.getTier();
     indexLoadingConfig.setSegmentTier(segmentTier);
     indexLoadingConfig.setTableDataDir(_tableDataDir);
-    
indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs());
+    
indexLoadingConfig.setInstanceTierConfigs(_instanceDataManagerConfig.getTierConfigs());
     if (localMetadata == null && tryLoadExistingSegment(segmentName, 
indexLoadingConfig, zkMetadata)) {
       return;
     }
@@ -630,7 +638,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
       LOGGER.error("Attempts exceeded when downloading segment: {} for table: 
{} from: {} to: {}", segmentName,
           _tableNameWithType, uri, tarFile);
       _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES, 1L);
-      if (_tableDataManagerConfig.getTablePeerDownloadScheme() == null) {
+      if (_peerDownloadScheme == null) {
         throw e;
       }
       downloadFromPeersWithoutStreaming(segmentName, zkMetadata, tarFile);
@@ -649,11 +657,9 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
   // not thread safe. Caller should invoke it with safe concurrency control.
   protected void downloadFromPeersWithoutStreaming(String segmentName, 
SegmentZKMetadata zkMetadata, File destTarFile)
       throws Exception {
-    
Preconditions.checkArgument(_tableDataManagerConfig.getTablePeerDownloadScheme()
 != null,
-        "Download peers require non null peer download scheme");
+    Preconditions.checkState(_peerDownloadScheme != null, "Download peers 
require non null peer download scheme");
     List<URI> peerSegmentURIs =
-        PeerServerSegmentFinder.getPeerServerURIs(segmentName, 
_tableDataManagerConfig.getTablePeerDownloadScheme(),
-            _helixManager, _tableNameWithType);
+        PeerServerSegmentFinder.getPeerServerURIs(segmentName, 
_peerDownloadScheme, _helixManager, _tableNameWithType);
     if (peerSegmentURIs.isEmpty()) {
       String msg = String.format("segment %s doesn't have any peers", 
segmentName);
       LOGGER.warn(msg);
@@ -745,7 +751,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
       return getSegmentDataDir(segmentName);
     }
     String tierDataDir =
-        TierConfigUtils.getDataDirForTier(tableConfig, segmentTier, 
_tableDataManagerConfig.getInstanceTierConfigs());
+        TierConfigUtils.getDataDirForTier(tableConfig, segmentTier, 
_instanceDataManagerConfig.getTierConfigs());
     if (StringUtils.isEmpty(tierDataDir)) {
       return getSegmentDataDir(segmentName);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
index 946ecf1565..f3cfa41ee7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
@@ -27,15 +27,11 @@ import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -45,47 +41,38 @@ import org.apache.pinot.spi.utils.IngestionConfigUtils;
  * Factory for {@link TableDataManager}.
  */
 public class TableDataManagerProvider {
-  private static Semaphore _segmentBuildSemaphore;
-  private static TableDataManagerParams _tableDataManagerParams;
+  private final InstanceDataManagerConfig _instanceDataManagerConfig;
+  private final Semaphore _segmentBuildSemaphore;
 
-  private TableDataManagerProvider() {
+  public TableDataManagerProvider(InstanceDataManagerConfig 
instanceDataManagerConfig) {
+    _instanceDataManagerConfig = instanceDataManagerConfig;
+    int maxParallelSegmentBuilds = 
instanceDataManagerConfig.getMaxParallelSegmentBuilds();
+    _segmentBuildSemaphore = maxParallelSegmentBuilds > 0 ? new 
Semaphore(maxParallelSegmentBuilds, true) : null;
   }
 
-  public static void init(InstanceDataManagerConfig instanceDataManagerConfig) 
{
-    int maxParallelBuilds = 
instanceDataManagerConfig.getMaxParallelSegmentBuilds();
-    if (maxParallelBuilds > 0) {
-      _segmentBuildSemaphore = new Semaphore(maxParallelBuilds, true);
-    }
-    _tableDataManagerParams = new 
TableDataManagerParams(instanceDataManagerConfig);
-  }
-
-  public static TableDataManager getTableDataManager(TableDataManagerConfig 
tableDataManagerConfig, String instanceId,
-      ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics 
serverMetrics, HelixManager helixManager,
-      LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache) {
-    return getTableDataManager(tableDataManagerConfig, instanceId, 
propertyStore, serverMetrics, helixManager, null,
-        errorCache, () -> true);
+  public TableDataManager getTableDataManager(TableConfig tableConfig, 
HelixManager helixManager) {
+    return getTableDataManager(tableConfig, helixManager, null, null, () -> 
true);
   }
 
-  public static TableDataManager getTableDataManager(TableDataManagerConfig 
tableDataManagerConfig, String instanceId,
-      ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics 
serverMetrics, HelixManager helixManager,
-      @Nullable ExecutorService segmentPreloadExecutor, 
LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache,
+  public TableDataManager getTableDataManager(TableConfig tableConfig, 
HelixManager helixManager,
+      @Nullable ExecutorService segmentPreloadExecutor,
+      @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> 
errorCache,
       Supplier<Boolean> isServerReadyToServeQueries) {
     TableDataManager tableDataManager;
-    switch (tableDataManagerConfig.getTableType()) {
+    switch (tableConfig.getTableType()) {
       case OFFLINE:
-        if (tableDataManagerConfig.isDimTable()) {
-          tableDataManager = 
DimensionTableDataManager.createInstanceByTableName(tableDataManagerConfig.getTableName());
+        if (tableConfig.isDimTable()) {
+          tableDataManager = 
DimensionTableDataManager.createInstanceByTableName(tableConfig.getTableName());
         } else {
           tableDataManager = new OfflineTableDataManager();
         }
         break;
       case REALTIME:
-        Map<String, String> streamConfigMap = 
IngestionConfigUtils.getStreamConfigMap(
-            tableDataManagerConfig.getTableConfig());
+        Map<String, String> streamConfigMap = 
IngestionConfigUtils.getStreamConfigMap(tableConfig);
         if 
(Boolean.parseBoolean(streamConfigMap.get(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE))
-            && 
StringUtils.isEmpty(tableDataManagerConfig.getInstanceDataManagerConfig().getSegmentStoreUri()))
 {
+            && 
StringUtils.isEmpty(_instanceDataManagerConfig.getSegmentStoreUri())) {
           throw new IllegalStateException(String.format("Table has enabled %s 
config. But the server has not "
-              + "configured the segmentstore uri. Configure the server config 
%s",
+                  + "configured the segmentstore uri. Configure the server 
config %s",
               StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE, 
CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI));
         }
         tableDataManager = new 
RealtimeTableDataManager(_segmentBuildSemaphore, isServerReadyToServeQueries);
@@ -93,8 +80,7 @@ public class TableDataManagerProvider {
       default:
         throw new IllegalStateException();
     }
-    tableDataManager.init(tableDataManagerConfig, instanceId, propertyStore, 
serverMetrics, helixManager,
-        segmentPreloadExecutor, errorCache, _tableDataManagerParams);
+    tableDataManager.init(_instanceDataManagerConfig, tableConfig, 
helixManager, segmentPreloadExecutor, errorCache);
     return tableDataManager;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index fa1501e868..282f3c72a1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1358,7 +1358,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
     _isReadyToConsumeData = isReadyToConsumeData;
     _segmentVersion = indexLoadingConfig.getSegmentVersion();
-    _instanceId = _realtimeTableDataManager.getServerInstance();
+    _instanceId = _realtimeTableDataManager.getInstanceId();
     _leaseExtender = 
SegmentBuildTimeLeaseExtender.getLeaseExtender(_tableNameWithType);
     _protocolHandler = new 
ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType);
     CompletionConfig completionConfig = 
_tableConfig.getValidationConfig().getCompletionConfig();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 333be09b0c..2ddc0f7eb5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -207,8 +207,8 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
       Preconditions.checkState(schema != null, "Failed to find schema for 
table: %s", _tableNameWithType);
       // NOTE: Set _tableUpsertMetadataManager before initializing it because 
when preloading is enabled, we need to
       //       load segments into it
-      _tableUpsertMetadataManager = 
TableUpsertMetadataManagerFactory.create(tableConfig,
-          
_tableDataManagerConfig.getInstanceDataManagerConfig().getUpsertConfigs());
+      _tableUpsertMetadataManager =
+          TableUpsertMetadataManagerFactory.create(tableConfig, 
_instanceDataManagerConfig.getUpsertConfig());
       _tableUpsertMetadataManager.init(tableConfig, schema, this, 
_helixManager, _segmentPreloadExecutor);
     }
 
@@ -328,7 +328,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
   }
 
   public String getConsumerDir() {
-    String consumerDirPath = _tableDataManagerConfig.getConsumerDir();
+    String consumerDirPath = _instanceDataManagerConfig.getConsumerDir();
     File consumerDir;
     // If a consumer directory has been configured, use it to create a 
per-table path under the consumer dir.
     // Otherwise, create a sub-dir under the table-specific data director and 
use it for consumer mmaps
@@ -391,7 +391,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
 
     // Assign table directory and tier info to not let the segment be moved 
during loading/preprocessing
     indexLoadingConfig.setTableDataDir(_tableDataDir);
-    
indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs());
+    
indexLoadingConfig.setInstanceTierConfigs(_instanceDataManagerConfig.getTierConfigs());
     indexLoadingConfig.setSegmentTier(segmentZKMetadata.getTier());
 
     File segmentDir = new File(_indexDir, segmentName);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
index c5a38c4f94..a245c740be 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
@@ -23,7 +23,6 @@ import java.io.File;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -31,26 +30,24 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.configuration2.MapConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
-import org.testng.annotations.AfterSuite;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.*;
@@ -58,7 +55,6 @@ import static org.mockito.Mockito.*;
 
 public class BaseTableDataManagerAcquireSegmentTest {
   private static final String RAW_TABLE_NAME = "testTable";
-  private static final String OFFLINE_TABLE_NAME = 
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
   private static final String SEGMENT_PREFIX = "segment";
   private static final int DELETED_SEGMENTS_CACHE_SIZE = 100;
   private static final int DELETED_SEGMENTS_TTL_MINUTES = 2;
@@ -86,9 +82,11 @@ public class BaseTableDataManagerAcquireSegmentTest {
   private volatile int _lo;
   private volatile int _hi;
 
-  @BeforeSuite
+  @BeforeClass
   public void setUp()
       throws Exception {
+    ServerMetrics.register(mock(ServerMetrics.class));
+
     _tmpDir = new File(FileUtils.getTempDirectory(), 
"OfflineTableDataManagerTest");
     TestUtils.ensureDirectoriesExistAndEmpty(_tmpDir);
     _tmpDir.deleteOnExit();
@@ -98,7 +96,7 @@ public class BaseTableDataManagerAcquireSegmentTest {
     System.out.printf("Record random seed: %d to reproduce test results upon 
failure\n", seed);
   }
 
-  @AfterSuite
+  @AfterClass
   public void tearDown() {
     if (_tmpDir != null) {
       org.apache.commons.io.FileUtils.deleteQuietly(_tmpDir);
@@ -119,19 +117,13 @@ public class BaseTableDataManagerAcquireSegmentTest {
 
   private TableDataManager makeTestableManager()
       throws Exception {
+    InstanceDataManagerConfig instanceDataManagerConfig = 
mock(InstanceDataManagerConfig.class);
+    
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(_tmpDir.getAbsolutePath());
+    
when(instanceDataManagerConfig.getDeletedSegmentsCacheSize()).thenReturn(DELETED_SEGMENTS_CACHE_SIZE);
+    
when(instanceDataManagerConfig.getDeletedSegmentsCacheTtlMinutes()).thenReturn(DELETED_SEGMENTS_TTL_MINUTES);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
     TableDataManager tableDataManager = new OfflineTableDataManager();
-    TableDataManagerConfig config;
-    {
-      config = mock(TableDataManagerConfig.class);
-      when(config.getTableName()).thenReturn(OFFLINE_TABLE_NAME);
-      when(config.getDataDir()).thenReturn(_tmpDir.getAbsolutePath());
-      when(config.getAuthConfig()).thenReturn(new MapConfiguration(new 
HashMap<>()));
-      
when(config.getTableDeletedSegmentsCacheSize()).thenReturn(DELETED_SEGMENTS_CACHE_SIZE);
-      
when(config.getTableDeletedSegmentsCacheTtlMinutes()).thenReturn(DELETED_SEGMENTS_TTL_MINUTES);
-    }
-    tableDataManager.init(config, "dummyInstance", 
mock(ZkHelixPropertyStore.class),
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), 
mock(HelixManager.class), null, null,
-        new TableDataManagerParams(0, false, -1));
+    tableDataManager.init(instanceDataManagerConfig, tableConfig, 
mock(HelixManager.class), null, null);
     tableDataManager.start();
     Field segsMapField = 
BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap");
     segsMapField.setAccessible(true);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index 9113a327ad..d4c5f4fc29 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -29,10 +29,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import org.apache.commons.configuration2.MapConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.tier.TierFactory;
@@ -41,8 +39,6 @@ import 
org.apache.pinot.common.utils.fetcher.BaseSegmentFetcher;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
 import org.apache.pinot.core.util.PeerServerSegmentFinder;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -51,6 +47,7 @@ import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TierConfig;
@@ -60,23 +57,17 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
 import org.apache.pinot.util.TestUtils;
 import org.mockito.MockedStatic;
-import org.mockito.Mockito;
 import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
@@ -93,15 +84,21 @@ public class BaseTableDataManagerTest {
   private static final String LONG_COLUMN = "col2";
   private static final long[] LONG_VALUES = {10000L, 20000L, 50000L, 40000L, 
30000L};
 
-  @BeforeMethod
+  @BeforeClass
   public void setUp()
       throws Exception {
+    ServerMetrics.register(mock(ServerMetrics.class));
+  }
+
+  @BeforeMethod
+  public void setUpMethod()
+      throws Exception {
     TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
     TableDataManagerTestUtils.initSegmentFetcher();
   }
 
   @AfterMethod
-  public void tearDown()
+  public void tearDownMethod()
       throws Exception {
     FileUtils.deleteDirectory(TEMP_DIR);
   }
@@ -623,14 +620,15 @@ public class BaseTableDataManagerTest {
 
   // case 2: if the attempt to download from deep storage exceeds, invoke 
downloadFromPeers.
   @Test
-  public void testDownloadAndDecryptPeerDownload() throws Exception {
+  public void testDownloadAndDecryptPeerDownload()
+      throws Exception {
     String backupCopyURI = mockRemoteCopy().toString();
     SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
     when(zkmd.getDownloadUrl()).thenReturn(backupCopyURI);
 
-    TableDataManagerConfig config = createDefaultTableDataManagerConfig();
-    when(config.getTablePeerDownloadScheme()).thenReturn("http");
-    BaseTableDataManager tmgr = createSpyOfflineTableManager(config);
+    InstanceDataManagerConfig config = 
createDefaultInstanceDataManagerConfig();
+    when(config.getSegmentPeerDownloadScheme()).thenReturn("http");
+    BaseTableDataManager tmgr = spy(createTableManager(config));
     File tempRootDir = tmgr.getTmpSegmentDataDir("test-download-decrypt-peer");
 
     // As the case 2 description says, we need to mock the static method 
fetchAndDecryptSegmentToLocal to
@@ -651,17 +649,18 @@ public class BaseTableDataManagerTest {
 
   // happy case: download from peers
   @Test
-  public void testDownloadFromPeersWithoutStreaming() throws Exception {
+  public void testDownloadFromPeersWithoutStreaming()
+      throws Exception {
     URI uri = mockRemoteCopy();
-    TableDataManagerConfig config = createDefaultTableDataManagerConfig();
-    when(config.getTablePeerDownloadScheme()).thenReturn("http");
-    HelixManager mockedHelix = mock(HelixManager.class);
-    BaseTableDataManager tmgr = createTableManager(config, mockedHelix);
+    InstanceDataManagerConfig config = 
createDefaultInstanceDataManagerConfig();
+    when(config.getSegmentPeerDownloadScheme()).thenReturn("http");
+    HelixManager helixManager = mock(HelixManager.class);
+    BaseTableDataManager tmgr = createTableManager(config, helixManager);
     File tempRootDir = 
tmgr.getTmpSegmentDataDir("test-download-peer-without-streaming");
     File destFile = new File(tempRootDir, "seg01" + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
     try (MockedStatic<PeerServerSegmentFinder> mockPeerSegFinder = 
mockStatic(PeerServerSegmentFinder.class)) {
-      mockPeerSegFinder.when(() -> PeerServerSegmentFinder.getPeerServerURIs(
-          "seg01", "http", mockedHelix, TABLE_NAME_WITH_TYPE))
+      mockPeerSegFinder.when(
+              () -> PeerServerSegmentFinder.getPeerServerURIs("seg01", "http", 
helixManager, TABLE_NAME_WITH_TYPE))
           .thenReturn(Collections.singletonList(uri));
       tmgr.downloadFromPeersWithoutStreaming("seg01", 
mock(SegmentZKMetadata.class), destFile);
     }
@@ -714,40 +713,26 @@ public class BaseTableDataManagerTest {
     }
   }
 
-  private static BaseTableDataManager createTableManager() {
-    TableDataManagerConfig config = createDefaultTableDataManagerConfig();
-
-    OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
-    tableDataManager.init(config, "dummyInstance", 
mock(ZkHelixPropertyStore.class),
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), 
mock(HelixManager.class), null, null,
-        new TableDataManagerParams(0, false, -1));
-    tableDataManager.start();
-    return tableDataManager;
+  private static OfflineTableDataManager createTableManager() {
+    return createTableManager(createDefaultInstanceDataManagerConfig());
   }
 
-  private static BaseTableDataManager 
createTableManager(TableDataManagerConfig config, HelixManager helixManager) {
-    OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
-    tableDataManager.init(config, "dummyInstance", 
mock(ZkHelixPropertyStore.class),
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), 
helixManager, null, null,
-        new TableDataManagerParams(0, false, -1));
-    tableDataManager.start();
-    return tableDataManager;
+  private static OfflineTableDataManager 
createTableManager(InstanceDataManagerConfig instanceDataManagerConfig) {
+    return createTableManager(instanceDataManagerConfig, 
mock(HelixManager.class));
   }
 
-  private static OfflineTableDataManager 
createSpyOfflineTableManager(TableDataManagerConfig tableDataManagerConfig) {
+  private static OfflineTableDataManager 
createTableManager(InstanceDataManagerConfig instanceDataManagerConfig,
+      HelixManager helixManager) {
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
     OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
-    tableDataManager.init(tableDataManagerConfig, "dummyInstance", 
mock(ZkHelixPropertyStore.class),
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), 
mock(HelixManager.class), null, null,
-        new TableDataManagerParams(0, false, -1));
+    tableDataManager.init(instanceDataManagerConfig, tableConfig, 
helixManager, null, null);
     tableDataManager.start();
-    return Mockito.spy(tableDataManager);
+    return tableDataManager;
   }
 
-  private static TableDataManagerConfig createDefaultTableDataManagerConfig() {
-    TableDataManagerConfig config = mock(TableDataManagerConfig.class);
-    when(config.getTableName()).thenReturn(TABLE_NAME_WITH_TYPE);
-    when(config.getDataDir()).thenReturn(TABLE_DATA_DIR.getAbsolutePath());
-    when(config.getAuthConfig()).thenReturn(new 
MapConfiguration(Collections.emptyMap()));
+  private static InstanceDataManagerConfig 
createDefaultInstanceDataManagerConfig() {
+    InstanceDataManagerConfig config = mock(InstanceDataManagerConfig.class);
+    when(config.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
     return config;
   }
 
@@ -797,7 +782,8 @@ public class BaseTableDataManagerTest {
             Collections.singletonMap("dataDir", 
dataDir.getAbsolutePath())))).build();
   }
 
-  private static URI mockRemoteCopy() throws IOException, URISyntaxException {
+  private static URI mockRemoteCopy()
+      throws IOException, URISyntaxException {
     File tempInput = new File(TEMP_DIR, "tmp.txt");
     FileUtils.write(tempInput, "this is from somewhere remote");
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
index 6816d01558..6a8c1b83c5 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
@@ -32,8 +32,6 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
 import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentCreationDriverFactory;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
@@ -42,6 +40,7 @@ import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.DimensionTableConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -50,7 +49,6 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.annotations.AfterClass;
@@ -79,20 +77,23 @@ public class DimensionTableDataManagerTest {
   @BeforeClass
   public void setUp()
       throws Exception {
+    ServerMetrics.register(mock(ServerMetrics.class));
+
     // prepare segment data
     URL resourceUrl = getClass().getClassLoader().getResource(AVRO_DATA_PATH);
     assertNotNull(resourceUrl);
     File avroFile = new File(resourceUrl.getFile());
 
     // create segment
+    File tableDataDir = new File(TEMP_DIR, OFFLINE_TABLE_NAME);
     SegmentGeneratorConfig segmentGeneratorConfig =
-        SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(avroFile, 
TEMP_DIR, RAW_TABLE_NAME);
+        SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(avroFile, 
tableDataDir, RAW_TABLE_NAME);
     SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
     driver.init(segmentGeneratorConfig);
     driver.build();
 
     String segmentName = driver.getSegmentName();
-    _indexDir = new File(TEMP_DIR, segmentName);
+    _indexDir = new File(tableDataDir, segmentName);
     _indexLoadingConfig = new IndexLoadingConfig();
     _segmentMetadata = new SegmentMetadataImpl(_indexDir);
     _segmentZKMetadata = new SegmentZKMetadata(segmentName);
@@ -124,17 +125,12 @@ public class DimensionTableDataManagerTest {
   }
 
   private DimensionTableDataManager makeTableDataManager(HelixManager 
helixManager) {
+    InstanceDataManagerConfig instanceDataManagerConfig = 
mock(InstanceDataManagerConfig.class);
+    
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
+    TableConfig tableConfig = getTableConfig(false);
     DimensionTableDataManager tableDataManager =
         
DimensionTableDataManager.createInstanceByTableName(OFFLINE_TABLE_NAME);
-    TableDataManagerConfig config;
-    {
-      config = mock(TableDataManagerConfig.class);
-      when(config.getTableName()).thenReturn(OFFLINE_TABLE_NAME);
-      when(config.getDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
-    }
-    tableDataManager.init(config, "dummyInstance", 
helixManager.getHelixPropertyStore(),
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), 
helixManager, null, null,
-        new TableDataManagerParams(0, false, -1));
+    tableDataManager.init(instanceDataManagerConfig, tableConfig, 
helixManager, null, null);
     tableDataManager.start();
     return tableDataManager;
   }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index 1574892f42..cc93fcfe77 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -46,13 +46,11 @@ import 
org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import 
org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
 import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
 import org.apache.pinot.segment.local.segment.creator.Fixtures;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
@@ -78,8 +76,7 @@ import static org.mockito.Mockito.when;
 
 // TODO Re-write this test using the stream abstraction
 public class RealtimeSegmentDataManagerTest {
-  private static final String SEGMENT_DIR = "/tmp/" + 
RealtimeSegmentDataManagerTest.class.getSimpleName();
-  private static final File SEGMENT_DIR_FILE = new File(SEGMENT_DIR);
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), 
"RealtimeSegmentDataManagerTest");
   private static final String RAW_TABLE_NAME = "testTable";
   private static final String REALTIME_TABLE_NAME = 
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
   private static final int PARTITION_GROUP_ID = 0;
@@ -104,7 +101,7 @@ public class RealtimeSegmentDataManagerTest {
     SegmentBuildTimeLeaseExtender.getOrCreate(instanceId, new 
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
         tableConfig.getTableName());
     RealtimeTableDataManager tableDataManager = 
mock(RealtimeTableDataManager.class);
-    when(tableDataManager.getServerInstance()).thenReturn(instanceId);
+    when(tableDataManager.getInstanceId()).thenReturn(instanceId);
     RealtimeSegmentStatsHistory statsHistory = 
mock(RealtimeSegmentStatsHistory.class);
     when(statsHistory.getEstimatedCardinality(anyString())).thenReturn(200);
     when(statsHistory.getEstimatedAvgColSize(anyString())).thenReturn(32);
@@ -148,19 +145,22 @@ public class RealtimeSegmentDataManagerTest {
     _partitionGroupIdToSemaphoreMap.putIfAbsent(PARTITION_GROUP_ID, new 
Semaphore(1));
     Schema schema = Fixtures.createSchema();
     ServerMetrics serverMetrics = new 
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
-    return new FakeRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, 
tableDataManager, SEGMENT_DIR, schema,
-        llcSegmentName, _partitionGroupIdToSemaphoreMap, serverMetrics, 
timeSupplier);
+    return new FakeRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, 
tableDataManager,
+        new File(TEMP_DIR, REALTIME_TABLE_NAME).getAbsolutePath(), schema, 
llcSegmentName,
+        _partitionGroupIdToSemaphoreMap, serverMetrics, timeSupplier);
   }
 
   @BeforeClass
   public void setUp() {
-    SEGMENT_DIR_FILE.deleteOnExit();
+    ServerMetrics.register(mock(ServerMetrics.class));
+
+    FileUtils.deleteQuietly(TEMP_DIR);
     SegmentBuildTimeLeaseExtender.initExecutor();
   }
 
   @AfterClass
   public void tearDown() {
-    FileUtils.deleteQuietly(SEGMENT_DIR_FILE);
+    FileUtils.deleteQuietly(TEMP_DIR);
     SegmentBuildTimeLeaseExtender.shutdownExecutor();
   }
 
@@ -222,8 +222,7 @@ public class RealtimeSegmentDataManagerTest {
     Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
     Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
     Assert.assertTrue(segmentDataManager._commitSegmentCalled);
-    Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
-        RealtimeSegmentDataManager.State.COMMITTED);
+    Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), 
RealtimeSegmentDataManager.State.COMMITTED);
     segmentDataManager.destroy();
   }
 
@@ -286,17 +285,17 @@ public class RealtimeSegmentDataManagerTest {
     Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
     Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
     Assert.assertTrue(segmentDataManager._commitSegmentCalled);
-    Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
-        RealtimeSegmentDataManager.State.COMMITTED);
+    Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), 
RealtimeSegmentDataManager.State.COMMITTED);
     segmentDataManager.destroy();
   }
 
   @Test
-  public void testCommitAfterCatchupWithPeriodOffset() throws Exception {
+  public void testCommitAfterCatchupWithPeriodOffset()
+      throws Exception {
     TableConfig tableConfig = createTableConfig();
-    tableConfig.getIndexingConfig().getStreamConfigs()
-        .put(StreamConfigProperties.constructStreamProperty(
-            StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA, 
"fakeStream"), "2d");
+    tableConfig.getIndexingConfig().getStreamConfigs().put(
+        
StreamConfigProperties.constructStreamProperty(StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA,
+            "fakeStream"), "2d");
     FakeRealtimeSegmentDataManager segmentDataManager =
         createFakeSegmentManager(false, new TimeSupplier(), null, null, 
tableConfig);
     RealtimeSegmentDataManager.PartitionConsumer consumer = 
segmentDataManager.createPartitionConsumer();
@@ -333,17 +332,17 @@ public class RealtimeSegmentDataManagerTest {
     Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
     Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
     Assert.assertTrue(segmentDataManager._commitSegmentCalled);
-    Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
-        RealtimeSegmentDataManager.State.COMMITTED);
+    Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), 
RealtimeSegmentDataManager.State.COMMITTED);
     segmentDataManager.destroy();
   }
 
   @Test
-  public void testCommitAfterCatchupWithTimestampOffset() throws Exception {
+  public void testCommitAfterCatchupWithTimestampOffset()
+      throws Exception {
     TableConfig tableConfig = createTableConfig();
-    tableConfig.getIndexingConfig().getStreamConfigs()
-        .put(StreamConfigProperties.constructStreamProperty(
-            StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA, 
"fakeStream"), Instant.now().toString());
+    tableConfig.getIndexingConfig().getStreamConfigs().put(
+        
StreamConfigProperties.constructStreamProperty(StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA,
+            "fakeStream"), Instant.now().toString());
     FakeRealtimeSegmentDataManager segmentDataManager =
         createFakeSegmentManager(false, new TimeSupplier(), null, null, 
tableConfig);
     RealtimeSegmentDataManager.PartitionConsumer consumer = 
segmentDataManager.createPartitionConsumer();
@@ -380,8 +379,7 @@ public class RealtimeSegmentDataManagerTest {
     Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
     Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
     Assert.assertTrue(segmentDataManager._commitSegmentCalled);
-    Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
-        RealtimeSegmentDataManager.State.COMMITTED);
+    Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), 
RealtimeSegmentDataManager.State.COMMITTED);
     segmentDataManager.destroy();
   }
 
@@ -405,8 +403,7 @@ public class RealtimeSegmentDataManagerTest {
     Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
     Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
     Assert.assertFalse(segmentDataManager._commitSegmentCalled);
-    Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
-        RealtimeSegmentDataManager.State.DISCARDED);
+    Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), 
RealtimeSegmentDataManager.State.DISCARDED);
     segmentDataManager.destroy();
   }
 
@@ -800,22 +797,13 @@ public class RealtimeSegmentDataManagerTest {
     tableConfig.setUpsertConfig(null);
     ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
     when(propertyStore.get(anyString(), any(), 
anyInt())).thenReturn(TableConfigUtils.toZNRecord(tableConfig));
+    HelixManager helixManager = mock(HelixManager.class);
+    when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
 
-    TableDataManagerConfig tableDataManagerConfig = 
mock(TableDataManagerConfig.class);
-    
when(tableDataManagerConfig.getTableName()).thenReturn(REALTIME_TABLE_NAME);
-    when(tableDataManagerConfig.getTableType()).thenReturn(TableType.REALTIME);
-    
when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
-    when(tableDataManagerConfig.getTableConfig()).thenReturn(tableConfig);
     InstanceDataManagerConfig instanceDataManagerConfig = 
mock(InstanceDataManagerConfig.class);
-    
when(instanceDataManagerConfig.getMaxParallelSegmentBuilds()).thenReturn(4);
-    
when(instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()).thenReturn(-1L);
-    
when(instanceDataManagerConfig.getMaxParallelSegmentDownloads()).thenReturn(-1);
-    
when(instanceDataManagerConfig.isStreamSegmentDownloadUntar()).thenReturn(false);
-    TableDataManagerProvider.init(instanceDataManagerConfig);
-
+    
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
     TableDataManager tableDataManager =
-        TableDataManagerProvider.getTableDataManager(tableDataManagerConfig, 
"testInstance", propertyStore,
-            mock(ServerMetrics.class), mock(HelixManager.class), null);
+        new 
TableDataManagerProvider(instanceDataManagerConfig).getTableDataManager(tableConfig,
 helixManager);
     tableDataManager.start();
     tableDataManager.shutDown();
     Assert.assertFalse(SegmentBuildTimeLeaseExtender.isExecutorShutdown());
@@ -838,8 +826,7 @@ public class RealtimeSegmentDataManagerTest {
       }
     };
     FakeRealtimeSegmentDataManager segmentDataManager = 
createFakeSegmentManager(true, timeSupplier,
-        String.valueOf(FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS * 2),
-        segmentTimeThresholdMins + "m", null);
+        String.valueOf(FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS * 
2), segmentTimeThresholdMins + "m", null);
     segmentDataManager._stubConsumeLoop = false;
     segmentDataManager._state.set(segmentDataManager, 
RealtimeSegmentDataManager.State.INITIAL_CONSUMING);
 
@@ -1081,7 +1068,7 @@ public class RealtimeSegmentDataManagerTest {
       if (!forCommit) {
         return new SegmentBuildDescriptor(null, null, getCurrentOffset(), 0, 
0, -1);
       }
-      File segmentTarFile = new File(SEGMENT_DIR, "segmentFile");
+      File segmentTarFile = new File(new File(TEMP_DIR, REALTIME_TABLE_NAME), 
"segmentFile");
       try {
         segmentTarFile.createNewFile();
       } catch (IOException e) {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
index 211d2d08ad..bcf75a2cd0 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
@@ -34,27 +34,26 @@ import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.core.data.manager.TableDataManagerTestUtils;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
 import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -77,15 +76,20 @@ public class RealtimeTableDataManagerTest {
   private static final String LONG_COLUMN = "col2";
   private static final long[] LONG_VALUES = {10000L, 20000L, 50000L, 40000L, 
30000L};
 
+  @BeforeClass
+  public void setUp() {
+    ServerMetrics.register(mock(ServerMetrics.class));
+  }
+
   @BeforeMethod
-  public void setUp()
+  public void setUpMethod()
       throws Exception {
     TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
     TableDataManagerTestUtils.initSegmentFetcher();
   }
 
   @AfterMethod
-  public void tearDown()
+  public void tearDownMethod()
       throws Exception {
     FileUtils.deleteDirectory(TEMP_DIR);
   }
@@ -93,14 +97,14 @@ public class RealtimeTableDataManagerTest {
   @Test
   public void testAddSegmentUseBackupCopy()
       throws Exception {
-    RealtimeTableDataManager tmgr = new RealtimeTableDataManager(null);
-    TableDataManagerConfig tableDataManagerConfig = 
createTableDataManagerConfig();
-    ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
+    InstanceDataManagerConfig instanceDataManagerConfig = 
createInstanceDataManagerConfig();
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
     TableConfig tableConfig = setupTableConfig(propertyStore);
     Schema schema = setupSchema(propertyStore);
-    tmgr.init(tableDataManagerConfig, "server01", propertyStore,
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), 
mock(HelixManager.class), null, null,
-        new TableDataManagerParams(0, false, -1));
+    HelixManager helixManager = mock(HelixManager.class);
+    when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+    RealtimeTableDataManager tmgr = new RealtimeTableDataManager(null);
+    tmgr.init(instanceDataManagerConfig, tableConfig, helixManager, null, 
null);
 
     // Create a dummy local segment.
     String segName = "seg01";
@@ -128,14 +132,14 @@ public class RealtimeTableDataManagerTest {
   @Test
   public void testAddSegmentNoBackupCopy()
       throws Exception {
-    RealtimeTableDataManager tmgr = new RealtimeTableDataManager(null);
-    TableDataManagerConfig tableDataManagerConfig = 
createTableDataManagerConfig();
-    ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
+    InstanceDataManagerConfig instanceDataManagerConfig = 
createInstanceDataManagerConfig();
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
     TableConfig tableConfig = setupTableConfig(propertyStore);
     Schema schema = setupSchema(propertyStore);
-    tmgr.init(tableDataManagerConfig, "server01", propertyStore,
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), 
mock(HelixManager.class), null, null,
-        new TableDataManagerParams(0, false, -1));
+    HelixManager helixManager = mock(HelixManager.class);
+    when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+    RealtimeTableDataManager tmgr = new RealtimeTableDataManager(null);
+    tmgr.init(instanceDataManagerConfig, tableConfig, helixManager, null, 
null);
 
     // Create a raw segment and put it in deep store backed by local fs.
     String segName = "seg01";
@@ -159,14 +163,14 @@ public class RealtimeTableDataManagerTest {
   @Test
   public void testAddSegmentDefaultTierByTierBasedDirLoader()
       throws Exception {
-    RealtimeTableDataManager tmgr1 = new RealtimeTableDataManager(null);
-    TableDataManagerConfig tableDataManagerConfig = 
createTableDataManagerConfig();
-    ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
+    InstanceDataManagerConfig instanceDataManagerConfig = 
createInstanceDataManagerConfig();
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
     TableConfig tableConfig = setupTableConfig(propertyStore);
     Schema schema = setupSchema(propertyStore);
-    tmgr1.init(tableDataManagerConfig, "server01", propertyStore,
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), 
mock(HelixManager.class), null, null,
-        new TableDataManagerParams(0, false, -1));
+    HelixManager helixManager = mock(HelixManager.class);
+    when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+    RealtimeTableDataManager tmgr1 = new RealtimeTableDataManager(null);
+    tmgr1.init(instanceDataManagerConfig, tableConfig, helixManager, null, 
null);
 
     // Create a raw segment and put it in deep store backed by local fs.
     String segName = "seg_tiered_01";
@@ -190,17 +194,10 @@ public class RealtimeTableDataManagerTest {
     // Now, repeat initialization of the table data manager
     tmgr1.shutDown();
     RealtimeTableDataManager tmgr2 = new RealtimeTableDataManager(null);
-    tableDataManagerConfig = createTableDataManagerConfig();
-    propertyStore = mock(ZkHelixPropertyStore.class);
-    tableConfig = setupTableConfig(propertyStore);
-    schema = setupSchema(propertyStore);
-    tmgr2.init(tableDataManagerConfig, "server01", propertyStore,
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), 
mock(HelixManager.class), null, null,
-        new TableDataManagerParams(0, false, -1));
+    tmgr2.init(instanceDataManagerConfig, tableConfig, helixManager, null, 
null);
 
     // Reinitialize index loading config and try adding the segment
-    indexLoadingConfig =
-        TableDataManagerTestUtils.createIndexLoadingConfig("tierBased", 
tableConfig, schema);
+    indexLoadingConfig = 
TableDataManagerTestUtils.createIndexLoadingConfig("tierBased", tableConfig, 
schema);
     tmgr2.addSegment(segName, indexLoadingConfig, segmentZKMetadata);
 
     // Make sure that the segment hasn't been moved
@@ -245,24 +242,22 @@ public class RealtimeTableDataManagerTest {
     return new File(TABLE_DATA_DIR, segName);
   }
 
-  private static TableDataManagerConfig createTableDataManagerConfig() {
-    TableDataManagerConfig tableDataManagerConfig = 
mock(TableDataManagerConfig.class);
-    
when(tableDataManagerConfig.getTableName()).thenReturn(TABLE_NAME_WITH_TYPE);
-    
when(tableDataManagerConfig.getDataDir()).thenReturn(TABLE_DATA_DIR.getAbsolutePath());
-    return tableDataManagerConfig;
+  private static InstanceDataManagerConfig createInstanceDataManagerConfig() {
+    InstanceDataManagerConfig instanceDataManagerConfig = 
mock(InstanceDataManagerConfig.class);
+    
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
+    return instanceDataManagerConfig;
   }
 
-  private static TableConfig setupTableConfig(ZkHelixPropertyStore 
propertyStore)
+  private static TableConfig setupTableConfig(ZkHelixPropertyStore<ZNRecord> 
propertyStore)
       throws Exception {
-    TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).build();
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).build();
     ZNRecord tableConfigZNRecord = TableConfigUtils.toZNRecord(tableConfig);
     
when(propertyStore.get(ZKMetadataProvider.constructPropertyStorePathForResourceConfig(TABLE_NAME_WITH_TYPE),
 null,
         AccessOption.PERSISTENT)).thenReturn(tableConfigZNRecord);
     return tableConfig;
   }
 
-  private static Schema setupSchema(ZkHelixPropertyStore propertyStore) {
+  private static Schema setupSchema(ZkHelixPropertyStore<ZNRecord> 
propertyStore) {
     Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
         .addSingleValueDimension(STRING_COLUMN, FieldSpec.DataType.STRING)
         .addMetric(LONG_COLUMN, FieldSpec.DataType.LONG).build();
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
index 27d4893403..49697b6b85 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
@@ -30,7 +30,6 @@ import 
org.apache.commons.configuration2.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.InstanceRequest;
@@ -39,7 +38,6 @@ import 
org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -55,7 +53,6 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.env.CommonsConfigurationUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -86,12 +83,13 @@ public class QueryExecutorExceptionsTest {
   private final List<ImmutableSegment> _indexSegments = new 
ArrayList<>(NUM_SEGMENTS_TO_GENERATE);
   private final List<String> _segmentNames = new 
ArrayList<>(NUM_SEGMENTS_TO_GENERATE);
 
-  private ServerMetrics _serverMetrics;
   private QueryExecutor _queryExecutor;
 
   @BeforeClass
   public void setUp()
       throws Exception {
+    ServerMetrics.register(mock(ServerMetrics.class));
+
     // Set up the segments
     FileUtils.deleteQuietly(INDEX_DIR);
     assertTrue(INDEX_DIR.mkdirs());
@@ -132,21 +130,11 @@ public class QueryExecutorExceptionsTest {
     }
 
     // Mock the instance data manager
-    _serverMetrics = new 
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
-    TableDataManagerConfig tableDataManagerConfig = 
mock(TableDataManagerConfig.class);
-    when(tableDataManagerConfig.getTableName()).thenReturn(OFFLINE_TABLE_NAME);
-    when(tableDataManagerConfig.getTableType()).thenReturn(TableType.OFFLINE);
-    
when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
     InstanceDataManagerConfig instanceDataManagerConfig = 
mock(InstanceDataManagerConfig.class);
-    
when(instanceDataManagerConfig.getMaxParallelSegmentBuilds()).thenReturn(4);
-    
when(instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()).thenReturn(-1L);
-    
when(instanceDataManagerConfig.getMaxParallelSegmentDownloads()).thenReturn(-1);
-    
when(instanceDataManagerConfig.isStreamSegmentDownloadUntar()).thenReturn(false);
-    TableDataManagerProvider.init(instanceDataManagerConfig);
-    @SuppressWarnings("unchecked")
+    
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(INDEX_DIR.getAbsolutePath());
     TableDataManager tableDataManager =
-        TableDataManagerProvider.getTableDataManager(tableDataManagerConfig, 
"testInstance",
-            mock(ZkHelixPropertyStore.class), mock(ServerMetrics.class), 
mock(HelixManager.class), null);
+        new 
TableDataManagerProvider(instanceDataManagerConfig).getTableDataManager(tableConfig,
+            mock(HelixManager.class));
     tableDataManager.start();
     //we don't add index segments to the data manager to simulate 
numSegmentsAcquired < numSegmentsQueried
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
@@ -158,7 +146,7 @@ public class QueryExecutorExceptionsTest {
     PropertiesConfiguration queryExecutorConfig =
         CommonsConfigurationUtils.fromFile(new File(resourceUrl.getFile()));
     _queryExecutor = new ServerQueryExecutorV1Impl();
-    _queryExecutor.init(new PinotConfiguration(queryExecutorConfig), 
instanceDataManager, _serverMetrics);
+    _queryExecutor.init(new PinotConfiguration(queryExecutorConfig), 
instanceDataManager, ServerMetrics.get());
   }
 
   /**
@@ -190,6 +178,6 @@ public class QueryExecutorExceptionsTest {
   }
 
   private ServerQueryRequest getQueryRequest(InstanceRequest instanceRequest) {
-    return new ServerQueryRequest(instanceRequest, _serverMetrics, 
System.currentTimeMillis());
+    return new ServerQueryRequest(instanceRequest, ServerMetrics.get(), 
System.currentTimeMillis());
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index d68c6ab6f4..c600d8b5cc 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -27,7 +27,6 @@ import java.util.concurrent.Executors;
 import org.apache.commons.configuration2.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
@@ -36,7 +35,6 @@ import 
org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -52,7 +50,6 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.env.CommonsConfigurationUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -72,7 +69,7 @@ public class QueryExecutorTest {
   private static final String AVRO_DATA_PATH = "data/simpleData200001.avro";
   private static final String EMPTY_JSON_DATA_PATH = 
"data/test_empty_data.json";
   private static final String QUERY_EXECUTOR_CONFIG_PATH = 
"conf/query-executor.properties";
-  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"QueryExecutorTest");
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), 
"QueryExecutorTest");
   private static final String RAW_TABLE_NAME = "testTable";
   private static final String OFFLINE_TABLE_NAME = 
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
   private static final int NUM_SEGMENTS_TO_GENERATE = 2;
@@ -82,25 +79,27 @@ public class QueryExecutorTest {
   private final List<ImmutableSegment> _indexSegments = new 
ArrayList<>(NUM_SEGMENTS_TO_GENERATE);
   private final List<String> _segmentNames = new 
ArrayList<>(NUM_SEGMENTS_TO_GENERATE);
 
-  private ServerMetrics _serverMetrics;
   private QueryExecutor _queryExecutor;
 
   @BeforeClass
   public void setUp()
       throws Exception {
+    ServerMetrics.register(mock(ServerMetrics.class));
+
     // Set up the segments
-    FileUtils.deleteQuietly(INDEX_DIR);
-    assertTrue(INDEX_DIR.mkdirs());
+    FileUtils.deleteQuietly(TEMP_DIR);
+    assertTrue(TEMP_DIR.mkdirs());
     URL resourceUrl = getClass().getClassLoader().getResource(AVRO_DATA_PATH);
     Assert.assertNotNull(resourceUrl);
     File avroFile = new File(resourceUrl.getFile());
     Schema schema = 
SegmentTestUtils.extractSchemaFromAvroWithoutTime(avroFile);
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+    File tableDataDir = new File(TEMP_DIR, OFFLINE_TABLE_NAME);
     int i = 0;
     for (; i < NUM_SEGMENTS_TO_GENERATE; i++) {
       SegmentGeneratorConfig config =
-          SegmentTestUtils.getSegmentGeneratorConfig(avroFile, 
FileFormat.AVRO, INDEX_DIR, RAW_TABLE_NAME, tableConfig,
-              schema);
+          SegmentTestUtils.getSegmentGeneratorConfig(avroFile, 
FileFormat.AVRO, tableDataDir, RAW_TABLE_NAME,
+              tableConfig, schema);
       config.setSegmentNamePostfix(Integer.toString(i));
       SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
       driver.init(config);
@@ -110,7 +109,7 @@ public class QueryExecutorTest {
       
Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
       
Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
       
Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
-      _indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, 
driver.getSegmentName()), ReadMode.mmap));
+      _indexSegments.add(ImmutableSegmentLoader.load(new File(tableDataDir, 
driver.getSegmentName()), ReadMode.mmap));
       _segmentNames.add(driver.getSegmentName());
     }
     resourceUrl = 
getClass().getClassLoader().getResource(EMPTY_JSON_DATA_PATH);
@@ -118,32 +117,22 @@ public class QueryExecutorTest {
     File jsonFile = new File(resourceUrl.getFile());
     for (; i < NUM_SEGMENTS_TO_GENERATE + NUM_EMPTY_SEGMENTS_TO_GENERATE; i++) 
{
       SegmentGeneratorConfig config =
-          SegmentTestUtils.getSegmentGeneratorConfig(jsonFile, 
FileFormat.JSON, INDEX_DIR, RAW_TABLE_NAME, tableConfig,
-              schema);
+          SegmentTestUtils.getSegmentGeneratorConfig(jsonFile, 
FileFormat.JSON, tableDataDir, RAW_TABLE_NAME,
+              tableConfig, schema);
       config.setSegmentNamePostfix(Integer.toString(i));
       SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
       driver.init(config);
       driver.build();
-      _indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, 
driver.getSegmentName()), ReadMode.mmap));
+      _indexSegments.add(ImmutableSegmentLoader.load(new File(tableDataDir, 
driver.getSegmentName()), ReadMode.mmap));
       _segmentNames.add(driver.getSegmentName());
     }
 
     // Mock the instance data manager
-    _serverMetrics = new 
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
-    TableDataManagerConfig tableDataManagerConfig = 
mock(TableDataManagerConfig.class);
-    when(tableDataManagerConfig.getTableName()).thenReturn(OFFLINE_TABLE_NAME);
-    when(tableDataManagerConfig.getTableType()).thenReturn(TableType.OFFLINE);
-    
when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
     InstanceDataManagerConfig instanceDataManagerConfig = 
mock(InstanceDataManagerConfig.class);
-    
when(instanceDataManagerConfig.getMaxParallelSegmentBuilds()).thenReturn(4);
-    
when(instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()).thenReturn(-1L);
-    
when(instanceDataManagerConfig.getMaxParallelSegmentDownloads()).thenReturn(-1);
-    
when(instanceDataManagerConfig.isStreamSegmentDownloadUntar()).thenReturn(false);
-    TableDataManagerProvider.init(instanceDataManagerConfig);
-    @SuppressWarnings("unchecked")
+    
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
     TableDataManager tableDataManager =
-        TableDataManagerProvider.getTableDataManager(tableDataManagerConfig, 
"testInstance",
-            mock(ZkHelixPropertyStore.class), mock(ServerMetrics.class), 
mock(HelixManager.class), null);
+        new 
TableDataManagerProvider(instanceDataManagerConfig).getTableDataManager(tableConfig,
+            mock(HelixManager.class));
     tableDataManager.start();
     for (ImmutableSegment indexSegment : _indexSegments) {
       tableDataManager.addSegment(indexSegment);
@@ -154,10 +143,9 @@ public class QueryExecutorTest {
     // Set up the query executor
     resourceUrl = 
getClass().getClassLoader().getResource(QUERY_EXECUTOR_CONFIG_PATH);
     Assert.assertNotNull(resourceUrl);
-    PropertiesConfiguration queryExecutorConfig =
-        CommonsConfigurationUtils.fromFile(new File(resourceUrl.getFile()));
+    PropertiesConfiguration queryExecutorConfig = 
CommonsConfigurationUtils.fromFile(new File(resourceUrl.getFile()));
     _queryExecutor = new ServerQueryExecutorV1Impl();
-    _queryExecutor.init(new PinotConfiguration(queryExecutorConfig), 
instanceDataManager, _serverMetrics);
+    _queryExecutor.init(new PinotConfiguration(queryExecutorConfig), 
instanceDataManager, ServerMetrics.get());
   }
 
   @Test
@@ -205,10 +193,10 @@ public class QueryExecutorTest {
     for (IndexSegment segment : _indexSegments) {
       segment.destroy();
     }
-    FileUtils.deleteQuietly(INDEX_DIR);
+    FileUtils.deleteQuietly(TEMP_DIR);
   }
 
   private ServerQueryRequest getQueryRequest(InstanceRequest instanceRequest) {
-    return new ServerQueryRequest(instanceRequest, _serverMetrics, 
System.currentTimeMillis());
+    return new ServerQueryRequest(instanceRequest, ServerMetrics.get(), 
System.currentTimeMillis());
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
index a5c2cc3f5c..17c543c0f7 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
@@ -33,7 +33,6 @@ import java.util.concurrent.Executors;
 import org.apache.commons.configuration2.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -52,7 +51,6 @@ import org.apache.pinot.core.query.reduce.BrokerReduceService;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
@@ -69,7 +67,6 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.env.CommonsConfigurationUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
@@ -85,7 +82,7 @@ import static org.mockito.Mockito.when;
 
 
 public class ExplainPlanQueriesTest extends BaseQueriesTest {
-  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"ExplainPlanQueriesTest");
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), 
"ExplainPlanQueriesTest");
   private static final String QUERY_EXECUTOR_CONFIG_PATH = 
"conf/query-executor.properties";
   private static final ExecutorService QUERY_RUNNERS = 
Executors.newFixedThreadPool(20);
 
@@ -144,7 +141,6 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest 
{
   private List<IndexSegment> _indexSegments;
   private List<String> _segmentNames;
 
-  private ServerMetrics _serverMetrics;
   private QueryExecutor _queryExecutor;
   private QueryExecutor _queryExecutorWithPrefetchEnabled;
   private BrokerReduceService _brokerReduceService;
@@ -214,9 +210,10 @@ public class ExplainPlanQueriesTest extends 
BaseQueriesTest {
 
     List<String> textIndexColumns = Arrays.asList(COL1_TEXT_INDEX);
 
+    File tableDataDir = new File(TEMP_DIR, OFFLINE_TABLE_NAME);
     SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
     segmentGeneratorConfig.setSegmentName(segmentName);
-    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+    segmentGeneratorConfig.setOutDir(tableDataDir.getPath());
 
     SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
     driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
@@ -232,13 +229,15 @@ public class ExplainPlanQueriesTest extends 
BaseQueriesTest {
 
     _segmentNames.add(segmentName);
 
-    return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), 
indexLoadingConfig);
+    return ImmutableSegmentLoader.load(new File(tableDataDir, segmentName), 
indexLoadingConfig);
   }
 
   @BeforeClass
   public void setUp()
       throws Exception {
-    FileUtils.deleteDirectory(INDEX_DIR);
+    ServerMetrics.register(mock(ServerMetrics.class));
+
+    FileUtils.deleteDirectory(TEMP_DIR);
     _segmentNames = new ArrayList<>();
 
     List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
@@ -290,21 +289,11 @@ public class ExplainPlanQueriesTest extends 
BaseQueriesTest {
     _indexSegments = Arrays.asList(immutableSegment1, immutableSegment2, 
immutableSegment3, immutableSegment4);
 
     // Mock the instance data manager
-    _serverMetrics = new 
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
-    TableDataManagerConfig tableDataManagerConfig = 
mock(TableDataManagerConfig.class);
-    when(tableDataManagerConfig.getTableName()).thenReturn(OFFLINE_TABLE_NAME);
-    when(tableDataManagerConfig.getTableType()).thenReturn(TableType.OFFLINE);
-    
when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
     InstanceDataManagerConfig instanceDataManagerConfig = 
mock(InstanceDataManagerConfig.class);
-    
when(instanceDataManagerConfig.getMaxParallelSegmentBuilds()).thenReturn(4);
-    
when(instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()).thenReturn(-1L);
-    
when(instanceDataManagerConfig.getMaxParallelSegmentDownloads()).thenReturn(-1);
-    
when(instanceDataManagerConfig.isStreamSegmentDownloadUntar()).thenReturn(false);
-    TableDataManagerProvider.init(instanceDataManagerConfig);
-    @SuppressWarnings("unchecked")
+    
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
     TableDataManager tableDataManager =
-        TableDataManagerProvider.getTableDataManager(tableDataManagerConfig, 
"testInstance",
-            mock(ZkHelixPropertyStore.class), mock(ServerMetrics.class), 
mock(HelixManager.class), null);
+        new 
TableDataManagerProvider(instanceDataManagerConfig).getTableDataManager(TABLE_CONFIG,
+            mock(HelixManager.class));
     tableDataManager.start();
     for (IndexSegment indexSegment : _indexSegments) {
       tableDataManager.addSegment((ImmutableSegment) indexSegment);
@@ -318,12 +307,12 @@ public class ExplainPlanQueriesTest extends 
BaseQueriesTest {
     PropertiesConfiguration queryExecutorConfig =
         CommonsConfigurationUtils.fromFile(new File(resourceUrl.getFile()));
     _queryExecutor = new ServerQueryExecutorV1Impl();
-    _queryExecutor.init(new PinotConfiguration(queryExecutorConfig), 
instanceDataManager, _serverMetrics);
+    _queryExecutor.init(new PinotConfiguration(queryExecutorConfig), 
instanceDataManager, ServerMetrics.get());
 
     PinotConfiguration prefetchEnabledConf = new 
PinotConfiguration(queryExecutorConfig);
     prefetchEnabledConf.setProperty(ServerQueryExecutorV1Impl.ENABLE_PREFETCH, 
"true");
     _queryExecutorWithPrefetchEnabled = new ServerQueryExecutorV1Impl();
-    _queryExecutorWithPrefetchEnabled.init(prefetchEnabledConf, 
instanceDataManager, _serverMetrics);
+    _queryExecutorWithPrefetchEnabled.init(prefetchEnabledConf, 
instanceDataManager, ServerMetrics.get());
 
     // Create the BrokerReduceService
     _brokerReduceService = new BrokerReduceService(new PinotConfiguration(
@@ -420,7 +409,7 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest 
{
   }
 
   private ServerQueryRequest getQueryRequest(InstanceRequest instanceRequest) {
-    return new ServerQueryRequest(instanceRequest, _serverMetrics, 
System.currentTimeMillis());
+    return new ServerQueryRequest(instanceRequest, ServerMetrics.get(), 
System.currentTimeMillis());
   }
 
   @Test
@@ -2547,6 +2536,6 @@ public class ExplainPlanQueriesTest extends 
BaseQueriesTest {
     for (IndexSegment segment : _indexSegments) {
       segment.destroy();
     }
-    FileUtils.deleteQuietly(INDEX_DIR);
+    FileUtils.deleteQuietly(TEMP_DIR);
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
index ddf46340be..ac9b7e82a9 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
@@ -31,10 +31,8 @@ import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.commons.configuration2.PropertiesConfiguration;
-import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
@@ -45,7 +43,6 @@ import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -63,12 +60,10 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.env.CommonsConfigurationUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
-import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -85,11 +80,9 @@ import static org.testng.Assert.assertTrue;
  * Class for testing segment generation with byte[] data type.
  */
 public class SegmentWithNullValueVectorTest {
-  private static final int NUM_ROWS = 10001;
-
-  private static final String SEGMENT_DIR_NAME =
-      System.getProperty("java.io.tmpdir") + File.separator + 
"nullValueVectorTest";
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), 
"SegmentWithNullValueVectorTest");
   private static final String SEGMENT_NAME = "testSegment";
+  private static final int NUM_ROWS = 10001;
   private static final long LONG_VALUE_THRESHOLD = 100;
 
   private Random _random;
@@ -107,7 +100,6 @@ public class SegmentWithNullValueVectorTest {
   // Required for subsequent queries
   private final List<String> _segmentNames = new ArrayList<>();
   private InstanceDataManager _instanceDataManager;
-  private ServerMetrics _serverMetrics;
   private QueryExecutor _queryExecutor;
   private static final String RAW_TABLE_NAME = "testTable";
   private static final String OFFLINE_TABLE_NAME = 
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
@@ -124,6 +116,7 @@ public class SegmentWithNullValueVectorTest {
   @BeforeClass
   public void setup()
       throws Exception {
+    ServerMetrics.register(mock(ServerMetrics.class));
 
     _schema = new Schema();
     _schema.addField(new DimensionFieldSpec(INT_COLUMN, 
FieldSpec.DataType.INT, true));
@@ -136,44 +129,28 @@ public class SegmentWithNullValueVectorTest {
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNullHandlingEnabled(true).build();
     _random = new Random(System.nanoTime());
     buildIndex(tableConfig, _schema);
-    _segment = ImmutableSegmentLoader.load(new File(SEGMENT_DIR_NAME, 
SEGMENT_NAME), ReadMode.heap);
 
-    setupQueryServer();
-  }
-
-  // Registers the segment and initializes Query Executor
-  private void setupQueryServer()
-      throws ConfigurationException {
+    _segment =
+        ImmutableSegmentLoader.load(new File(new File(TEMP_DIR, 
OFFLINE_TABLE_NAME), SEGMENT_NAME), ReadMode.heap);
     _segmentNames.add(_segment.getSegmentName());
+
     // Mock the instance data manager
-    _serverMetrics = new 
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
-    TableDataManagerConfig tableDataManagerConfig = 
Mockito.mock(TableDataManagerConfig.class);
-    
Mockito.when(tableDataManagerConfig.getTableName()).thenReturn(OFFLINE_TABLE_NAME);
-    
Mockito.when(tableDataManagerConfig.getTableType()).thenReturn(TableType.OFFLINE);
-    
Mockito.when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
     InstanceDataManagerConfig instanceDataManagerConfig = 
mock(InstanceDataManagerConfig.class);
-    
when(instanceDataManagerConfig.getMaxParallelSegmentBuilds()).thenReturn(4);
-    
when(instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()).thenReturn(-1L);
-    
when(instanceDataManagerConfig.getMaxParallelSegmentDownloads()).thenReturn(-1);
-    
when(instanceDataManagerConfig.isStreamSegmentDownloadUntar()).thenReturn(false);
-    TableDataManagerProvider.init(instanceDataManagerConfig);
-    @SuppressWarnings("unchecked")
+    
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
     TableDataManager tableDataManager =
-        TableDataManagerProvider.getTableDataManager(tableDataManagerConfig, 
"testInstance",
-            Mockito.mock(ZkHelixPropertyStore.class), 
Mockito.mock(ServerMetrics.class),
-            Mockito.mock(HelixManager.class), null);
+        new 
TableDataManagerProvider(instanceDataManagerConfig).getTableDataManager(tableConfig,
+            mock(HelixManager.class));
     tableDataManager.start();
     tableDataManager.addSegment(_segment);
-    _instanceDataManager = Mockito.mock(InstanceDataManager.class);
-    
Mockito.when(_instanceDataManager.getTableDataManager(OFFLINE_TABLE_NAME)).thenReturn(tableDataManager);
+    _instanceDataManager = mock(InstanceDataManager.class);
+    
when(_instanceDataManager.getTableDataManager(OFFLINE_TABLE_NAME)).thenReturn(tableDataManager);
 
     // Set up the query executor
     URL resourceUrl = 
getClass().getClassLoader().getResource(QUERY_EXECUTOR_CONFIG_PATH);
     Assert.assertNotNull(resourceUrl);
-    PropertiesConfiguration queryExecutorConfig =
-        CommonsConfigurationUtils.fromFile(new File(resourceUrl.getFile()));
+    PropertiesConfiguration queryExecutorConfig = 
CommonsConfigurationUtils.fromFile(new File(resourceUrl.getFile()));
     _queryExecutor = new ServerQueryExecutorV1Impl();
-    _queryExecutor.init(new PinotConfiguration(queryExecutorConfig), 
_instanceDataManager, _serverMetrics);
+    _queryExecutor.init(new PinotConfiguration(queryExecutorConfig), 
_instanceDataManager, ServerMetrics.get());
   }
 
   /**
@@ -186,9 +163,8 @@ public class SegmentWithNullValueVectorTest {
   private void buildIndex(TableConfig tableConfig, Schema schema)
       throws Exception {
     SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, 
schema);
-
-    config.setOutDir(SEGMENT_DIR_NAME);
     config.setSegmentName(SEGMENT_NAME);
+    config.setOutDir(new File(TEMP_DIR, OFFLINE_TABLE_NAME).getAbsolutePath());
 
     List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
     for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
@@ -295,7 +271,7 @@ public class SegmentWithNullValueVectorTest {
   }
 
   private ServerQueryRequest getQueryRequest(InstanceRequest instanceRequest) {
-    return new ServerQueryRequest(instanceRequest, _serverMetrics, 
System.currentTimeMillis());
+    return new ServerQueryRequest(instanceRequest, ServerMetrics.get(), 
System.currentTimeMillis());
   }
 
   /**
@@ -305,6 +281,6 @@ public class SegmentWithNullValueVectorTest {
   public void cleanup()
       throws IOException {
     _segment.destroy();
-    FileUtils.deleteQuietly(new File(SEGMENT_DIR_NAME));
+    FileUtils.deleteQuietly(TEMP_DIR);
   }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
index 634390effc..43017f29bd 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
@@ -389,7 +389,7 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
       throws Exception {
     PinotConfiguration config = getServerConf(12345);
     
config.setProperty(Joiner.on(".").join(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX,
-            HelixInstanceDataManagerConfig.PREFIX_OF_CONFIG_OF_UPSERT,
+            HelixInstanceDataManagerConfig.UPSERT_CONFIG_PREFIX,
             
TableUpsertMetadataManagerFactory.UPSERT_DEFAULT_METADATA_MANAGER_CLASS),
         DummyTableUpsertMetadataManager.class.getName());
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
index f10a58bf72..9123c9660c 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
@@ -106,10 +106,10 @@ public class UpsertTableSegmentPreloadIntegrationTest 
extends BaseClusterIntegra
     
serverConf.setProperty(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX
 + ".max.segment.preload.threads",
         "1");
     
serverConf.setProperty(Joiner.on(".").join(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX,
-        HelixInstanceDataManagerConfig.PREFIX_OF_CONFIG_OF_UPSERT,
+        HelixInstanceDataManagerConfig.UPSERT_CONFIG_PREFIX,
         TableUpsertMetadataManagerFactory.UPSERT_DEFAULT_ENABLE_SNAPSHOT), 
"true");
     
serverConf.setProperty(Joiner.on(".").join(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX,
-        HelixInstanceDataManagerConfig.PREFIX_OF_CONFIG_OF_UPSERT,
+        HelixInstanceDataManagerConfig.UPSERT_CONFIG_PREFIX,
         TableUpsertMetadataManagerFactory.UPSERT_DEFAULT_ENABLE_PRELOAD), 
"true");
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 0b820f50c2..0396b925ce 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -27,14 +27,12 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 
@@ -48,11 +46,19 @@ public interface TableDataManager {
   /**
    * Initializes the table data manager. Should be called only once and before 
calling any other method.
    */
-  void init(TableDataManagerConfig tableDataManagerConfig, String instanceId,
-      ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics 
serverMetrics, HelixManager helixManager,
+  void init(InstanceDataManagerConfig instanceDataManagerConfig, TableConfig 
tableConfig, HelixManager helixManager,
       @Nullable ExecutorService segmentPreloadExecutor,
-      @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> 
errorCache,
-      TableDataManagerParams tableDataManagerParams);
+      @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> 
errorCache);
+
+  /**
+   * Returns the instance id of the server.
+   */
+  String getInstanceId();
+
+  /**
+   * Returns the config for the instance data manager.
+   */
+  InstanceDataManagerConfig getInstanceDataManagerConfig();
 
   /**
    * Starts the table data manager. Should be called only once after table 
data manager gets initialized but before
@@ -207,11 +213,6 @@ public interface TableDataManager {
    */
   File getTableDataDir();
 
-  /**
-   * Returns the config for the table data manager.
-   */
-  TableDataManagerConfig getTableDataManagerConfig();
-
   /**
    * Add error related to segment, if any. The implementation
    * is expected to cache last 'N' errors for the table, related to
@@ -233,7 +234,7 @@ public interface TableDataManager {
    * @param segmentNameStr name of segment for which the state change is being 
handled
    */
   default void onConsumingToDropped(String segmentNameStr) {
-  };
+  }
 
   /**
    * Interface to handle segment state transitions from CONSUMING to ONLINE
@@ -241,5 +242,5 @@ public interface TableDataManager {
    * @param segmentNameStr name of segment for which the state change is being 
handled
    */
   default void onConsumingToOnline(String segmentNameStr) {
-  };
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerConfig.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerConfig.java
deleted file mode 100644
index b17c24e564..0000000000
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerConfig.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.data.manager;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.configuration2.Configuration;
-import org.apache.commons.configuration2.PropertiesConfiguration;
-import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.env.PinotConfiguration;
-
-
-/**
- * The config used for TableDataManager.
- */
-public class TableDataManagerConfig {
-  public static final String AUTH_CONFIG_PREFIX = "auth";
-  public static final String TIER_CONFIGS_PREFIX = "tierConfigs";
-  public static final String TIER_NAMES = "tierNames";
-
-  private final InstanceDataManagerConfig _instanceDataManagerConfig;
-  private final TableConfig _tableConfig;
-
-  public TableDataManagerConfig(InstanceDataManagerConfig 
instanceDataManagerConfig, TableConfig tableConfig) {
-    _instanceDataManagerConfig = instanceDataManagerConfig;
-    _tableConfig = tableConfig;
-  }
-
-  public InstanceDataManagerConfig getInstanceDataManagerConfig() {
-    return _instanceDataManagerConfig;
-  }
-
-  public TableConfig getTableConfig() {
-    return _tableConfig;
-  }
-
-  public String getTableName() {
-    return _tableConfig.getTableName();
-  }
-
-  public TableType getTableType() {
-    return _tableConfig.getTableType();
-  }
-
-  public boolean isDimTable() {
-    return _tableConfig.isDimTable();
-  }
-
-  public String getDataDir() {
-    return _instanceDataManagerConfig.getInstanceDataDir() + "/" + 
getTableName();
-  }
-
-  public String getConsumerDir() {
-    return _instanceDataManagerConfig.getConsumerDir();
-  }
-
-  public String getTablePeerDownloadScheme() {
-    String peerSegmentDownloadScheme = 
_tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
-    if (peerSegmentDownloadScheme != null) {
-      return peerSegmentDownloadScheme;
-    }
-    return _instanceDataManagerConfig.getSegmentPeerDownloadScheme();
-  }
-
-  public int getTableDeletedSegmentsCacheSize() {
-    return _instanceDataManagerConfig.getDeletedSegmentsCacheSize();
-  }
-
-  public int getTableDeletedSegmentsCacheTtlMinutes() {
-    return _instanceDataManagerConfig.getDeletedSegmentsCacheTtlMinutes();
-  }
-
-  public Configuration getAuthConfig() {
-    Configuration authConfig = new PropertiesConfiguration();
-    
_instanceDataManagerConfig.getConfig().subset(AUTH_CONFIG_PREFIX).toMap().forEach(authConfig::addProperty);
-    return authConfig;
-  }
-
-  public Map<String, Map<String, String>> getInstanceTierConfigs() {
-    PinotConfiguration tierConfigs = 
_instanceDataManagerConfig.getConfig().subset(TIER_CONFIGS_PREFIX);
-    List<String> tierNames = tierConfigs.getProperty(TIER_NAMES, 
Collections.emptyList());
-    if (tierNames.isEmpty()) {
-      return Collections.emptyMap();
-    }
-    Map<String, Map<String, String>> instanceTierConfigs = new HashMap<>();
-    for (String tierName : tierNames) {
-      Map<String, String> tierConfigMap = new HashMap<>();
-      tierConfigs.subset(tierName).toMap().forEach((k, v) -> 
tierConfigMap.put(k, String.valueOf(v)));
-      instanceTierConfigs.put(tierName, tierConfigMap);
-    }
-    return instanceTierConfigs;
-  }
-}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerParams.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerParams.java
deleted file mode 100644
index f3a0589dd2..0000000000
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerParams.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.data.manager;
-
-import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
-
-
-public class TableDataManagerParams {
-  private boolean _isStreamSegmentDownloadUntar; // whether to turn on stream 
segment download-untar
-  private long _streamSegmentDownloadUntarRateLimitBytesPerSec; // the per 
segment rate limit for stream download-untar
-  private int _maxParallelSegmentDownloads; // max number of segment download 
in parallel per table
-
-  public TableDataManagerParams(int maxParallelSegmentDownloads, boolean 
isStreamSegmentDownloadUntar,
-      long streamSegmentDownloadUntarRateLimitBytesPerSec) {
-    _maxParallelSegmentDownloads = maxParallelSegmentDownloads;
-    _isStreamSegmentDownloadUntar = isStreamSegmentDownloadUntar;
-    _streamSegmentDownloadUntarRateLimitBytesPerSec = 
streamSegmentDownloadUntarRateLimitBytesPerSec;
-  }
-
-  public TableDataManagerParams(InstanceDataManagerConfig 
instanceDataManagerConfig) {
-    _maxParallelSegmentDownloads = 
instanceDataManagerConfig.getMaxParallelSegmentDownloads();
-    _isStreamSegmentDownloadUntar = 
instanceDataManagerConfig.isStreamSegmentDownloadUntar();
-    _streamSegmentDownloadUntarRateLimitBytesPerSec =
-        instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit();
-  }
-
-  public boolean isStreamSegmentDownloadUntar() {
-    return _isStreamSegmentDownloadUntar;
-  }
-
-  public long getStreamSegmentDownloadUntarRateLimitBytesPerSec() {
-    return _streamSegmentDownloadUntarRateLimitBytesPerSec;
-  }
-
-  public void setStreamSegmentDownloadUntar(boolean 
streamSegmentDownloadUntar) {
-    _isStreamSegmentDownloadUntar = streamSegmentDownloadUntar;
-  }
-
-  public void setStreamSegmentDownloadUntarRateLimitBytesPerSec(long 
streamSegmentDownloadUntarRateLimitBytesPerSec) {
-    _streamSegmentDownloadUntarRateLimitBytesPerSec = 
streamSegmentDownloadUntarRateLimitBytesPerSec;
-  }
-
-  public int getMaxParallelSegmentDownloads() {
-    return _maxParallelSegmentDownloads;
-  }
-
-  public void setMaxParallelSegmentDownloads(int maxParallelSegmentDownloads) {
-    _maxParallelSegmentDownloads = maxParallelSegmentDownloads;
-  }
-}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 72f57d4086..71c8c37317 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -41,7 +41,6 @@ import 
org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
-import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -214,15 +213,13 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
 
   @VisibleForTesting
   String getInstanceId() {
-    InstanceDataManagerConfig instanceDataManagerConfig =
-        
_tableDataManager.getTableDataManagerConfig().getInstanceDataManagerConfig();
-    return instanceDataManagerConfig.getInstanceId();
+    return _tableDataManager.getInstanceDataManagerConfig().getInstanceId();
   }
 
   @VisibleForTesting
   IndexLoadingConfig createIndexLoadingConfig() {
-    return new 
IndexLoadingConfig(_tableDataManager.getTableDataManagerConfig().getInstanceDataManagerConfig(),
-        _context.getTableConfig(), _context.getSchema());
+    return new 
IndexLoadingConfig(_tableDataManager.getInstanceDataManagerConfig(), 
_context.getTableConfig(),
+        _context.getSchema());
   }
 
   @VisibleForTesting
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
index 1ee4a834e1..f3a3dff5ab 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
@@ -38,7 +38,7 @@ public class TableUpsertMetadataManagerFactory {
   public static final String UPSERT_DEFAULT_ENABLE_PRELOAD = 
"default.enable.preload";
 
   public static TableUpsertMetadataManager create(TableConfig tableConfig,
-      @Nullable PinotConfiguration instanceUpsertConfigs) {
+      @Nullable PinotConfiguration instanceUpsertConfig) {
     String tableNameWithType = tableConfig.getTableName();
     UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
     Preconditions.checkArgument(upsertConfig != null, "Must provide upsert 
config for table: %s", tableNameWithType);
@@ -46,20 +46,20 @@ public class TableUpsertMetadataManagerFactory {
     TableUpsertMetadataManager metadataManager;
     String metadataManagerClass = upsertConfig.getMetadataManagerClass();
 
-    if (instanceUpsertConfigs != null) {
+    if (instanceUpsertConfig != null) {
       if (metadataManagerClass == null) {
-        metadataManagerClass = 
instanceUpsertConfigs.getProperty(UPSERT_DEFAULT_METADATA_MANAGER_CLASS);
+        metadataManagerClass = 
instanceUpsertConfig.getProperty(UPSERT_DEFAULT_METADATA_MANAGER_CLASS);
       }
       // Server level config honoured only when table level config is not set 
to true
       if (!upsertConfig.isEnableSnapshot()) {
         upsertConfig.setEnableSnapshot(
-            
Boolean.parseBoolean(instanceUpsertConfigs.getProperty(UPSERT_DEFAULT_ENABLE_SNAPSHOT,
 "false")));
+            
Boolean.parseBoolean(instanceUpsertConfig.getProperty(UPSERT_DEFAULT_ENABLE_SNAPSHOT,
 "false")));
       }
 
       // Server level config honoured only when table level config is not set 
to true
       if (!upsertConfig.isEnablePreload()) {
         upsertConfig.setEnablePreload(
-            
Boolean.parseBoolean(instanceUpsertConfigs.getProperty(UPSERT_DEFAULT_ENABLE_PRELOAD,
 "false")));
+            
Boolean.parseBoolean(instanceUpsertConfig.getProperty(UPSERT_DEFAULT_ENABLE_PRELOAD,
 "false")));
       }
     }
 
@@ -67,8 +67,7 @@ public class TableUpsertMetadataManagerFactory {
       LOGGER.info("Creating TableUpsertMetadataManager with class: {} for 
table: {}", metadataManagerClass,
           tableNameWithType);
       try {
-        metadataManager =
-            (TableUpsertMetadataManager) 
Class.forName(metadataManagerClass).newInstance();
+        metadataManager = (TableUpsertMetadataManager) 
Class.forName(metadataManagerClass).newInstance();
       } catch (Exception e) {
         throw new RuntimeException(
             String.format("Caught exception while constructing 
TableUpsertMetadataManager with class: %s for table: %s",
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 0d194c51ae..8e147cc74a 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -60,7 +60,6 @@ import 
org.apache.pinot.core.data.manager.realtime.SegmentUploader;
 import org.apache.pinot.core.util.SegmentRefreshSemaphore;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.segment.spi.SegmentMetadata;
@@ -87,8 +86,8 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
 
   private HelixInstanceDataManagerConfig _instanceDataManagerConfig;
   private String _instanceId;
+  private TableDataManagerProvider _tableDataManagerProvider;
   private HelixManager _helixManager;
-  private ServerMetrics _serverMetrics;
   private ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private SegmentUploader _segmentUploader;
   private Supplier<Boolean> _isServerReadyToServeQueries = () -> false;
@@ -114,10 +113,10 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
     _instanceDataManagerConfig = new HelixInstanceDataManagerConfig(config);
     LOGGER.info("HelixInstanceDataManagerConfig: {}", 
_instanceDataManagerConfig);
     _instanceId = _instanceDataManagerConfig.getInstanceId();
+    _tableDataManagerProvider = new 
TableDataManagerProvider(_instanceDataManagerConfig);
     _helixManager = helixManager;
-    _serverMetrics = serverMetrics;
     _segmentUploader = new 
PinotFSSegmentUploader(_instanceDataManagerConfig.getSegmentStoreUri(),
-        
ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), 
_serverMetrics);
+        
ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), 
serverMetrics);
 
     _externalViewDroppedMaxWaitMs = 
_instanceDataManagerConfig.getExternalViewDroppedMaxWaitMs();
     _externalViewDroppedCheckInternalMs = 
_instanceDataManagerConfig.getExternalViewDroppedCheckIntervalMs();
@@ -148,8 +147,6 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
     } else {
       LOGGER.info("SegmentPreloadExecutor was not created with pool size: {}", 
poolSize);
     }
-    // Initialize the table data manager provider
-    TableDataManagerProvider.init(_instanceDataManagerConfig);
     LOGGER.info("Initialized Helix instance data manager");
 
     // Initialize the error cache
@@ -225,17 +222,17 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
         ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, 
realtimeTableName, segmentName);
     Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata 
for segment: %s, table: %s", segmentName,
         realtimeTableName);
-    _tableDataManagerMap.computeIfAbsent(realtimeTableName, k -> 
createTableDataManager(k, tableConfig))
+    _tableDataManagerMap.computeIfAbsent(realtimeTableName, k -> 
createTableDataManager(tableConfig))
         .addSegment(segmentName, new 
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema), 
zkMetadata);
     LOGGER.info("Added segment: {} to table: {}", segmentName, 
realtimeTableName);
   }
 
-  private TableDataManager createTableDataManager(String tableNameWithType, 
TableConfig tableConfig) {
+  private TableDataManager createTableDataManager(TableConfig tableConfig) {
+    String tableNameWithType = tableConfig.getTableName();
     LOGGER.info("Creating table data manager for table: {}", 
tableNameWithType);
-    TableDataManagerConfig tableDataManagerConfig = new 
TableDataManagerConfig(_instanceDataManagerConfig, tableConfig);
     TableDataManager tableDataManager =
-        TableDataManagerProvider.getTableDataManager(tableDataManagerConfig, 
_instanceId, _propertyStore,
-            _serverMetrics, _helixManager, _segmentPreloadExecutor, 
_errorCache, _isServerReadyToServeQueries);
+        _tableDataManagerProvider.getTableDataManager(tableConfig, 
_helixManager, _segmentPreloadExecutor, _errorCache,
+            _isServerReadyToServeQueries);
     tableDataManager.start();
     LOGGER.info("Created table data manager for table: {}", tableNameWithType);
     return tableDataManager;
@@ -501,7 +498,7 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
       // is set to null. Then, addOrReplaceSegment method will load the 
segment accordingly.
       SegmentMetadata localMetadata = getSegmentMetadata(tableNameWithType, 
segmentName);
 
-      _tableDataManagerMap.computeIfAbsent(tableNameWithType, k -> 
createTableDataManager(k, tableConfig))
+      _tableDataManagerMap.computeIfAbsent(tableNameWithType, k -> 
createTableDataManager(tableConfig))
           .addOrReplaceSegment(segmentName, new 
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema),
               zkMetadata, localMetadata);
       LOGGER.info("Added or replaced segment: {} of table: {}", segmentName, 
tableNameWithType);
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index c7e15fe106..1b5940ff41 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -18,6 +18,11 @@
  */
 package org.apache.pinot.server.starter.helix;
 
+import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
@@ -37,8 +42,6 @@ import static 
org.apache.pinot.spi.utils.CommonConstants.Server.DEFAULT_READ_MOD
 
 /**
  * The config used for HelixInstanceDataManager.
- *
- *
  */
 public class HelixInstanceDataManagerConfig implements 
InstanceDataManagerConfig {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(HelixInstanceDataManagerConfig.class);
@@ -56,10 +59,6 @@ public class HelixInstanceDataManagerConfig implements 
InstanceDataManagerConfig
   public static final String INSTANCE_SEGMENT_TAR_DIR = "segmentTarDir";
   // Key of segment directory
   public static final String INSTANCE_BOOTSTRAP_SEGMENT_DIR = 
"bootstrap.segment.dir";
-  // Key of table data directory
-  public static final String kEY_OF_TABLE_DATA_DIRECTORY = "directory";
-  // Key of table data directory
-  public static final String kEY_OF_TABLE_NAME = "name";
   // Key of instance level segment read mode
   public static final String READ_MODE = "readMode";
   // Key of the segment format this server can read
@@ -68,6 +67,14 @@ public class HelixInstanceDataManagerConfig implements 
InstanceDataManagerConfig
   public static final String INSTANCE_RELOAD_CONSUMING_SEGMENT = 
"reload.consumingSegment";
   // Key of segment directory loader
   public static final String SEGMENT_DIRECTORY_LOADER = 
"segment.directory.loader";
+  // Prefix for upsert config
+  public static final String UPSERT_CONFIG_PREFIX = "upsert";
+  // Prefix for auth config
+  public static final String AUTH_CONFIG_PREFIX = "auth";
+  // Prefix for tier configs
+  public static final String TIER_CONFIGS_PREFIX = "tierConfigs";
+  // Key of tier names
+  public static final String TIER_NAMES = "tierNames";
 
   // Key of how many parallel realtime segments can be built.
   // A value of <= 0 indicates unlimited.
@@ -86,10 +93,10 @@ public class HelixInstanceDataManagerConfig implements 
InstanceDataManagerConfig
   // Key of server segment download rate limit
   // limit the rate to write download-untar stream to disk, in bytes
   // -1 for no disk write limit, 0 for limit the writing to min(untar, 
download) rate
-  private static final String STREAM_SEGMENT_DOWNLOAD_UNTAR_RATE_LIMIT
-      = "segment.stream.download.untar.rate.limit.bytes.per.sec";
-  private static final long DEFAULT_STREAM_SEGMENT_DOWNLOAD_UNTAR_RATE_LIMIT
-      = TarGzCompressionUtils.NO_DISK_WRITE_RATE_LIMIT;
+  private static final String STREAM_SEGMENT_DOWNLOAD_UNTAR_RATE_LIMIT =
+      "segment.stream.download.untar.rate.limit.bytes.per.sec";
+  private static final long DEFAULT_STREAM_SEGMENT_DOWNLOAD_UNTAR_RATE_LIMIT =
+      TarGzCompressionUtils.NO_DISK_WRITE_RATE_LIMIT;
 
   // Key of whether to use streamed server segment download-untar
   private static final String ENABLE_STREAM_SEGMENT_DOWNLOAD_UNTAR = 
"segment.stream.download.untar";
@@ -131,8 +138,6 @@ public class HelixInstanceDataManagerConfig implements 
InstanceDataManagerConfig
   private static final String EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS = 
"external.view.dropped.max.wait.ms";
   private static final String EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS = 
"external.view.dropped.check.interval.ms";
 
-  public static final String PREFIX_OF_CONFIG_OF_UPSERT = "upsert";
-
   private final static String[] REQUIRED_KEYS = {INSTANCE_ID};
   private static final long DEFAULT_ERROR_CACHE_SIZE = 100L;
   private static final int DEFAULT_DELETED_SEGMENTS_CACHE_SIZE = 10_000;
@@ -140,176 +145,187 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig
   public static final long DEFAULT_EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS = 20 * 
60_000L;
   public static final long DEFAULT_EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS = 
1_000L;
 
-  private final PinotConfiguration _instanceDataManagerConfiguration;
+  private final PinotConfiguration _serverConfig;
+  private final PinotConfiguration _upsertConfig;
+  private final PinotConfiguration _authConfig;
+  private final Map<String, Map<String, String>> _tierConfigs;
 
   public HelixInstanceDataManagerConfig(PinotConfiguration serverConfig)
       throws ConfigurationException {
-    _instanceDataManagerConfiguration = serverConfig;
+    _serverConfig = serverConfig;
 
     for (String key : serverConfig.getKeys()) {
       LOGGER.info("InstanceDataManagerConfig, key: {} , value: {}", key, 
serverConfig.getProperty(key));
     }
 
     checkRequiredKeys();
+
+    _authConfig = serverConfig.subset(AUTH_CONFIG_PREFIX);
+    _upsertConfig = serverConfig.subset(UPSERT_CONFIG_PREFIX);
+
+    PinotConfiguration tierConfigs = getConfig().subset(TIER_CONFIGS_PREFIX);
+    List<String> tierNames = tierConfigs.getProperty(TIER_NAMES, 
Collections.emptyList());
+    if (tierNames.isEmpty()) {
+      _tierConfigs = Collections.emptyMap();
+    } else {
+      _tierConfigs = Maps.newHashMapWithExpectedSize(tierNames.size());
+      for (String tierName : tierNames) {
+        Map<String, String> tierConfigMap = new HashMap<>();
+        tierConfigs.subset(tierName).toMap().forEach((k, v) -> 
tierConfigMap.put(k, String.valueOf(v)));
+        _tierConfigs.put(tierName, tierConfigMap);
+      }
+    }
   }
 
   private void checkRequiredKeys()
       throws ConfigurationException {
     for (String keyString : REQUIRED_KEYS) {
-      
Optional.ofNullable(_instanceDataManagerConfiguration.getProperty(keyString))
+      Optional.ofNullable(_serverConfig.getProperty(keyString))
           .orElseThrow(() -> new ConfigurationException("Cannot find required 
key : " + keyString));
     }
   }
 
   @Override
   public PinotConfiguration getConfig() {
-    return _instanceDataManagerConfiguration;
+    return _serverConfig;
   }
 
   @Override
   public String getInstanceId() {
-    return _instanceDataManagerConfiguration.getProperty(INSTANCE_ID);
+    return _serverConfig.getProperty(INSTANCE_ID);
   }
 
   @Override
   public String getInstanceDataDir() {
-    return _instanceDataManagerConfiguration.getProperty(INSTANCE_DATA_DIR, 
DEFAULT_INSTANCE_DATA_DIR);
+    return _serverConfig.getProperty(INSTANCE_DATA_DIR, 
DEFAULT_INSTANCE_DATA_DIR);
   }
 
   @Override
   public String getConsumerDir() {
-    return _instanceDataManagerConfiguration.getProperty(CONSUMER_DIR);
+    return _serverConfig.getProperty(CONSUMER_DIR);
   }
 
   @Override
   public String getInstanceSegmentTarDir() {
-    return 
_instanceDataManagerConfiguration.getProperty(INSTANCE_SEGMENT_TAR_DIR, 
DEFAULT_INSTANCE_SEGMENT_TAR_DIR);
+    return _serverConfig.getProperty(INSTANCE_SEGMENT_TAR_DIR, 
DEFAULT_INSTANCE_SEGMENT_TAR_DIR);
   }
 
   @Override
   public String getInstanceBootstrapSegmentDir() {
-    return 
_instanceDataManagerConfiguration.getProperty(INSTANCE_BOOTSTRAP_SEGMENT_DIR);
+    return _serverConfig.getProperty(INSTANCE_BOOTSTRAP_SEGMENT_DIR);
   }
 
   @Override
   public String getSegmentStoreUri() {
-    return 
_instanceDataManagerConfiguration.getProperty(CONFIG_OF_SEGMENT_STORE_URI);
+    return _serverConfig.getProperty(CONFIG_OF_SEGMENT_STORE_URI);
   }
 
   @Override
   public ReadMode getReadMode() {
-    return 
ReadMode.valueOf(_instanceDataManagerConfiguration.getProperty(READ_MODE, 
DEFAULT_READ_MODE));
+    return ReadMode.valueOf(_serverConfig.getProperty(READ_MODE, 
DEFAULT_READ_MODE));
   }
 
   @Override
   public String getSegmentFormatVersion() {
-    return 
_instanceDataManagerConfiguration.getProperty(SEGMENT_FORMAT_VERSION);
+    return _serverConfig.getProperty(SEGMENT_FORMAT_VERSION);
   }
 
   @Override
   public boolean isRealtimeOffHeapAllocation() {
-    return 
_instanceDataManagerConfiguration.getProperty(REALTIME_OFFHEAP_ALLOCATION, 
true);
+    return _serverConfig.getProperty(REALTIME_OFFHEAP_ALLOCATION, true);
   }
 
   @Override
   public boolean isDirectRealtimeOffHeapAllocation() {
-    return 
_instanceDataManagerConfiguration.getProperty(DIRECT_REALTIME_OFFHEAP_ALLOCATION,
 false);
+    return _serverConfig.getProperty(DIRECT_REALTIME_OFFHEAP_ALLOCATION, 
false);
   }
 
   public boolean shouldReloadConsumingSegment() {
-    return _instanceDataManagerConfiguration
-        .getProperty(INSTANCE_RELOAD_CONSUMING_SEGMENT, 
Server.DEFAULT_RELOAD_CONSUMING_SEGMENT);
+    return _serverConfig.getProperty(INSTANCE_RELOAD_CONSUMING_SEGMENT, 
Server.DEFAULT_RELOAD_CONSUMING_SEGMENT);
   }
 
   @Override
   public String getAvgMultiValueCount() {
-    return _instanceDataManagerConfiguration.getProperty(AVERAGE_MV_COUNT);
+    return _serverConfig.getProperty(AVERAGE_MV_COUNT);
   }
 
   public int getMaxParallelRefreshThreads() {
-    return 
_instanceDataManagerConfiguration.getProperty(MAX_PARALLEL_REFRESH_THREADS, 1);
+    return _serverConfig.getProperty(MAX_PARALLEL_REFRESH_THREADS, 1);
   }
 
   public int getMaxSegmentPreloadThreads() {
-    return 
_instanceDataManagerConfiguration.getProperty(MAX_SEGMENT_PRELOAD_THREADS, 0);
+    return _serverConfig.getProperty(MAX_SEGMENT_PRELOAD_THREADS, 0);
   }
 
   public int getMaxParallelSegmentBuilds() {
-    return _instanceDataManagerConfiguration
-        .getProperty(MAX_PARALLEL_SEGMENT_BUILDS, 
DEFAULT_MAX_PARALLEL_SEGMENT_BUILDS);
+    return _serverConfig.getProperty(MAX_PARALLEL_SEGMENT_BUILDS, 
DEFAULT_MAX_PARALLEL_SEGMENT_BUILDS);
   }
 
   @Override
   public int getMaxParallelSegmentDownloads() {
-    return 
_instanceDataManagerConfiguration.getProperty(MAX_PARALLEL_SEGMENT_DOWNLOADS,
-        DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS);
+    return _serverConfig.getProperty(MAX_PARALLEL_SEGMENT_DOWNLOADS, 
DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS);
   }
 
   public String getSegmentDirectoryLoader() {
-    return 
_instanceDataManagerConfiguration.getProperty(SEGMENT_DIRECTORY_LOADER,
+    return _serverConfig.getProperty(SEGMENT_DIRECTORY_LOADER,
         SegmentDirectoryLoaderRegistry.DEFAULT_SEGMENT_DIRECTORY_LOADER_NAME);
   }
 
   @Override
   public long getErrorCacheSize() {
-    return _instanceDataManagerConfiguration.getProperty(ERROR_CACHE_SIZE, 
DEFAULT_ERROR_CACHE_SIZE);
+    return _serverConfig.getProperty(ERROR_CACHE_SIZE, 
DEFAULT_ERROR_CACHE_SIZE);
   }
 
   @Override
   public boolean isStreamSegmentDownloadUntar() {
-    return 
_instanceDataManagerConfiguration.getProperty(ENABLE_STREAM_SEGMENT_DOWNLOAD_UNTAR,
+    return _serverConfig.getProperty(ENABLE_STREAM_SEGMENT_DOWNLOAD_UNTAR,
         DEFAULT_ENABLE_STREAM_SEGMENT_DOWNLOAD_UNTAR);
   }
 
   @Override
   public long getStreamSegmentDownloadUntarRateLimit() {
-    return 
_instanceDataManagerConfiguration.getProperty(STREAM_SEGMENT_DOWNLOAD_UNTAR_RATE_LIMIT,
+    return _serverConfig.getProperty(STREAM_SEGMENT_DOWNLOAD_UNTAR_RATE_LIMIT,
         DEFAULT_STREAM_SEGMENT_DOWNLOAD_UNTAR_RATE_LIMIT);
   }
 
   @Override
   public int getDeletedSegmentsCacheSize() {
-    return 
_instanceDataManagerConfiguration.getProperty(DELETED_SEGMENTS_CACHE_SIZE,
-        DEFAULT_DELETED_SEGMENTS_CACHE_SIZE);
+    return _serverConfig.getProperty(DELETED_SEGMENTS_CACHE_SIZE, 
DEFAULT_DELETED_SEGMENTS_CACHE_SIZE);
   }
 
   @Override
   public int getDeletedSegmentsCacheTtlMinutes() {
-    return 
_instanceDataManagerConfiguration.getProperty(DELETED_SEGMENTS_CACHE_TTL_MINUTES,
-        DEFAULT_DELETED_SEGMENTS_CACHE_TTL_MINUTES);
+    return _serverConfig.getProperty(DELETED_SEGMENTS_CACHE_TTL_MINUTES, 
DEFAULT_DELETED_SEGMENTS_CACHE_TTL_MINUTES);
   }
 
   @Override
   public String getSegmentPeerDownloadScheme() {
-    return _instanceDataManagerConfiguration.getProperty(PEER_DOWNLOAD_SCHEME);
+    return _serverConfig.getProperty(PEER_DOWNLOAD_SCHEME);
   }
 
   @Override
   public long getExternalViewDroppedMaxWaitMs() {
-    return 
_instanceDataManagerConfiguration.getProperty(EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS,
-        DEFAULT_EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS);
+    return _serverConfig.getProperty(EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS, 
DEFAULT_EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS);
   }
 
   @Override
   public long getExternalViewDroppedCheckIntervalMs() {
-    return 
_instanceDataManagerConfiguration.getProperty(EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS,
+    return _serverConfig.getProperty(EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS,
         DEFAULT_EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS);
   }
 
   @Override
-  public PinotConfiguration getUpsertConfigs() {
-    return 
_instanceDataManagerConfiguration.subset(PREFIX_OF_CONFIG_OF_UPSERT);
+  public PinotConfiguration getUpsertConfig() {
+    return _upsertConfig;
+  }
+
+  @Override
+  public PinotConfiguration getAuthConfig() {
+    return _authConfig;
   }
 
   @Override
-  public String toString() {
-    String configString = "";
-    configString += "Instance Id: " + getInstanceId();
-    configString += "\n\tInstance Data Dir: " + getInstanceDataDir();
-    configString += "\n\tInstance Segment Tar Dir: " + 
getInstanceSegmentTarDir();
-    configString += "\n\tBootstrap Segment Dir: " + 
getInstanceBootstrapSegmentDir();
-    configString += "\n\tRead Mode: " + getReadMode();
-    configString += "\n\tSegment format version: " + getSegmentFormatVersion();
-    return configString;
+  public Map<String, Map<String, String>> getTierConfigs() {
+    return _tierConfigs;
   }
 }
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
index 3723f7a259..e57371f72d 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
@@ -30,7 +30,6 @@ import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.client.WebTarget;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
@@ -40,8 +39,6 @@ import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
 import org.apache.pinot.core.transport.HttpServerThreadPoolConfig;
 import org.apache.pinot.core.transport.ListenerConfig;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -50,6 +47,9 @@ import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
 import org.apache.pinot.server.access.AllowAllAccessFactory;
 import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.NetUtils;
@@ -69,16 +69,16 @@ import static org.mockito.Mockito.when;
 
 public abstract class BaseResourceTest {
   private static final String AVRO_DATA_PATH = "data/test_data-mv.avro";
-  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"BaseResourceTest");
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), 
"BaseResourceTest");
   protected static final String TABLE_NAME = "testTable";
   protected static final String LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS =
-      new 
LLCSegmentName(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 1, 0, 
System.currentTimeMillis())
-          .getSegmentName();
+      new 
LLCSegmentName(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 1, 0,
+          System.currentTimeMillis()).getSegmentName();
   protected static final String LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE =
-      new 
LLCSegmentName(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 2, 0, 
System.currentTimeMillis())
-          .getSegmentName();
-  protected static final String SEGMENT_DOWNLOAD_URL = StringUtil
-      .join("/", "hdfs://root", TABLE_NAME, 
LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS);
+      new 
LLCSegmentName(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 2, 0,
+          System.currentTimeMillis()).getSegmentName();
+  protected static final String SEGMENT_DOWNLOAD_URL =
+      StringUtil.join("/", "hdfs://root", TABLE_NAME, 
LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS);
 
   private final Map<String, TableDataManager> _tableDataManagerMap = new 
HashMap<>();
   protected final List<ImmutableSegment> _realtimeIndexSegments = new 
ArrayList<>();
@@ -92,32 +92,34 @@ public abstract class BaseResourceTest {
   @BeforeClass
   public void setUp()
       throws Exception {
-    FileUtils.deleteQuietly(INDEX_DIR);
-    Assert.assertTrue(INDEX_DIR.mkdirs());
+    ServerMetrics.register(mock(ServerMetrics.class));
+
+    FileUtils.deleteQuietly(TEMP_DIR);
+    Assert.assertTrue(TEMP_DIR.mkdirs());
     URL resourceUrl = getClass().getClassLoader().getResource(AVRO_DATA_PATH);
     Assert.assertNotNull(resourceUrl);
     _avroFile = new File(resourceUrl.getFile());
 
     // Mock the instance data manager
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
-    when(instanceDataManager.getTableDataManager(anyString()))
-        .thenAnswer(invocation -> 
_tableDataManagerMap.get(invocation.getArguments()[0]));
+    when(instanceDataManager.getTableDataManager(anyString())).thenAnswer(
+        invocation -> _tableDataManagerMap.get(invocation.getArguments()[0]));
     
when(instanceDataManager.getAllTables()).thenReturn(_tableDataManagerMap.keySet());
 
     // Mock the server instance
     ServerInstance serverInstance = mock(ServerInstance.class);
     
when(serverInstance.getServerMetrics()).thenReturn(mock(ServerMetrics.class));
     
when(serverInstance.getInstanceDataManager()).thenReturn(instanceDataManager);
-    when(serverInstance.getInstanceDataManager().getSegmentFileDirectory())
-        .thenReturn(FileUtils.getTempDirectoryPath());
+    
when(serverInstance.getInstanceDataManager().getSegmentFileDirectory()).thenReturn(
+        FileUtils.getTempDirectoryPath());
     
when(serverInstance.getHelixManager()).thenReturn(mock(HelixManager.class));
 
     // Mock the segment uploader
     SegmentUploader segmentUploader = mock(SegmentUploader.class);
-    when(segmentUploader.uploadSegment(any(File.class), eq(new 
LLCSegmentName(LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS))))
-        .thenReturn(new URI(SEGMENT_DOWNLOAD_URL));
-    when(segmentUploader.uploadSegment(any(File.class), eq(new 
LLCSegmentName(LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE))))
-        .thenReturn(null);
+    when(segmentUploader.uploadSegment(any(File.class),
+        eq(new 
LLCSegmentName(LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS)))).thenReturn(new 
URI(SEGMENT_DOWNLOAD_URL));
+    when(segmentUploader.uploadSegment(any(File.class),
+        eq(new 
LLCSegmentName(LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE)))).thenReturn(null);
     when(instanceDataManager.getSegmentUploader()).thenReturn(segmentUploader);
 
     // Add the default tables and segments.
@@ -155,7 +157,7 @@ public abstract class BaseResourceTest {
     for (ImmutableSegment immutableSegment : _offlineIndexSegments) {
       immutableSegment.destroy();
     }
-    FileUtils.deleteQuietly(INDEX_DIR);
+    FileUtils.deleteQuietly(TEMP_DIR);
   }
 
   protected List<ImmutableSegment> setUpSegments(String tableNameWithType, int 
numSegments,
@@ -163,8 +165,8 @@ public abstract class BaseResourceTest {
       throws Exception {
     List<ImmutableSegment> immutableSegments = new ArrayList<>();
     for (int i = 0; i < numSegments; i++) {
-      immutableSegments
-          .add(setUpSegment(tableNameWithType, null, 
Integer.toString(_realtimeIndexSegments.size()), segments));
+      immutableSegments.add(
+          setUpSegment(tableNameWithType, null, 
Integer.toString(_realtimeIndexSegments.size()), segments));
     }
     return immutableSegments;
   }
@@ -172,30 +174,31 @@ public abstract class BaseResourceTest {
   protected ImmutableSegment setUpSegment(String tableNameWithType, String 
segmentName, String segmentNamePostfix,
       List<ImmutableSegment> segments)
       throws Exception {
+    File tableDataDir = new File(TEMP_DIR, tableNameWithType);
     SegmentGeneratorConfig config =
-        SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(_avroFile, 
INDEX_DIR, tableNameWithType);
+        SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(_avroFile, 
tableDataDir, tableNameWithType);
     config.setSegmentName(segmentName);
     config.setSegmentNamePostfix(segmentNamePostfix);
     SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
     driver.init(config);
     driver.build();
     ImmutableSegment immutableSegment =
-        ImmutableSegmentLoader.load(new File(INDEX_DIR, 
driver.getSegmentName()), ReadMode.mmap);
+        ImmutableSegmentLoader.load(new File(tableDataDir, 
driver.getSegmentName()), ReadMode.mmap);
     segments.add(immutableSegment);
     _tableDataManagerMap.get(tableNameWithType).addSegment(immutableSegment);
     return immutableSegment;
   }
 
-  @SuppressWarnings("unchecked")
   protected void addTable(String tableNameWithType) {
-    TableDataManagerConfig tableDataManagerConfig = 
mock(TableDataManagerConfig.class);
-    when(tableDataManagerConfig.getTableName()).thenReturn(tableNameWithType);
-    
when(tableDataManagerConfig.getDataDir()).thenReturn(INDEX_DIR.getAbsolutePath());
+    InstanceDataManagerConfig instanceDataManagerConfig = 
mock(InstanceDataManagerConfig.class);
+    
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
+    TableConfig tableConfig = mock(TableConfig.class);
+    when(tableConfig.getTableName()).thenReturn(tableNameWithType);
+    
when(tableConfig.getValidationConfig()).thenReturn(mock(SegmentsValidationAndRetentionConfig.class));
     // NOTE: Use OfflineTableDataManager for both OFFLINE and REALTIME table 
because RealtimeTableDataManager requires
     //       table config.
     TableDataManager tableDataManager = new OfflineTableDataManager();
-    tableDataManager.init(tableDataManagerConfig, "testInstance", 
mock(ZkHelixPropertyStore.class),
-        mock(ServerMetrics.class), mock(HelixManager.class), null, null, new 
TableDataManagerParams(0, false, -1));
+    tableDataManager.init(instanceDataManagerConfig, tableConfig, 
mock(HelixManager.class), null, null);
     tableDataManager.start();
     _tableDataManagerMap.put(tableNameWithType, tableDataManager);
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
index 6d035890c9..4247f76269 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pinot.spi.config.instance;
 
+import java.util.Map;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.ReadMode;
 
 
 public interface InstanceDataManagerConfig {
+
   PinotConfiguration getConfig();
 
   String getInstanceId();
@@ -69,5 +71,9 @@ public interface InstanceDataManagerConfig {
 
   long getExternalViewDroppedCheckIntervalMs();
 
-  PinotConfiguration getUpsertConfigs();
+  PinotConfiguration getUpsertConfig();
+
+  PinotConfiguration getAuthConfig();
+
+  Map<String, Map<String, String>> getTierConfigs();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Reply via email to