This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4ad36c3482 Remove TableDataManagerConfig and simplify TableDataManager
construction (#12189)
4ad36c3482 is described below
commit 4ad36c3482740386264478717343316b53cabeba
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Jan 17 16:54:08 2024 -0800
Remove TableDataManagerConfig and simplify TableDataManager construction
(#12189)
---
.../pinot/common/auth/AuthProviderUtils.java | 3 +
.../core/data/manager/BaseTableDataManager.java | 86 +++++++-------
.../manager/offline/TableDataManagerProvider.java | 52 +++-----
.../realtime/RealtimeSegmentDataManager.java | 2 +-
.../manager/realtime/RealtimeTableDataManager.java | 8 +-
.../BaseTableDataManagerAcquireSegmentTest.java | 40 +++----
.../data/manager/BaseTableDataManagerTest.java | 90 ++++++--------
.../offline/DimensionTableDataManagerTest.java | 24 ++--
.../realtime/RealtimeSegmentDataManagerTest.java | 73 +++++-------
.../realtime/RealtimeTableDataManagerTest.java | 77 ++++++------
.../executor/QueryExecutorExceptionsTest.java | 26 ++--
.../core/query/executor/QueryExecutorTest.java | 50 +++-----
.../pinot/queries/ExplainPlanQueriesTest.java | 39 +++---
.../queries/SegmentWithNullValueVectorTest.java | 56 +++------
.../tests/UpsertTableIntegrationTest.java | 2 +-
.../UpsertTableSegmentPreloadIntegrationTest.java | 4 +-
.../local/data/manager/TableDataManager.java | 29 ++---
.../local/data/manager/TableDataManagerConfig.java | 113 ------------------
.../local/data/manager/TableDataManagerParams.java | 66 -----------
.../upsert/BaseTableUpsertMetadataManager.java | 9 +-
.../upsert/TableUpsertMetadataManagerFactory.java | 13 +-
.../starter/helix/HelixInstanceDataManager.java | 21 ++--
.../helix/HelixInstanceDataManagerConfig.java | 132 ++++++++++++---------
.../apache/pinot/server/api/BaseResourceTest.java | 65 +++++-----
.../config/instance/InstanceDataManagerConfig.java | 8 +-
25 files changed, 410 insertions(+), 678 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/auth/AuthProviderUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/auth/AuthProviderUtils.java
index 513bbf0929..cc02e01fa3 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/auth/AuthProviderUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/auth/AuthProviderUtils.java
@@ -63,6 +63,9 @@ public final class AuthProviderUtils {
* @return auth provider
*/
public static AuthProvider extractAuthProvider(PinotConfiguration
pinotConfig, String namespace) {
+ if (pinotConfig == null) {
+ return new NullAuthProvider();
+ }
return makeAuthProvider(extractAuthConfig(pinotConfig, namespace));
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index f13f04c3b2..119bede805 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -60,8 +60,6 @@ import
org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
@@ -73,6 +71,7 @@ import
org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -90,8 +89,10 @@ public abstract class BaseTableDataManager implements
TableDataManager {
// Semaphore to restrict the maximum number of parallel segment downloads
for a table.
private Semaphore _segmentDownloadSemaphore;
- protected TableDataManagerConfig _tableDataManagerConfig;
+ protected InstanceDataManagerConfig _instanceDataManagerConfig;
protected String _instanceId;
+ protected TableConfig _tableConfig;
+ protected HelixManager _helixManager;
protected ZkHelixPropertyStore<ZNRecord> _propertyStore;
protected ServerMetrics _serverMetrics;
protected String _tableNameWithType;
@@ -99,9 +100,9 @@ public abstract class BaseTableDataManager implements
TableDataManager {
protected File _indexDir;
protected File _resourceTmpDir;
protected Logger _logger;
- protected HelixManager _helixManager;
protected ExecutorService _segmentPreloadExecutor;
protected AuthProvider _authProvider;
+ protected String _peerDownloadScheme;
protected long _streamSegmentDownloadUntarRateLimitBytesPerSec;
protected boolean _isStreamSegmentDownloadUntar;
@@ -114,25 +115,22 @@ public abstract class BaseTableDataManager implements
TableDataManager {
protected volatile boolean _shutDown;
@Override
- public void init(TableDataManagerConfig tableDataManagerConfig, String
instanceId,
- ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics
serverMetrics, HelixManager helixManager,
- @Nullable ExecutorService segmentPreloadExecutor,
- @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo>
errorCache,
- TableDataManagerParams tableDataManagerParams) {
- LOGGER.info("Initializing table data manager for table: {}",
tableDataManagerConfig.getTableName());
-
- _tableDataManagerConfig = tableDataManagerConfig;
- _instanceId = instanceId;
- _propertyStore = propertyStore;
- _serverMetrics = serverMetrics;
+ public void init(InstanceDataManagerConfig instanceDataManagerConfig,
TableConfig tableConfig,
+ HelixManager helixManager, @Nullable ExecutorService
segmentPreloadExecutor,
+ @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo>
errorCache) {
+ LOGGER.info("Initializing table data manager for table: {}",
tableConfig.getTableName());
+
+ _instanceDataManagerConfig = instanceDataManagerConfig;
+ _instanceId = instanceDataManagerConfig.getInstanceId();
+ _tableConfig = tableConfig;
_helixManager = helixManager;
+ _propertyStore = helixManager.getHelixPropertyStore();
+ _serverMetrics = ServerMetrics.get();
_segmentPreloadExecutor = segmentPreloadExecutor;
+ _authProvider =
AuthProviderUtils.extractAuthProvider(_instanceDataManagerConfig.getAuthConfig(),
null);
- _authProvider =
-
AuthProviderUtils.extractAuthProvider(toPinotConfiguration(_tableDataManagerConfig.getAuthConfig()),
null);
-
- _tableNameWithType = tableDataManagerConfig.getTableName();
- _tableDataDir = tableDataManagerConfig.getDataDir();
+ _tableNameWithType = tableConfig.getTableName();
+ _tableDataDir = _instanceDataManagerConfig.getInstanceDataDir() +
File.separator + _tableNameWithType;
_indexDir = new File(_tableDataDir);
if (!_indexDir.exists()) {
Preconditions.checkState(_indexDir.mkdirs(), "Unable to create index
directory at %s. "
@@ -148,18 +146,23 @@ public abstract class BaseTableDataManager implements
TableDataManager {
}
_errorCache = errorCache;
_recentlyDeletedSegments =
-
CacheBuilder.newBuilder().maximumSize(tableDataManagerConfig.getTableDeletedSegmentsCacheSize())
-
.expireAfterWrite(tableDataManagerConfig.getTableDeletedSegmentsCacheTtlMinutes(),
TimeUnit.MINUTES)
- .build();
+
CacheBuilder.newBuilder().maximumSize(instanceDataManagerConfig.getDeletedSegmentsCacheSize())
+
.expireAfterWrite(instanceDataManagerConfig.getDeletedSegmentsCacheTtlMinutes(),
TimeUnit.MINUTES).build();
+
+ _peerDownloadScheme =
tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
+ if (_peerDownloadScheme == null) {
+ _peerDownloadScheme =
instanceDataManagerConfig.getSegmentPeerDownloadScheme();
+ }
+
_streamSegmentDownloadUntarRateLimitBytesPerSec =
-
tableDataManagerParams.getStreamSegmentDownloadUntarRateLimitBytesPerSec();
- _isStreamSegmentDownloadUntar =
tableDataManagerParams.isStreamSegmentDownloadUntar();
+ instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit();
+ _isStreamSegmentDownloadUntar =
instanceDataManagerConfig.isStreamSegmentDownloadUntar();
if (_isStreamSegmentDownloadUntar) {
LOGGER.info("Using streamed download-untar for segment download! "
+ "The rate limit interval for streamed download-untar is {}
bytes/s",
_streamSegmentDownloadUntarRateLimitBytesPerSec);
}
- int maxParallelSegmentDownloads =
tableDataManagerParams.getMaxParallelSegmentDownloads();
+ int maxParallelSegmentDownloads =
instanceDataManagerConfig.getMaxParallelSegmentDownloads();
if (maxParallelSegmentDownloads > 0) {
LOGGER.info(
"Construct segment download semaphore for Table: {}. Maximum number
of parallel segment downloads: {}",
@@ -178,6 +181,16 @@ public abstract class BaseTableDataManager implements
TableDataManager {
protected abstract void doInit();
+ @Override
+ public String getInstanceId() {
+ return _instanceId;
+ }
+
+ @Override
+ public InstanceDataManagerConfig getInstanceDataManagerConfig() {
+ return _instanceDataManagerConfig;
+ }
+
@Override
public synchronized void start() {
_logger.info("Starting table data manager for table: {}",
_tableNameWithType);
@@ -255,7 +268,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
Preconditions.checkState(!_shutDown, "Table data manager is already shut
down, cannot add segment: %s to table: %s",
indexDir.getName(), _tableNameWithType);
indexLoadingConfig.setTableDataDir(_tableDataDir);
-
indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs());
+
indexLoadingConfig.setInstanceTierConfigs(_instanceDataManagerConfig.getTierConfigs());
addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
indexLoadingConfig.getSchema()));
}
@@ -382,11 +395,6 @@ public abstract class BaseTableDataManager implements
TableDataManager {
return _indexDir;
}
- @Override
- public TableDataManagerConfig getTableDataManagerConfig() {
- return _tableDataManagerConfig;
- }
-
@Override
public void addSegmentError(String segmentName, SegmentErrorInfo
segmentErrorInfo) {
_errorCache.put(Pair.of(_tableNameWithType, segmentName),
segmentErrorInfo);
@@ -413,7 +421,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
String segmentTier = getSegmentCurrentTier(segmentName);
indexLoadingConfig.setSegmentTier(segmentTier);
indexLoadingConfig.setTableDataDir(_tableDataDir);
-
indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs());
+
indexLoadingConfig.setInstanceTierConfigs(_instanceDataManagerConfig.getTierConfigs());
File indexDir = getSegmentDataDir(segmentName, segmentTier,
indexLoadingConfig.getTableConfig());
try {
// Download segment from deep store if CRC changes or forced to download;
@@ -507,7 +515,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
String segmentTier = zkMetadata.getTier();
indexLoadingConfig.setSegmentTier(segmentTier);
indexLoadingConfig.setTableDataDir(_tableDataDir);
-
indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs());
+
indexLoadingConfig.setInstanceTierConfigs(_instanceDataManagerConfig.getTierConfigs());
if (localMetadata == null && tryLoadExistingSegment(segmentName,
indexLoadingConfig, zkMetadata)) {
return;
}
@@ -630,7 +638,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
LOGGER.error("Attempts exceeded when downloading segment: {} for table:
{} from: {} to: {}", segmentName,
_tableNameWithType, uri, tarFile);
_serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES, 1L);
- if (_tableDataManagerConfig.getTablePeerDownloadScheme() == null) {
+ if (_peerDownloadScheme == null) {
throw e;
}
downloadFromPeersWithoutStreaming(segmentName, zkMetadata, tarFile);
@@ -649,11 +657,9 @@ public abstract class BaseTableDataManager implements
TableDataManager {
// not thread safe. Caller should invoke it with safe concurrency control.
protected void downloadFromPeersWithoutStreaming(String segmentName,
SegmentZKMetadata zkMetadata, File destTarFile)
throws Exception {
-
Preconditions.checkArgument(_tableDataManagerConfig.getTablePeerDownloadScheme()
!= null,
- "Download peers require non null peer download scheme");
+ Preconditions.checkState(_peerDownloadScheme != null, "Download peers
require non null peer download scheme");
List<URI> peerSegmentURIs =
- PeerServerSegmentFinder.getPeerServerURIs(segmentName,
_tableDataManagerConfig.getTablePeerDownloadScheme(),
- _helixManager, _tableNameWithType);
+ PeerServerSegmentFinder.getPeerServerURIs(segmentName,
_peerDownloadScheme, _helixManager, _tableNameWithType);
if (peerSegmentURIs.isEmpty()) {
String msg = String.format("segment %s doesn't have any peers",
segmentName);
LOGGER.warn(msg);
@@ -745,7 +751,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
return getSegmentDataDir(segmentName);
}
String tierDataDir =
- TierConfigUtils.getDataDirForTier(tableConfig, segmentTier,
_tableDataManagerConfig.getInstanceTierConfigs());
+ TierConfigUtils.getDataDirForTier(tableConfig, segmentTier,
_instanceDataManagerConfig.getTierConfigs());
if (StringUtils.isEmpty(tierDataDir)) {
return getSegmentDataDir(segmentName);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
index 946ecf1565..f3cfa41ee7 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
@@ -27,15 +27,11 @@ import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -45,47 +41,38 @@ import org.apache.pinot.spi.utils.IngestionConfigUtils;
* Factory for {@link TableDataManager}.
*/
public class TableDataManagerProvider {
- private static Semaphore _segmentBuildSemaphore;
- private static TableDataManagerParams _tableDataManagerParams;
+ private final InstanceDataManagerConfig _instanceDataManagerConfig;
+ private final Semaphore _segmentBuildSemaphore;
- private TableDataManagerProvider() {
+ public TableDataManagerProvider(InstanceDataManagerConfig
instanceDataManagerConfig) {
+ _instanceDataManagerConfig = instanceDataManagerConfig;
+ int maxParallelSegmentBuilds =
instanceDataManagerConfig.getMaxParallelSegmentBuilds();
+ _segmentBuildSemaphore = maxParallelSegmentBuilds > 0 ? new
Semaphore(maxParallelSegmentBuilds, true) : null;
}
- public static void init(InstanceDataManagerConfig instanceDataManagerConfig)
{
- int maxParallelBuilds =
instanceDataManagerConfig.getMaxParallelSegmentBuilds();
- if (maxParallelBuilds > 0) {
- _segmentBuildSemaphore = new Semaphore(maxParallelBuilds, true);
- }
- _tableDataManagerParams = new
TableDataManagerParams(instanceDataManagerConfig);
- }
-
- public static TableDataManager getTableDataManager(TableDataManagerConfig
tableDataManagerConfig, String instanceId,
- ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics
serverMetrics, HelixManager helixManager,
- LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache) {
- return getTableDataManager(tableDataManagerConfig, instanceId,
propertyStore, serverMetrics, helixManager, null,
- errorCache, () -> true);
+ public TableDataManager getTableDataManager(TableConfig tableConfig,
HelixManager helixManager) {
+ return getTableDataManager(tableConfig, helixManager, null, null, () ->
true);
}
- public static TableDataManager getTableDataManager(TableDataManagerConfig
tableDataManagerConfig, String instanceId,
- ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics
serverMetrics, HelixManager helixManager,
- @Nullable ExecutorService segmentPreloadExecutor,
LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache,
+ public TableDataManager getTableDataManager(TableConfig tableConfig,
HelixManager helixManager,
+ @Nullable ExecutorService segmentPreloadExecutor,
+ @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo>
errorCache,
Supplier<Boolean> isServerReadyToServeQueries) {
TableDataManager tableDataManager;
- switch (tableDataManagerConfig.getTableType()) {
+ switch (tableConfig.getTableType()) {
case OFFLINE:
- if (tableDataManagerConfig.isDimTable()) {
- tableDataManager =
DimensionTableDataManager.createInstanceByTableName(tableDataManagerConfig.getTableName());
+ if (tableConfig.isDimTable()) {
+ tableDataManager =
DimensionTableDataManager.createInstanceByTableName(tableConfig.getTableName());
} else {
tableDataManager = new OfflineTableDataManager();
}
break;
case REALTIME:
- Map<String, String> streamConfigMap =
IngestionConfigUtils.getStreamConfigMap(
- tableDataManagerConfig.getTableConfig());
+ Map<String, String> streamConfigMap =
IngestionConfigUtils.getStreamConfigMap(tableConfig);
if
(Boolean.parseBoolean(streamConfigMap.get(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE))
- &&
StringUtils.isEmpty(tableDataManagerConfig.getInstanceDataManagerConfig().getSegmentStoreUri()))
{
+ &&
StringUtils.isEmpty(_instanceDataManagerConfig.getSegmentStoreUri())) {
throw new IllegalStateException(String.format("Table has enabled %s
config. But the server has not "
- + "configured the segmentstore uri. Configure the server config
%s",
+ + "configured the segmentstore uri. Configure the server
config %s",
StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE,
CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI));
}
tableDataManager = new
RealtimeTableDataManager(_segmentBuildSemaphore, isServerReadyToServeQueries);
@@ -93,8 +80,7 @@ public class TableDataManagerProvider {
default:
throw new IllegalStateException();
}
- tableDataManager.init(tableDataManagerConfig, instanceId, propertyStore,
serverMetrics, helixManager,
- segmentPreloadExecutor, errorCache, _tableDataManagerParams);
+ tableDataManager.init(_instanceDataManagerConfig, tableConfig,
helixManager, segmentPreloadExecutor, errorCache);
return tableDataManager;
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index fa1501e868..282f3c72a1 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1358,7 +1358,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
_partitionUpsertMetadataManager = partitionUpsertMetadataManager;
_isReadyToConsumeData = isReadyToConsumeData;
_segmentVersion = indexLoadingConfig.getSegmentVersion();
- _instanceId = _realtimeTableDataManager.getServerInstance();
+ _instanceId = _realtimeTableDataManager.getInstanceId();
_leaseExtender =
SegmentBuildTimeLeaseExtender.getLeaseExtender(_tableNameWithType);
_protocolHandler = new
ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType);
CompletionConfig completionConfig =
_tableConfig.getValidationConfig().getCompletionConfig();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 333be09b0c..2ddc0f7eb5 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -207,8 +207,8 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
Preconditions.checkState(schema != null, "Failed to find schema for
table: %s", _tableNameWithType);
// NOTE: Set _tableUpsertMetadataManager before initializing it because
when preloading is enabled, we need to
// load segments into it
- _tableUpsertMetadataManager =
TableUpsertMetadataManagerFactory.create(tableConfig,
-
_tableDataManagerConfig.getInstanceDataManagerConfig().getUpsertConfigs());
+ _tableUpsertMetadataManager =
+ TableUpsertMetadataManagerFactory.create(tableConfig,
_instanceDataManagerConfig.getUpsertConfig());
_tableUpsertMetadataManager.init(tableConfig, schema, this,
_helixManager, _segmentPreloadExecutor);
}
@@ -328,7 +328,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
}
public String getConsumerDir() {
- String consumerDirPath = _tableDataManagerConfig.getConsumerDir();
+ String consumerDirPath = _instanceDataManagerConfig.getConsumerDir();
File consumerDir;
// If a consumer directory has been configured, use it to create a
per-table path under the consumer dir.
// Otherwise, create a sub-dir under the table-specific data directory and
use it for consumer mmaps
@@ -391,7 +391,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
// Assign table directory and tier info to not let the segment be moved
during loading/preprocessing
indexLoadingConfig.setTableDataDir(_tableDataDir);
-
indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs());
+
indexLoadingConfig.setInstanceTierConfigs(_instanceDataManagerConfig.getTierConfigs());
indexLoadingConfig.setSegmentTier(segmentZKMetadata.getTier());
File segmentDir = new File(_indexDir, segmentName);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
index c5a38c4f94..a245c740be 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
@@ -23,7 +23,6 @@ import java.io.File;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -31,26 +30,24 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.configuration2.MapConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
-import org.testng.annotations.AfterSuite;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Test;
import static org.mockito.Mockito.*;
@@ -58,7 +55,6 @@ import static org.mockito.Mockito.*;
public class BaseTableDataManagerAcquireSegmentTest {
private static final String RAW_TABLE_NAME = "testTable";
- private static final String OFFLINE_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
private static final String SEGMENT_PREFIX = "segment";
private static final int DELETED_SEGMENTS_CACHE_SIZE = 100;
private static final int DELETED_SEGMENTS_TTL_MINUTES = 2;
@@ -86,9 +82,11 @@ public class BaseTableDataManagerAcquireSegmentTest {
private volatile int _lo;
private volatile int _hi;
- @BeforeSuite
+ @BeforeClass
public void setUp()
throws Exception {
+ ServerMetrics.register(mock(ServerMetrics.class));
+
_tmpDir = new File(FileUtils.getTempDirectory(),
"OfflineTableDataManagerTest");
TestUtils.ensureDirectoriesExistAndEmpty(_tmpDir);
_tmpDir.deleteOnExit();
@@ -98,7 +96,7 @@ public class BaseTableDataManagerAcquireSegmentTest {
System.out.printf("Record random seed: %d to reproduce test results upon
failure\n", seed);
}
- @AfterSuite
+ @AfterClass
public void tearDown() {
if (_tmpDir != null) {
org.apache.commons.io.FileUtils.deleteQuietly(_tmpDir);
@@ -119,19 +117,13 @@ public class BaseTableDataManagerAcquireSegmentTest {
private TableDataManager makeTestableManager()
throws Exception {
+ InstanceDataManagerConfig instanceDataManagerConfig =
mock(InstanceDataManagerConfig.class);
+
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(_tmpDir.getAbsolutePath());
+
when(instanceDataManagerConfig.getDeletedSegmentsCacheSize()).thenReturn(DELETED_SEGMENTS_CACHE_SIZE);
+
when(instanceDataManagerConfig.getDeletedSegmentsCacheTtlMinutes()).thenReturn(DELETED_SEGMENTS_TTL_MINUTES);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
TableDataManager tableDataManager = new OfflineTableDataManager();
- TableDataManagerConfig config;
- {
- config = mock(TableDataManagerConfig.class);
- when(config.getTableName()).thenReturn(OFFLINE_TABLE_NAME);
- when(config.getDataDir()).thenReturn(_tmpDir.getAbsolutePath());
- when(config.getAuthConfig()).thenReturn(new MapConfiguration(new
HashMap<>()));
-
when(config.getTableDeletedSegmentsCacheSize()).thenReturn(DELETED_SEGMENTS_CACHE_SIZE);
-
when(config.getTableDeletedSegmentsCacheTtlMinutes()).thenReturn(DELETED_SEGMENTS_TTL_MINUTES);
- }
- tableDataManager.init(config, "dummyInstance",
mock(ZkHelixPropertyStore.class),
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
mock(HelixManager.class), null, null,
- new TableDataManagerParams(0, false, -1));
+ tableDataManager.init(instanceDataManagerConfig, tableConfig,
mock(HelixManager.class), null, null);
tableDataManager.start();
Field segsMapField =
BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap");
segsMapField.setAccessible(true);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index 9113a327ad..d4c5f4fc29 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -29,10 +29,8 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import org.apache.commons.configuration2.MapConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.tier.TierFactory;
@@ -41,8 +39,6 @@ import
org.apache.pinot.common.utils.fetcher.BaseSegmentFetcher;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -51,6 +47,7 @@ import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TierConfig;
@@ -60,23 +57,17 @@ import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.apache.pinot.util.TestUtils;
import org.mockito.MockedStatic;
-import org.mockito.Mockito;
import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@@ -93,15 +84,21 @@ public class BaseTableDataManagerTest {
private static final String LONG_COLUMN = "col2";
private static final long[] LONG_VALUES = {10000L, 20000L, 50000L, 40000L,
30000L};
- @BeforeMethod
+ @BeforeClass
public void setUp()
throws Exception {
+ ServerMetrics.register(mock(ServerMetrics.class));
+ }
+
+ @BeforeMethod
+ public void setUpMethod()
+ throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
TableDataManagerTestUtils.initSegmentFetcher();
}
@AfterMethod
- public void tearDown()
+ public void tearDownMethod()
throws Exception {
FileUtils.deleteDirectory(TEMP_DIR);
}
@@ -623,14 +620,15 @@ public class BaseTableDataManagerTest {
// case 2: if the attempt to download from deep storage exceeds, invoke
downloadFromPeers.
@Test
- public void testDownloadAndDecryptPeerDownload() throws Exception {
+ public void testDownloadAndDecryptPeerDownload()
+ throws Exception {
String backupCopyURI = mockRemoteCopy().toString();
SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
when(zkmd.getDownloadUrl()).thenReturn(backupCopyURI);
- TableDataManagerConfig config = createDefaultTableDataManagerConfig();
- when(config.getTablePeerDownloadScheme()).thenReturn("http");
- BaseTableDataManager tmgr = createSpyOfflineTableManager(config);
+ InstanceDataManagerConfig config =
createDefaultInstanceDataManagerConfig();
+ when(config.getSegmentPeerDownloadScheme()).thenReturn("http");
+ BaseTableDataManager tmgr = spy(createTableManager(config));
File tempRootDir = tmgr.getTmpSegmentDataDir("test-download-decrypt-peer");
// As the case 2 description says, we need to mock the static method
fetchAndDecryptSegmentToLocal to
@@ -651,17 +649,18 @@ public class BaseTableDataManagerTest {
// happy case: download from peers
@Test
- public void testDownloadFromPeersWithoutStreaming() throws Exception {
+ public void testDownloadFromPeersWithoutStreaming()
+ throws Exception {
URI uri = mockRemoteCopy();
- TableDataManagerConfig config = createDefaultTableDataManagerConfig();
- when(config.getTablePeerDownloadScheme()).thenReturn("http");
- HelixManager mockedHelix = mock(HelixManager.class);
- BaseTableDataManager tmgr = createTableManager(config, mockedHelix);
+ InstanceDataManagerConfig config =
createDefaultInstanceDataManagerConfig();
+ when(config.getSegmentPeerDownloadScheme()).thenReturn("http");
+ HelixManager helixManager = mock(HelixManager.class);
+ BaseTableDataManager tmgr = createTableManager(config, helixManager);
File tempRootDir =
tmgr.getTmpSegmentDataDir("test-download-peer-without-streaming");
File destFile = new File(tempRootDir, "seg01" +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
try (MockedStatic<PeerServerSegmentFinder> mockPeerSegFinder =
mockStatic(PeerServerSegmentFinder.class)) {
- mockPeerSegFinder.when(() -> PeerServerSegmentFinder.getPeerServerURIs(
- "seg01", "http", mockedHelix, TABLE_NAME_WITH_TYPE))
+ mockPeerSegFinder.when(
+ () -> PeerServerSegmentFinder.getPeerServerURIs("seg01", "http",
helixManager, TABLE_NAME_WITH_TYPE))
.thenReturn(Collections.singletonList(uri));
tmgr.downloadFromPeersWithoutStreaming("seg01",
mock(SegmentZKMetadata.class), destFile);
}
@@ -714,40 +713,26 @@ public class BaseTableDataManagerTest {
}
}
- private static BaseTableDataManager createTableManager() {
- TableDataManagerConfig config = createDefaultTableDataManagerConfig();
-
- OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
- tableDataManager.init(config, "dummyInstance",
mock(ZkHelixPropertyStore.class),
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
mock(HelixManager.class), null, null,
- new TableDataManagerParams(0, false, -1));
- tableDataManager.start();
- return tableDataManager;
+ private static OfflineTableDataManager createTableManager() {
+ return createTableManager(createDefaultInstanceDataManagerConfig());
}
- private static BaseTableDataManager
createTableManager(TableDataManagerConfig config, HelixManager helixManager) {
- OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
- tableDataManager.init(config, "dummyInstance",
mock(ZkHelixPropertyStore.class),
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
helixManager, null, null,
- new TableDataManagerParams(0, false, -1));
- tableDataManager.start();
- return tableDataManager;
+ private static OfflineTableDataManager
createTableManager(InstanceDataManagerConfig instanceDataManagerConfig) {
+ return createTableManager(instanceDataManagerConfig,
mock(HelixManager.class));
}
- private static OfflineTableDataManager
createSpyOfflineTableManager(TableDataManagerConfig tableDataManagerConfig) {
+ private static OfflineTableDataManager
createTableManager(InstanceDataManagerConfig instanceDataManagerConfig,
+ HelixManager helixManager) {
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
- tableDataManager.init(tableDataManagerConfig, "dummyInstance",
mock(ZkHelixPropertyStore.class),
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
mock(HelixManager.class), null, null,
- new TableDataManagerParams(0, false, -1));
+ tableDataManager.init(instanceDataManagerConfig, tableConfig,
helixManager, null, null);
tableDataManager.start();
- return Mockito.spy(tableDataManager);
+ return tableDataManager;
}
- private static TableDataManagerConfig createDefaultTableDataManagerConfig() {
- TableDataManagerConfig config = mock(TableDataManagerConfig.class);
- when(config.getTableName()).thenReturn(TABLE_NAME_WITH_TYPE);
- when(config.getDataDir()).thenReturn(TABLE_DATA_DIR.getAbsolutePath());
- when(config.getAuthConfig()).thenReturn(new
MapConfiguration(Collections.emptyMap()));
+ private static InstanceDataManagerConfig
createDefaultInstanceDataManagerConfig() {
+ InstanceDataManagerConfig config = mock(InstanceDataManagerConfig.class);
+ when(config.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
return config;
}
@@ -797,7 +782,8 @@ public class BaseTableDataManagerTest {
Collections.singletonMap("dataDir",
dataDir.getAbsolutePath())))).build();
}
- private static URI mockRemoteCopy() throws IOException, URISyntaxException {
+ private static URI mockRemoteCopy()
+ throws IOException, URISyntaxException {
File tempInput = new File(TEMP_DIR, "tmp.txt");
FileUtils.write(tempInput, "this is from somewhere remote");
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
index 6816d01558..6a8c1b83c5 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
@@ -32,8 +32,6 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.SchemaUtils;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentCreationDriverFactory;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
@@ -42,6 +40,7 @@ import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.DimensionTableConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -50,7 +49,6 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.annotations.AfterClass;
@@ -79,20 +77,23 @@ public class DimensionTableDataManagerTest {
@BeforeClass
public void setUp()
throws Exception {
+ ServerMetrics.register(mock(ServerMetrics.class));
+
// prepare segment data
URL resourceUrl = getClass().getClassLoader().getResource(AVRO_DATA_PATH);
assertNotNull(resourceUrl);
File avroFile = new File(resourceUrl.getFile());
// create segment
+ File tableDataDir = new File(TEMP_DIR, OFFLINE_TABLE_NAME);
SegmentGeneratorConfig segmentGeneratorConfig =
- SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(avroFile,
TEMP_DIR, RAW_TABLE_NAME);
+ SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(avroFile,
tableDataDir, RAW_TABLE_NAME);
SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
driver.init(segmentGeneratorConfig);
driver.build();
String segmentName = driver.getSegmentName();
- _indexDir = new File(TEMP_DIR, segmentName);
+ _indexDir = new File(tableDataDir, segmentName);
_indexLoadingConfig = new IndexLoadingConfig();
_segmentMetadata = new SegmentMetadataImpl(_indexDir);
_segmentZKMetadata = new SegmentZKMetadata(segmentName);
@@ -124,17 +125,12 @@ public class DimensionTableDataManagerTest {
}
private DimensionTableDataManager makeTableDataManager(HelixManager
helixManager) {
+ InstanceDataManagerConfig instanceDataManagerConfig =
mock(InstanceDataManagerConfig.class);
+
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
+ TableConfig tableConfig = getTableConfig(false);
DimensionTableDataManager tableDataManager =
DimensionTableDataManager.createInstanceByTableName(OFFLINE_TABLE_NAME);
- TableDataManagerConfig config;
- {
- config = mock(TableDataManagerConfig.class);
- when(config.getTableName()).thenReturn(OFFLINE_TABLE_NAME);
- when(config.getDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
- }
- tableDataManager.init(config, "dummyInstance",
helixManager.getHelixPropertyStore(),
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
helixManager, null, null,
- new TableDataManagerParams(0, false, -1));
+ tableDataManager.init(instanceDataManagerConfig, tableConfig,
helixManager, null, null);
tableDataManager.start();
return tableDataManager;
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index 1574892f42..cc93fcfe77 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -46,13 +46,11 @@ import
org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import
org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
import
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.segment.local.segment.creator.Fixtures;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
@@ -78,8 +76,7 @@ import static org.mockito.Mockito.when;
// TODO Re-write this test using the stream abstraction
public class RealtimeSegmentDataManagerTest {
- private static final String SEGMENT_DIR = "/tmp/" +
RealtimeSegmentDataManagerTest.class.getSimpleName();
- private static final File SEGMENT_DIR_FILE = new File(SEGMENT_DIR);
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"RealtimeSegmentDataManagerTest");
private static final String RAW_TABLE_NAME = "testTable";
private static final String REALTIME_TABLE_NAME =
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
private static final int PARTITION_GROUP_ID = 0;
@@ -104,7 +101,7 @@ public class RealtimeSegmentDataManagerTest {
SegmentBuildTimeLeaseExtender.getOrCreate(instanceId, new
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
tableConfig.getTableName());
RealtimeTableDataManager tableDataManager =
mock(RealtimeTableDataManager.class);
- when(tableDataManager.getServerInstance()).thenReturn(instanceId);
+ when(tableDataManager.getInstanceId()).thenReturn(instanceId);
RealtimeSegmentStatsHistory statsHistory =
mock(RealtimeSegmentStatsHistory.class);
when(statsHistory.getEstimatedCardinality(anyString())).thenReturn(200);
when(statsHistory.getEstimatedAvgColSize(anyString())).thenReturn(32);
@@ -148,19 +145,22 @@ public class RealtimeSegmentDataManagerTest {
_partitionGroupIdToSemaphoreMap.putIfAbsent(PARTITION_GROUP_ID, new
Semaphore(1));
Schema schema = Fixtures.createSchema();
ServerMetrics serverMetrics = new
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
- return new FakeRealtimeSegmentDataManager(segmentZKMetadata, tableConfig,
tableDataManager, SEGMENT_DIR, schema,
- llcSegmentName, _partitionGroupIdToSemaphoreMap, serverMetrics,
timeSupplier);
+ return new FakeRealtimeSegmentDataManager(segmentZKMetadata, tableConfig,
tableDataManager,
+ new File(TEMP_DIR, REALTIME_TABLE_NAME).getAbsolutePath(), schema,
llcSegmentName,
+ _partitionGroupIdToSemaphoreMap, serverMetrics, timeSupplier);
}
@BeforeClass
public void setUp() {
- SEGMENT_DIR_FILE.deleteOnExit();
+ ServerMetrics.register(mock(ServerMetrics.class));
+
+ FileUtils.deleteQuietly(TEMP_DIR);
SegmentBuildTimeLeaseExtender.initExecutor();
}
@AfterClass
public void tearDown() {
- FileUtils.deleteQuietly(SEGMENT_DIR_FILE);
+ FileUtils.deleteQuietly(TEMP_DIR);
SegmentBuildTimeLeaseExtender.shutdownExecutor();
}
@@ -222,8 +222,7 @@ public class RealtimeSegmentDataManagerTest {
Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
Assert.assertTrue(segmentDataManager._commitSegmentCalled);
- Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
- RealtimeSegmentDataManager.State.COMMITTED);
+ Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
RealtimeSegmentDataManager.State.COMMITTED);
segmentDataManager.destroy();
}
@@ -286,17 +285,17 @@ public class RealtimeSegmentDataManagerTest {
Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
Assert.assertTrue(segmentDataManager._commitSegmentCalled);
- Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
- RealtimeSegmentDataManager.State.COMMITTED);
+ Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
RealtimeSegmentDataManager.State.COMMITTED);
segmentDataManager.destroy();
}
@Test
- public void testCommitAfterCatchupWithPeriodOffset() throws Exception {
+ public void testCommitAfterCatchupWithPeriodOffset()
+ throws Exception {
TableConfig tableConfig = createTableConfig();
- tableConfig.getIndexingConfig().getStreamConfigs()
- .put(StreamConfigProperties.constructStreamProperty(
- StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA,
"fakeStream"), "2d");
+ tableConfig.getIndexingConfig().getStreamConfigs().put(
+
StreamConfigProperties.constructStreamProperty(StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA,
+ "fakeStream"), "2d");
FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager(false, new TimeSupplier(), null, null,
tableConfig);
RealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
@@ -333,17 +332,17 @@ public class RealtimeSegmentDataManagerTest {
Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
Assert.assertTrue(segmentDataManager._commitSegmentCalled);
- Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
- RealtimeSegmentDataManager.State.COMMITTED);
+ Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
RealtimeSegmentDataManager.State.COMMITTED);
segmentDataManager.destroy();
}
@Test
- public void testCommitAfterCatchupWithTimestampOffset() throws Exception {
+ public void testCommitAfterCatchupWithTimestampOffset()
+ throws Exception {
TableConfig tableConfig = createTableConfig();
- tableConfig.getIndexingConfig().getStreamConfigs()
- .put(StreamConfigProperties.constructStreamProperty(
- StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA,
"fakeStream"), Instant.now().toString());
+ tableConfig.getIndexingConfig().getStreamConfigs().put(
+
StreamConfigProperties.constructStreamProperty(StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA,
+ "fakeStream"), Instant.now().toString());
FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager(false, new TimeSupplier(), null, null,
tableConfig);
RealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
@@ -380,8 +379,7 @@ public class RealtimeSegmentDataManagerTest {
Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
Assert.assertTrue(segmentDataManager._commitSegmentCalled);
- Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
- RealtimeSegmentDataManager.State.COMMITTED);
+ Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
RealtimeSegmentDataManager.State.COMMITTED);
segmentDataManager.destroy();
}
@@ -405,8 +403,7 @@ public class RealtimeSegmentDataManagerTest {
Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
Assert.assertFalse(segmentDataManager._commitSegmentCalled);
- Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
- RealtimeSegmentDataManager.State.DISCARDED);
+ Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
RealtimeSegmentDataManager.State.DISCARDED);
segmentDataManager.destroy();
}
@@ -800,22 +797,13 @@ public class RealtimeSegmentDataManagerTest {
tableConfig.setUpsertConfig(null);
ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
when(propertyStore.get(anyString(), any(),
anyInt())).thenReturn(TableConfigUtils.toZNRecord(tableConfig));
+ HelixManager helixManager = mock(HelixManager.class);
+ when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
- TableDataManagerConfig tableDataManagerConfig =
mock(TableDataManagerConfig.class);
-
when(tableDataManagerConfig.getTableName()).thenReturn(REALTIME_TABLE_NAME);
- when(tableDataManagerConfig.getTableType()).thenReturn(TableType.REALTIME);
-
when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
- when(tableDataManagerConfig.getTableConfig()).thenReturn(tableConfig);
InstanceDataManagerConfig instanceDataManagerConfig =
mock(InstanceDataManagerConfig.class);
-
when(instanceDataManagerConfig.getMaxParallelSegmentBuilds()).thenReturn(4);
-
when(instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()).thenReturn(-1L);
-
when(instanceDataManagerConfig.getMaxParallelSegmentDownloads()).thenReturn(-1);
-
when(instanceDataManagerConfig.isStreamSegmentDownloadUntar()).thenReturn(false);
- TableDataManagerProvider.init(instanceDataManagerConfig);
-
+
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
TableDataManager tableDataManager =
- TableDataManagerProvider.getTableDataManager(tableDataManagerConfig,
"testInstance", propertyStore,
- mock(ServerMetrics.class), mock(HelixManager.class), null);
+ new
TableDataManagerProvider(instanceDataManagerConfig).getTableDataManager(tableConfig,
helixManager);
tableDataManager.start();
tableDataManager.shutDown();
Assert.assertFalse(SegmentBuildTimeLeaseExtender.isExecutorShutdown());
@@ -838,8 +826,7 @@ public class RealtimeSegmentDataManagerTest {
}
};
FakeRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager(true, timeSupplier,
- String.valueOf(FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS * 2),
- segmentTimeThresholdMins + "m", null);
+ String.valueOf(FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS *
2), segmentTimeThresholdMins + "m", null);
segmentDataManager._stubConsumeLoop = false;
segmentDataManager._state.set(segmentDataManager,
RealtimeSegmentDataManager.State.INITIAL_CONSUMING);
@@ -1081,7 +1068,7 @@ public class RealtimeSegmentDataManagerTest {
if (!forCommit) {
return new SegmentBuildDescriptor(null, null, getCurrentOffset(), 0,
0, -1);
}
- File segmentTarFile = new File(SEGMENT_DIR, "segmentFile");
+ File segmentTarFile = new File(new File(TEMP_DIR, REALTIME_TABLE_NAME),
"segmentFile");
try {
segmentTarFile.createNewFile();
} catch (IOException e) {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
index 211d2d08ad..bcf75a2cd0 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
@@ -34,27 +34,26 @@ import org.apache.pinot.common.utils.SchemaUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.core.data.manager.TableDataManagerTestUtils;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.util.TestUtils;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -77,15 +76,20 @@ public class RealtimeTableDataManagerTest {
private static final String LONG_COLUMN = "col2";
private static final long[] LONG_VALUES = {10000L, 20000L, 50000L, 40000L,
30000L};
+ @BeforeClass
+ public void setUp() {
+ ServerMetrics.register(mock(ServerMetrics.class));
+ }
+
@BeforeMethod
- public void setUp()
+ public void setUpMethod()
throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
TableDataManagerTestUtils.initSegmentFetcher();
}
@AfterMethod
- public void tearDown()
+ public void tearDownMethod()
throws Exception {
FileUtils.deleteDirectory(TEMP_DIR);
}
@@ -93,14 +97,14 @@ public class RealtimeTableDataManagerTest {
@Test
public void testAddSegmentUseBackupCopy()
throws Exception {
- RealtimeTableDataManager tmgr = new RealtimeTableDataManager(null);
- TableDataManagerConfig tableDataManagerConfig =
createTableDataManagerConfig();
- ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
+ InstanceDataManagerConfig instanceDataManagerConfig =
createInstanceDataManagerConfig();
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
TableConfig tableConfig = setupTableConfig(propertyStore);
Schema schema = setupSchema(propertyStore);
- tmgr.init(tableDataManagerConfig, "server01", propertyStore,
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
mock(HelixManager.class), null, null,
- new TableDataManagerParams(0, false, -1));
+ HelixManager helixManager = mock(HelixManager.class);
+ when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+ RealtimeTableDataManager tmgr = new RealtimeTableDataManager(null);
+ tmgr.init(instanceDataManagerConfig, tableConfig, helixManager, null,
null);
// Create a dummy local segment.
String segName = "seg01";
@@ -128,14 +132,14 @@ public class RealtimeTableDataManagerTest {
@Test
public void testAddSegmentNoBackupCopy()
throws Exception {
- RealtimeTableDataManager tmgr = new RealtimeTableDataManager(null);
- TableDataManagerConfig tableDataManagerConfig =
createTableDataManagerConfig();
- ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
+ InstanceDataManagerConfig instanceDataManagerConfig =
createInstanceDataManagerConfig();
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
TableConfig tableConfig = setupTableConfig(propertyStore);
Schema schema = setupSchema(propertyStore);
- tmgr.init(tableDataManagerConfig, "server01", propertyStore,
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
mock(HelixManager.class), null, null,
- new TableDataManagerParams(0, false, -1));
+ HelixManager helixManager = mock(HelixManager.class);
+ when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+ RealtimeTableDataManager tmgr = new RealtimeTableDataManager(null);
+ tmgr.init(instanceDataManagerConfig, tableConfig, helixManager, null,
null);
// Create a raw segment and put it in deep store backed by local fs.
String segName = "seg01";
@@ -159,14 +163,14 @@ public class RealtimeTableDataManagerTest {
@Test
public void testAddSegmentDefaultTierByTierBasedDirLoader()
throws Exception {
- RealtimeTableDataManager tmgr1 = new RealtimeTableDataManager(null);
- TableDataManagerConfig tableDataManagerConfig =
createTableDataManagerConfig();
- ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
+ InstanceDataManagerConfig instanceDataManagerConfig =
createInstanceDataManagerConfig();
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
TableConfig tableConfig = setupTableConfig(propertyStore);
Schema schema = setupSchema(propertyStore);
- tmgr1.init(tableDataManagerConfig, "server01", propertyStore,
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
mock(HelixManager.class), null, null,
- new TableDataManagerParams(0, false, -1));
+ HelixManager helixManager = mock(HelixManager.class);
+ when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+ RealtimeTableDataManager tmgr1 = new RealtimeTableDataManager(null);
+ tmgr1.init(instanceDataManagerConfig, tableConfig, helixManager, null,
null);
// Create a raw segment and put it in deep store backed by local fs.
String segName = "seg_tiered_01";
@@ -190,17 +194,10 @@ public class RealtimeTableDataManagerTest {
// Now, repeat initialization of the table data manager
tmgr1.shutDown();
RealtimeTableDataManager tmgr2 = new RealtimeTableDataManager(null);
- tableDataManagerConfig = createTableDataManagerConfig();
- propertyStore = mock(ZkHelixPropertyStore.class);
- tableConfig = setupTableConfig(propertyStore);
- schema = setupSchema(propertyStore);
- tmgr2.init(tableDataManagerConfig, "server01", propertyStore,
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
mock(HelixManager.class), null, null,
- new TableDataManagerParams(0, false, -1));
+ tmgr2.init(instanceDataManagerConfig, tableConfig, helixManager, null,
null);
// Reinitialize index loading config and try adding the segment
- indexLoadingConfig =
- TableDataManagerTestUtils.createIndexLoadingConfig("tierBased",
tableConfig, schema);
+ indexLoadingConfig =
TableDataManagerTestUtils.createIndexLoadingConfig("tierBased", tableConfig,
schema);
tmgr2.addSegment(segName, indexLoadingConfig, segmentZKMetadata);
// Make sure that the segment hasn't been moved
@@ -245,24 +242,22 @@ public class RealtimeTableDataManagerTest {
return new File(TABLE_DATA_DIR, segName);
}
- private static TableDataManagerConfig createTableDataManagerConfig() {
- TableDataManagerConfig tableDataManagerConfig =
mock(TableDataManagerConfig.class);
-
when(tableDataManagerConfig.getTableName()).thenReturn(TABLE_NAME_WITH_TYPE);
-
when(tableDataManagerConfig.getDataDir()).thenReturn(TABLE_DATA_DIR.getAbsolutePath());
- return tableDataManagerConfig;
+ private static InstanceDataManagerConfig createInstanceDataManagerConfig() {
+ InstanceDataManagerConfig instanceDataManagerConfig =
mock(InstanceDataManagerConfig.class);
+
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
+ return instanceDataManagerConfig;
}
- private static TableConfig setupTableConfig(ZkHelixPropertyStore
propertyStore)
+ private static TableConfig setupTableConfig(ZkHelixPropertyStore<ZNRecord>
propertyStore)
throws Exception {
- TableConfig tableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).build();
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).build();
ZNRecord tableConfigZNRecord = TableConfigUtils.toZNRecord(tableConfig);
when(propertyStore.get(ZKMetadataProvider.constructPropertyStorePathForResourceConfig(TABLE_NAME_WITH_TYPE),
null,
AccessOption.PERSISTENT)).thenReturn(tableConfigZNRecord);
return tableConfig;
}
- private static Schema setupSchema(ZkHelixPropertyStore propertyStore) {
+ private static Schema setupSchema(ZkHelixPropertyStore<ZNRecord>
propertyStore) {
Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
.addSingleValueDimension(STRING_COLUMN, FieldSpec.DataType.STRING)
.addMetric(LONG_COLUMN, FieldSpec.DataType.LONG).build();
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
index 27d4893403..49697b6b85 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
@@ -30,7 +30,6 @@ import
org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.InstanceRequest;
@@ -39,7 +38,6 @@ import
org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -55,7 +53,6 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.env.CommonsConfigurationUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -86,12 +83,13 @@ public class QueryExecutorExceptionsTest {
private final List<ImmutableSegment> _indexSegments = new
ArrayList<>(NUM_SEGMENTS_TO_GENERATE);
private final List<String> _segmentNames = new
ArrayList<>(NUM_SEGMENTS_TO_GENERATE);
- private ServerMetrics _serverMetrics;
private QueryExecutor _queryExecutor;
@BeforeClass
public void setUp()
throws Exception {
+ ServerMetrics.register(mock(ServerMetrics.class));
+
// Set up the segments
FileUtils.deleteQuietly(INDEX_DIR);
assertTrue(INDEX_DIR.mkdirs());
@@ -132,21 +130,11 @@ public class QueryExecutorExceptionsTest {
}
// Mock the instance data manager
- _serverMetrics = new
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
- TableDataManagerConfig tableDataManagerConfig =
mock(TableDataManagerConfig.class);
- when(tableDataManagerConfig.getTableName()).thenReturn(OFFLINE_TABLE_NAME);
- when(tableDataManagerConfig.getTableType()).thenReturn(TableType.OFFLINE);
-
when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
InstanceDataManagerConfig instanceDataManagerConfig =
mock(InstanceDataManagerConfig.class);
-
when(instanceDataManagerConfig.getMaxParallelSegmentBuilds()).thenReturn(4);
-
when(instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()).thenReturn(-1L);
-
when(instanceDataManagerConfig.getMaxParallelSegmentDownloads()).thenReturn(-1);
-
when(instanceDataManagerConfig.isStreamSegmentDownloadUntar()).thenReturn(false);
- TableDataManagerProvider.init(instanceDataManagerConfig);
- @SuppressWarnings("unchecked")
+
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(INDEX_DIR.getAbsolutePath());
TableDataManager tableDataManager =
- TableDataManagerProvider.getTableDataManager(tableDataManagerConfig,
"testInstance",
- mock(ZkHelixPropertyStore.class), mock(ServerMetrics.class),
mock(HelixManager.class), null);
+ new
TableDataManagerProvider(instanceDataManagerConfig).getTableDataManager(tableConfig,
+ mock(HelixManager.class));
tableDataManager.start();
//we don't add index segments to the data manager to simulate
numSegmentsAcquired < numSegmentsQueried
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
@@ -158,7 +146,7 @@ public class QueryExecutorExceptionsTest {
PropertiesConfiguration queryExecutorConfig =
CommonsConfigurationUtils.fromFile(new File(resourceUrl.getFile()));
_queryExecutor = new ServerQueryExecutorV1Impl();
- _queryExecutor.init(new PinotConfiguration(queryExecutorConfig),
instanceDataManager, _serverMetrics);
+ _queryExecutor.init(new PinotConfiguration(queryExecutorConfig),
instanceDataManager, ServerMetrics.get());
}
/**
@@ -190,6 +178,6 @@ public class QueryExecutorExceptionsTest {
}
private ServerQueryRequest getQueryRequest(InstanceRequest instanceRequest) {
- return new ServerQueryRequest(instanceRequest, _serverMetrics,
System.currentTimeMillis());
+ return new ServerQueryRequest(instanceRequest, ServerMetrics.get(),
System.currentTimeMillis());
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index d68c6ab6f4..c600d8b5cc 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -27,7 +27,6 @@ import java.util.concurrent.Executors;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.core.data.manager.InstanceDataManager;
@@ -36,7 +35,6 @@ import
org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -52,7 +50,6 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.env.CommonsConfigurationUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -72,7 +69,7 @@ public class QueryExecutorTest {
private static final String AVRO_DATA_PATH = "data/simpleData200001.avro";
private static final String EMPTY_JSON_DATA_PATH =
"data/test_empty_data.json";
private static final String QUERY_EXECUTOR_CONFIG_PATH =
"conf/query-executor.properties";
- private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
"QueryExecutorTest");
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"QueryExecutorTest");
private static final String RAW_TABLE_NAME = "testTable";
private static final String OFFLINE_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
private static final int NUM_SEGMENTS_TO_GENERATE = 2;
@@ -82,25 +79,27 @@ public class QueryExecutorTest {
private final List<ImmutableSegment> _indexSegments = new
ArrayList<>(NUM_SEGMENTS_TO_GENERATE);
private final List<String> _segmentNames = new
ArrayList<>(NUM_SEGMENTS_TO_GENERATE);
- private ServerMetrics _serverMetrics;
private QueryExecutor _queryExecutor;
@BeforeClass
public void setUp()
throws Exception {
+ ServerMetrics.register(mock(ServerMetrics.class));
+
// Set up the segments
- FileUtils.deleteQuietly(INDEX_DIR);
- assertTrue(INDEX_DIR.mkdirs());
+ FileUtils.deleteQuietly(TEMP_DIR);
+ assertTrue(TEMP_DIR.mkdirs());
URL resourceUrl = getClass().getClassLoader().getResource(AVRO_DATA_PATH);
Assert.assertNotNull(resourceUrl);
File avroFile = new File(resourceUrl.getFile());
Schema schema =
SegmentTestUtils.extractSchemaFromAvroWithoutTime(avroFile);
TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+ File tableDataDir = new File(TEMP_DIR, OFFLINE_TABLE_NAME);
int i = 0;
for (; i < NUM_SEGMENTS_TO_GENERATE; i++) {
SegmentGeneratorConfig config =
- SegmentTestUtils.getSegmentGeneratorConfig(avroFile,
FileFormat.AVRO, INDEX_DIR, RAW_TABLE_NAME, tableConfig,
- schema);
+ SegmentTestUtils.getSegmentGeneratorConfig(avroFile,
FileFormat.AVRO, tableDataDir, RAW_TABLE_NAME,
+ tableConfig, schema);
config.setSegmentNamePostfix(Integer.toString(i));
SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
driver.init(config);
@@ -110,7 +109,7 @@ public class QueryExecutorTest {
Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
- _indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR,
driver.getSegmentName()), ReadMode.mmap));
+ _indexSegments.add(ImmutableSegmentLoader.load(new File(tableDataDir,
driver.getSegmentName()), ReadMode.mmap));
_segmentNames.add(driver.getSegmentName());
}
resourceUrl =
getClass().getClassLoader().getResource(EMPTY_JSON_DATA_PATH);
@@ -118,32 +117,22 @@ public class QueryExecutorTest {
File jsonFile = new File(resourceUrl.getFile());
for (; i < NUM_SEGMENTS_TO_GENERATE + NUM_EMPTY_SEGMENTS_TO_GENERATE; i++)
{
SegmentGeneratorConfig config =
- SegmentTestUtils.getSegmentGeneratorConfig(jsonFile,
FileFormat.JSON, INDEX_DIR, RAW_TABLE_NAME, tableConfig,
- schema);
+ SegmentTestUtils.getSegmentGeneratorConfig(jsonFile,
FileFormat.JSON, tableDataDir, RAW_TABLE_NAME,
+ tableConfig, schema);
config.setSegmentNamePostfix(Integer.toString(i));
SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
driver.init(config);
driver.build();
- _indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR,
driver.getSegmentName()), ReadMode.mmap));
+ _indexSegments.add(ImmutableSegmentLoader.load(new File(tableDataDir,
driver.getSegmentName()), ReadMode.mmap));
_segmentNames.add(driver.getSegmentName());
}
// Mock the instance data manager
- _serverMetrics = new
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
- TableDataManagerConfig tableDataManagerConfig =
mock(TableDataManagerConfig.class);
- when(tableDataManagerConfig.getTableName()).thenReturn(OFFLINE_TABLE_NAME);
- when(tableDataManagerConfig.getTableType()).thenReturn(TableType.OFFLINE);
-
when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
InstanceDataManagerConfig instanceDataManagerConfig =
mock(InstanceDataManagerConfig.class);
-
when(instanceDataManagerConfig.getMaxParallelSegmentBuilds()).thenReturn(4);
-
when(instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()).thenReturn(-1L);
-
when(instanceDataManagerConfig.getMaxParallelSegmentDownloads()).thenReturn(-1);
-
when(instanceDataManagerConfig.isStreamSegmentDownloadUntar()).thenReturn(false);
- TableDataManagerProvider.init(instanceDataManagerConfig);
- @SuppressWarnings("unchecked")
+
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
TableDataManager tableDataManager =
- TableDataManagerProvider.getTableDataManager(tableDataManagerConfig,
"testInstance",
- mock(ZkHelixPropertyStore.class), mock(ServerMetrics.class),
mock(HelixManager.class), null);
+ new
TableDataManagerProvider(instanceDataManagerConfig).getTableDataManager(tableConfig,
+ mock(HelixManager.class));
tableDataManager.start();
for (ImmutableSegment indexSegment : _indexSegments) {
tableDataManager.addSegment(indexSegment);
@@ -154,10 +143,9 @@ public class QueryExecutorTest {
// Set up the query executor
resourceUrl =
getClass().getClassLoader().getResource(QUERY_EXECUTOR_CONFIG_PATH);
Assert.assertNotNull(resourceUrl);
- PropertiesConfiguration queryExecutorConfig =
- CommonsConfigurationUtils.fromFile(new File(resourceUrl.getFile()));
+ PropertiesConfiguration queryExecutorConfig =
CommonsConfigurationUtils.fromFile(new File(resourceUrl.getFile()));
_queryExecutor = new ServerQueryExecutorV1Impl();
- _queryExecutor.init(new PinotConfiguration(queryExecutorConfig),
instanceDataManager, _serverMetrics);
+ _queryExecutor.init(new PinotConfiguration(queryExecutorConfig),
instanceDataManager, ServerMetrics.get());
}
@Test
@@ -205,10 +193,10 @@ public class QueryExecutorTest {
for (IndexSegment segment : _indexSegments) {
segment.destroy();
}
- FileUtils.deleteQuietly(INDEX_DIR);
+ FileUtils.deleteQuietly(TEMP_DIR);
}
private ServerQueryRequest getQueryRequest(InstanceRequest instanceRequest) {
- return new ServerQueryRequest(instanceRequest, _serverMetrics,
System.currentTimeMillis());
+ return new ServerQueryRequest(instanceRequest, ServerMetrics.get(),
System.currentTimeMillis());
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
index a5c2cc3f5c..17c543c0f7 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
@@ -33,7 +33,6 @@ import java.util.concurrent.Executors;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -52,7 +51,6 @@ import org.apache.pinot.core.query.reduce.BrokerReduceService;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
@@ -69,7 +67,6 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.env.CommonsConfigurationUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
@@ -85,7 +82,7 @@ import static org.mockito.Mockito.when;
public class ExplainPlanQueriesTest extends BaseQueriesTest {
- private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
"ExplainPlanQueriesTest");
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"ExplainPlanQueriesTest");
private static final String QUERY_EXECUTOR_CONFIG_PATH =
"conf/query-executor.properties";
private static final ExecutorService QUERY_RUNNERS =
Executors.newFixedThreadPool(20);
@@ -144,7 +141,6 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest
{
private List<IndexSegment> _indexSegments;
private List<String> _segmentNames;
- private ServerMetrics _serverMetrics;
private QueryExecutor _queryExecutor;
private QueryExecutor _queryExecutorWithPrefetchEnabled;
private BrokerReduceService _brokerReduceService;
@@ -214,9 +210,10 @@ public class ExplainPlanQueriesTest extends
BaseQueriesTest {
List<String> textIndexColumns = Arrays.asList(COL1_TEXT_INDEX);
+ File tableDataDir = new File(TEMP_DIR, OFFLINE_TABLE_NAME);
SegmentGeneratorConfig segmentGeneratorConfig = new
SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
segmentGeneratorConfig.setSegmentName(segmentName);
- segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+ segmentGeneratorConfig.setOutDir(tableDataDir.getPath());
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
@@ -232,13 +229,15 @@ public class ExplainPlanQueriesTest extends
BaseQueriesTest {
_segmentNames.add(segmentName);
- return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName),
indexLoadingConfig);
+ return ImmutableSegmentLoader.load(new File(tableDataDir, segmentName),
indexLoadingConfig);
}
@BeforeClass
public void setUp()
throws Exception {
- FileUtils.deleteDirectory(INDEX_DIR);
+ ServerMetrics.register(mock(ServerMetrics.class));
+
+ FileUtils.deleteDirectory(TEMP_DIR);
_segmentNames = new ArrayList<>();
List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
@@ -290,21 +289,11 @@ public class ExplainPlanQueriesTest extends
BaseQueriesTest {
_indexSegments = Arrays.asList(immutableSegment1, immutableSegment2,
immutableSegment3, immutableSegment4);
// Mock the instance data manager
- _serverMetrics = new
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
- TableDataManagerConfig tableDataManagerConfig =
mock(TableDataManagerConfig.class);
- when(tableDataManagerConfig.getTableName()).thenReturn(OFFLINE_TABLE_NAME);
- when(tableDataManagerConfig.getTableType()).thenReturn(TableType.OFFLINE);
-
when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
InstanceDataManagerConfig instanceDataManagerConfig =
mock(InstanceDataManagerConfig.class);
-
when(instanceDataManagerConfig.getMaxParallelSegmentBuilds()).thenReturn(4);
-
when(instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()).thenReturn(-1L);
-
when(instanceDataManagerConfig.getMaxParallelSegmentDownloads()).thenReturn(-1);
-
when(instanceDataManagerConfig.isStreamSegmentDownloadUntar()).thenReturn(false);
- TableDataManagerProvider.init(instanceDataManagerConfig);
- @SuppressWarnings("unchecked")
+
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
TableDataManager tableDataManager =
- TableDataManagerProvider.getTableDataManager(tableDataManagerConfig,
"testInstance",
- mock(ZkHelixPropertyStore.class), mock(ServerMetrics.class),
mock(HelixManager.class), null);
+ new
TableDataManagerProvider(instanceDataManagerConfig).getTableDataManager(TABLE_CONFIG,
+ mock(HelixManager.class));
tableDataManager.start();
for (IndexSegment indexSegment : _indexSegments) {
tableDataManager.addSegment((ImmutableSegment) indexSegment);
@@ -318,12 +307,12 @@ public class ExplainPlanQueriesTest extends
BaseQueriesTest {
PropertiesConfiguration queryExecutorConfig =
CommonsConfigurationUtils.fromFile(new File(resourceUrl.getFile()));
_queryExecutor = new ServerQueryExecutorV1Impl();
- _queryExecutor.init(new PinotConfiguration(queryExecutorConfig),
instanceDataManager, _serverMetrics);
+ _queryExecutor.init(new PinotConfiguration(queryExecutorConfig),
instanceDataManager, ServerMetrics.get());
PinotConfiguration prefetchEnabledConf = new
PinotConfiguration(queryExecutorConfig);
prefetchEnabledConf.setProperty(ServerQueryExecutorV1Impl.ENABLE_PREFETCH,
"true");
_queryExecutorWithPrefetchEnabled = new ServerQueryExecutorV1Impl();
- _queryExecutorWithPrefetchEnabled.init(prefetchEnabledConf,
instanceDataManager, _serverMetrics);
+ _queryExecutorWithPrefetchEnabled.init(prefetchEnabledConf,
instanceDataManager, ServerMetrics.get());
// Create the BrokerReduceService
_brokerReduceService = new BrokerReduceService(new PinotConfiguration(
@@ -420,7 +409,7 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest
{
}
private ServerQueryRequest getQueryRequest(InstanceRequest instanceRequest) {
- return new ServerQueryRequest(instanceRequest, _serverMetrics,
System.currentTimeMillis());
+ return new ServerQueryRequest(instanceRequest, ServerMetrics.get(),
System.currentTimeMillis());
}
@Test
@@ -2547,6 +2536,6 @@ public class ExplainPlanQueriesTest extends
BaseQueriesTest {
for (IndexSegment segment : _indexSegments) {
segment.destroy();
}
- FileUtils.deleteQuietly(INDEX_DIR);
+ FileUtils.deleteQuietly(TEMP_DIR);
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
index ddf46340be..ac9b7e82a9 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
@@ -31,10 +31,8 @@ import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.configuration2.PropertiesConfiguration;
-import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.core.data.manager.InstanceDataManager;
@@ -45,7 +43,6 @@ import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -63,12 +60,10 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.env.CommonsConfigurationUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
-import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -85,11 +80,9 @@ import static org.testng.Assert.assertTrue;
* Class for testing segment generation with byte[] data type.
*/
public class SegmentWithNullValueVectorTest {
- private static final int NUM_ROWS = 10001;
-
- private static final String SEGMENT_DIR_NAME =
- System.getProperty("java.io.tmpdir") + File.separator +
"nullValueVectorTest";
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"SegmentWithNullValueVectorTest");
private static final String SEGMENT_NAME = "testSegment";
+ private static final int NUM_ROWS = 10001;
private static final long LONG_VALUE_THRESHOLD = 100;
private Random _random;
@@ -107,7 +100,6 @@ public class SegmentWithNullValueVectorTest {
// Required for subsequent queries
private final List<String> _segmentNames = new ArrayList<>();
private InstanceDataManager _instanceDataManager;
- private ServerMetrics _serverMetrics;
private QueryExecutor _queryExecutor;
private static final String RAW_TABLE_NAME = "testTable";
private static final String OFFLINE_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
@@ -124,6 +116,7 @@ public class SegmentWithNullValueVectorTest {
@BeforeClass
public void setup()
throws Exception {
+ ServerMetrics.register(mock(ServerMetrics.class));
_schema = new Schema();
_schema.addField(new DimensionFieldSpec(INT_COLUMN,
FieldSpec.DataType.INT, true));
@@ -136,44 +129,28 @@ public class SegmentWithNullValueVectorTest {
new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNullHandlingEnabled(true).build();
_random = new Random(System.nanoTime());
buildIndex(tableConfig, _schema);
- _segment = ImmutableSegmentLoader.load(new File(SEGMENT_DIR_NAME,
SEGMENT_NAME), ReadMode.heap);
- setupQueryServer();
- }
-
- // Registers the segment and initializes Query Executor
- private void setupQueryServer()
- throws ConfigurationException {
+ _segment =
+ ImmutableSegmentLoader.load(new File(new File(TEMP_DIR,
OFFLINE_TABLE_NAME), SEGMENT_NAME), ReadMode.heap);
_segmentNames.add(_segment.getSegmentName());
+
// Mock the instance data manager
- _serverMetrics = new
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
- TableDataManagerConfig tableDataManagerConfig =
Mockito.mock(TableDataManagerConfig.class);
-
Mockito.when(tableDataManagerConfig.getTableName()).thenReturn(OFFLINE_TABLE_NAME);
-
Mockito.when(tableDataManagerConfig.getTableType()).thenReturn(TableType.OFFLINE);
-
Mockito.when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
InstanceDataManagerConfig instanceDataManagerConfig =
mock(InstanceDataManagerConfig.class);
-
when(instanceDataManagerConfig.getMaxParallelSegmentBuilds()).thenReturn(4);
-
when(instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()).thenReturn(-1L);
-
when(instanceDataManagerConfig.getMaxParallelSegmentDownloads()).thenReturn(-1);
-
when(instanceDataManagerConfig.isStreamSegmentDownloadUntar()).thenReturn(false);
- TableDataManagerProvider.init(instanceDataManagerConfig);
- @SuppressWarnings("unchecked")
+
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
TableDataManager tableDataManager =
- TableDataManagerProvider.getTableDataManager(tableDataManagerConfig,
"testInstance",
- Mockito.mock(ZkHelixPropertyStore.class),
Mockito.mock(ServerMetrics.class),
- Mockito.mock(HelixManager.class), null);
+ new
TableDataManagerProvider(instanceDataManagerConfig).getTableDataManager(tableConfig,
+ mock(HelixManager.class));
tableDataManager.start();
tableDataManager.addSegment(_segment);
- _instanceDataManager = Mockito.mock(InstanceDataManager.class);
-
Mockito.when(_instanceDataManager.getTableDataManager(OFFLINE_TABLE_NAME)).thenReturn(tableDataManager);
+ _instanceDataManager = mock(InstanceDataManager.class);
+
when(_instanceDataManager.getTableDataManager(OFFLINE_TABLE_NAME)).thenReturn(tableDataManager);
// Set up the query executor
URL resourceUrl =
getClass().getClassLoader().getResource(QUERY_EXECUTOR_CONFIG_PATH);
Assert.assertNotNull(resourceUrl);
- PropertiesConfiguration queryExecutorConfig =
- CommonsConfigurationUtils.fromFile(new File(resourceUrl.getFile()));
+ PropertiesConfiguration queryExecutorConfig =
CommonsConfigurationUtils.fromFile(new File(resourceUrl.getFile()));
_queryExecutor = new ServerQueryExecutorV1Impl();
- _queryExecutor.init(new PinotConfiguration(queryExecutorConfig),
_instanceDataManager, _serverMetrics);
+ _queryExecutor.init(new PinotConfiguration(queryExecutorConfig),
_instanceDataManager, ServerMetrics.get());
}
/**
@@ -186,9 +163,8 @@ public class SegmentWithNullValueVectorTest {
private void buildIndex(TableConfig tableConfig, Schema schema)
throws Exception {
SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig,
schema);
-
- config.setOutDir(SEGMENT_DIR_NAME);
config.setSegmentName(SEGMENT_NAME);
+ config.setOutDir(new File(TEMP_DIR, OFFLINE_TABLE_NAME).getAbsolutePath());
List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
@@ -295,7 +271,7 @@ public class SegmentWithNullValueVectorTest {
}
private ServerQueryRequest getQueryRequest(InstanceRequest instanceRequest) {
- return new ServerQueryRequest(instanceRequest, _serverMetrics,
System.currentTimeMillis());
+ return new ServerQueryRequest(instanceRequest, ServerMetrics.get(),
System.currentTimeMillis());
}
/**
@@ -305,6 +281,6 @@ public class SegmentWithNullValueVectorTest {
public void cleanup()
throws IOException {
_segment.destroy();
- FileUtils.deleteQuietly(new File(SEGMENT_DIR_NAME));
+ FileUtils.deleteQuietly(TEMP_DIR);
}
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
index 634390effc..43017f29bd 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
@@ -389,7 +389,7 @@ public class UpsertTableIntegrationTest extends
BaseClusterIntegrationTestSet {
throws Exception {
PinotConfiguration config = getServerConf(12345);
config.setProperty(Joiner.on(".").join(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX,
- HelixInstanceDataManagerConfig.PREFIX_OF_CONFIG_OF_UPSERT,
+ HelixInstanceDataManagerConfig.UPSERT_CONFIG_PREFIX,
TableUpsertMetadataManagerFactory.UPSERT_DEFAULT_METADATA_MANAGER_CLASS),
DummyTableUpsertMetadataManager.class.getName());
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
index f10a58bf72..9123c9660c 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
@@ -106,10 +106,10 @@ public class UpsertTableSegmentPreloadIntegrationTest
extends BaseClusterIntegra
serverConf.setProperty(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX
+ ".max.segment.preload.threads",
"1");
serverConf.setProperty(Joiner.on(".").join(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX,
- HelixInstanceDataManagerConfig.PREFIX_OF_CONFIG_OF_UPSERT,
+ HelixInstanceDataManagerConfig.UPSERT_CONFIG_PREFIX,
TableUpsertMetadataManagerFactory.UPSERT_DEFAULT_ENABLE_SNAPSHOT),
"true");
serverConf.setProperty(Joiner.on(".").join(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX,
- HelixInstanceDataManagerConfig.PREFIX_OF_CONFIG_OF_UPSERT,
+ HelixInstanceDataManagerConfig.UPSERT_CONFIG_PREFIX,
TableUpsertMetadataManagerFactory.UPSERT_DEFAULT_ENABLE_PRELOAD),
"true");
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 0b820f50c2..0396b925ce 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -27,14 +27,12 @@ import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
@@ -48,11 +46,19 @@ public interface TableDataManager {
/**
* Initializes the table data manager. Should be called only once and before
calling any other method.
*/
- void init(TableDataManagerConfig tableDataManagerConfig, String instanceId,
- ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics
serverMetrics, HelixManager helixManager,
+ void init(InstanceDataManagerConfig instanceDataManagerConfig, TableConfig
tableConfig, HelixManager helixManager,
@Nullable ExecutorService segmentPreloadExecutor,
- @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo>
errorCache,
- TableDataManagerParams tableDataManagerParams);
+ @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo>
errorCache);
+
+ /**
+ * Returns the instance id of the server.
+ */
+ String getInstanceId();
+
+ /**
+ * Returns the config for the instance data manager.
+ */
+ InstanceDataManagerConfig getInstanceDataManagerConfig();
/**
* Starts the table data manager. Should be called only once after table
data manager gets initialized but before
@@ -207,11 +213,6 @@ public interface TableDataManager {
*/
File getTableDataDir();
- /**
- * Returns the config for the table data manager.
- */
- TableDataManagerConfig getTableDataManagerConfig();
-
/**
* Add error related to segment, if any. The implementation
* is expected to cache last 'N' errors for the table, related to
@@ -233,7 +234,7 @@ public interface TableDataManager {
* @param segmentNameStr name of segment for which the state change is being
handled
*/
default void onConsumingToDropped(String segmentNameStr) {
- };
+ }
/**
* Interface to handle segment state transitions from CONSUMING to ONLINE
@@ -241,5 +242,5 @@ public interface TableDataManager {
* @param segmentNameStr name of segment for which the state change is being
handled
*/
default void onConsumingToOnline(String segmentNameStr) {
- };
+ }
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerConfig.java
deleted file mode 100644
index b17c24e564..0000000000
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerConfig.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.data.manager;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.configuration2.Configuration;
-import org.apache.commons.configuration2.PropertiesConfiguration;
-import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.env.PinotConfiguration;
-
-
-/**
- * The config used for TableDataManager.
- */
-public class TableDataManagerConfig {
- public static final String AUTH_CONFIG_PREFIX = "auth";
- public static final String TIER_CONFIGS_PREFIX = "tierConfigs";
- public static final String TIER_NAMES = "tierNames";
-
- private final InstanceDataManagerConfig _instanceDataManagerConfig;
- private final TableConfig _tableConfig;
-
- public TableDataManagerConfig(InstanceDataManagerConfig
instanceDataManagerConfig, TableConfig tableConfig) {
- _instanceDataManagerConfig = instanceDataManagerConfig;
- _tableConfig = tableConfig;
- }
-
- public InstanceDataManagerConfig getInstanceDataManagerConfig() {
- return _instanceDataManagerConfig;
- }
-
- public TableConfig getTableConfig() {
- return _tableConfig;
- }
-
- public String getTableName() {
- return _tableConfig.getTableName();
- }
-
- public TableType getTableType() {
- return _tableConfig.getTableType();
- }
-
- public boolean isDimTable() {
- return _tableConfig.isDimTable();
- }
-
- public String getDataDir() {
- return _instanceDataManagerConfig.getInstanceDataDir() + "/" +
getTableName();
- }
-
- public String getConsumerDir() {
- return _instanceDataManagerConfig.getConsumerDir();
- }
-
- public String getTablePeerDownloadScheme() {
- String peerSegmentDownloadScheme =
_tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
- if (peerSegmentDownloadScheme != null) {
- return peerSegmentDownloadScheme;
- }
- return _instanceDataManagerConfig.getSegmentPeerDownloadScheme();
- }
-
- public int getTableDeletedSegmentsCacheSize() {
- return _instanceDataManagerConfig.getDeletedSegmentsCacheSize();
- }
-
- public int getTableDeletedSegmentsCacheTtlMinutes() {
- return _instanceDataManagerConfig.getDeletedSegmentsCacheTtlMinutes();
- }
-
- public Configuration getAuthConfig() {
- Configuration authConfig = new PropertiesConfiguration();
- _instanceDataManagerConfig.getConfig().subset(AUTH_CONFIG_PREFIX).toMap().forEach(authConfig::addProperty);
- return authConfig;
- }
-
- public Map<String, Map<String, String>> getInstanceTierConfigs() {
- PinotConfiguration tierConfigs =
_instanceDataManagerConfig.getConfig().subset(TIER_CONFIGS_PREFIX);
- List<String> tierNames = tierConfigs.getProperty(TIER_NAMES,
Collections.emptyList());
- if (tierNames.isEmpty()) {
- return Collections.emptyMap();
- }
- Map<String, Map<String, String>> instanceTierConfigs = new HashMap<>();
- for (String tierName : tierNames) {
- Map<String, String> tierConfigMap = new HashMap<>();
- tierConfigs.subset(tierName).toMap().forEach((k, v) ->
tierConfigMap.put(k, String.valueOf(v)));
- instanceTierConfigs.put(tierName, tierConfigMap);
- }
- return instanceTierConfigs;
- }
-}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerParams.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerParams.java
deleted file mode 100644
index f3a0589dd2..0000000000
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerParams.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.data.manager;
-
-import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
-
-
-public class TableDataManagerParams {
- private boolean _isStreamSegmentDownloadUntar; // whether to turn on stream
segment download-untar
- private long _streamSegmentDownloadUntarRateLimitBytesPerSec; // the per
segment rate limit for stream download-untar
- private int _maxParallelSegmentDownloads; // max number of segment download
in parallel per table
-
- public TableDataManagerParams(int maxParallelSegmentDownloads, boolean
isStreamSegmentDownloadUntar,
- long streamSegmentDownloadUntarRateLimitBytesPerSec) {
- _maxParallelSegmentDownloads = maxParallelSegmentDownloads;
- _isStreamSegmentDownloadUntar = isStreamSegmentDownloadUntar;
- _streamSegmentDownloadUntarRateLimitBytesPerSec =
streamSegmentDownloadUntarRateLimitBytesPerSec;
- }
-
- public TableDataManagerParams(InstanceDataManagerConfig
instanceDataManagerConfig) {
- _maxParallelSegmentDownloads =
instanceDataManagerConfig.getMaxParallelSegmentDownloads();
- _isStreamSegmentDownloadUntar =
instanceDataManagerConfig.isStreamSegmentDownloadUntar();
- _streamSegmentDownloadUntarRateLimitBytesPerSec =
- instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit();
- }
-
- public boolean isStreamSegmentDownloadUntar() {
- return _isStreamSegmentDownloadUntar;
- }
-
- public long getStreamSegmentDownloadUntarRateLimitBytesPerSec() {
- return _streamSegmentDownloadUntarRateLimitBytesPerSec;
- }
-
- public void setStreamSegmentDownloadUntar(boolean
streamSegmentDownloadUntar) {
- _isStreamSegmentDownloadUntar = streamSegmentDownloadUntar;
- }
-
- public void setStreamSegmentDownloadUntarRateLimitBytesPerSec(long
streamSegmentDownloadUntarRateLimitBytesPerSec) {
- _streamSegmentDownloadUntarRateLimitBytesPerSec =
streamSegmentDownloadUntarRateLimitBytesPerSec;
- }
-
- public int getMaxParallelSegmentDownloads() {
- return _maxParallelSegmentDownloads;
- }
-
- public void setMaxParallelSegmentDownloads(int maxParallelSegmentDownloads) {
- _maxParallelSegmentDownloads = maxParallelSegmentDownloads;
- }
-}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 72f57d4086..71c8c37317 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -41,7 +41,6 @@ import
org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
-import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -214,15 +213,13 @@ public abstract class BaseTableUpsertMetadataManager
implements TableUpsertMetad
@VisibleForTesting
String getInstanceId() {
- InstanceDataManagerConfig instanceDataManagerConfig =
- _tableDataManager.getTableDataManagerConfig().getInstanceDataManagerConfig();
- return instanceDataManagerConfig.getInstanceId();
+ return _tableDataManager.getInstanceDataManagerConfig().getInstanceId();
}
@VisibleForTesting
IndexLoadingConfig createIndexLoadingConfig() {
- return new
IndexLoadingConfig(_tableDataManager.getTableDataManagerConfig().getInstanceDataManagerConfig(),
- _context.getTableConfig(), _context.getSchema());
+ return new
IndexLoadingConfig(_tableDataManager.getInstanceDataManagerConfig(),
_context.getTableConfig(),
+ _context.getSchema());
}
@VisibleForTesting
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
index 1ee4a834e1..f3a3dff5ab 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
@@ -38,7 +38,7 @@ public class TableUpsertMetadataManagerFactory {
public static final String UPSERT_DEFAULT_ENABLE_PRELOAD =
"default.enable.preload";
public static TableUpsertMetadataManager create(TableConfig tableConfig,
- @Nullable PinotConfiguration instanceUpsertConfigs) {
+ @Nullable PinotConfiguration instanceUpsertConfig) {
String tableNameWithType = tableConfig.getTableName();
UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
Preconditions.checkArgument(upsertConfig != null, "Must provide upsert
config for table: %s", tableNameWithType);
@@ -46,20 +46,20 @@ public class TableUpsertMetadataManagerFactory {
TableUpsertMetadataManager metadataManager;
String metadataManagerClass = upsertConfig.getMetadataManagerClass();
- if (instanceUpsertConfigs != null) {
+ if (instanceUpsertConfig != null) {
if (metadataManagerClass == null) {
- metadataManagerClass =
instanceUpsertConfigs.getProperty(UPSERT_DEFAULT_METADATA_MANAGER_CLASS);
+ metadataManagerClass =
instanceUpsertConfig.getProperty(UPSERT_DEFAULT_METADATA_MANAGER_CLASS);
}
// Server level config honoured only when table level config is not set
to true
if (!upsertConfig.isEnableSnapshot()) {
upsertConfig.setEnableSnapshot(
- Boolean.parseBoolean(instanceUpsertConfigs.getProperty(UPSERT_DEFAULT_ENABLE_SNAPSHOT, "false")));
+ Boolean.parseBoolean(instanceUpsertConfig.getProperty(UPSERT_DEFAULT_ENABLE_SNAPSHOT, "false")));
}
// Server level config honoured only when table level config is not set
to true
if (!upsertConfig.isEnablePreload()) {
upsertConfig.setEnablePreload(
- Boolean.parseBoolean(instanceUpsertConfigs.getProperty(UPSERT_DEFAULT_ENABLE_PRELOAD, "false")));
+ Boolean.parseBoolean(instanceUpsertConfig.getProperty(UPSERT_DEFAULT_ENABLE_PRELOAD, "false")));
}
}
@@ -67,8 +67,7 @@ public class TableUpsertMetadataManagerFactory {
LOGGER.info("Creating TableUpsertMetadataManager with class: {} for
table: {}", metadataManagerClass,
tableNameWithType);
try {
- metadataManager =
- (TableUpsertMetadataManager)
Class.forName(metadataManagerClass).newInstance();
+ metadataManager = (TableUpsertMetadataManager)
Class.forName(metadataManagerClass).newInstance();
} catch (Exception e) {
throw new RuntimeException(
String.format("Caught exception while constructing
TableUpsertMetadataManager with class: %s for table: %s",
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 0d194c51ae..8e147cc74a 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -60,7 +60,6 @@ import
org.apache.pinot.core.data.manager.realtime.SegmentUploader;
import org.apache.pinot.core.util.SegmentRefreshSemaphore;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.spi.SegmentMetadata;
@@ -87,8 +86,8 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
private HelixInstanceDataManagerConfig _instanceDataManagerConfig;
private String _instanceId;
+ private TableDataManagerProvider _tableDataManagerProvider;
private HelixManager _helixManager;
- private ServerMetrics _serverMetrics;
private ZkHelixPropertyStore<ZNRecord> _propertyStore;
private SegmentUploader _segmentUploader;
private Supplier<Boolean> _isServerReadyToServeQueries = () -> false;
@@ -114,10 +113,10 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
_instanceDataManagerConfig = new HelixInstanceDataManagerConfig(config);
LOGGER.info("HelixInstanceDataManagerConfig: {}",
_instanceDataManagerConfig);
_instanceId = _instanceDataManagerConfig.getInstanceId();
+ _tableDataManagerProvider = new
TableDataManagerProvider(_instanceDataManagerConfig);
_helixManager = helixManager;
- _serverMetrics = serverMetrics;
_segmentUploader = new PinotFSSegmentUploader(_instanceDataManagerConfig.getSegmentStoreUri(),
- ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), _serverMetrics);
+ ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), serverMetrics);
_externalViewDroppedMaxWaitMs =
_instanceDataManagerConfig.getExternalViewDroppedMaxWaitMs();
_externalViewDroppedCheckInternalMs =
_instanceDataManagerConfig.getExternalViewDroppedCheckIntervalMs();
@@ -148,8 +147,6 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
} else {
LOGGER.info("SegmentPreloadExecutor was not created with pool size: {}",
poolSize);
}
- // Initialize the table data manager provider
- TableDataManagerProvider.init(_instanceDataManagerConfig);
LOGGER.info("Initialized Helix instance data manager");
// Initialize the error cache
@@ -225,17 +222,17 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
ZKMetadataProvider.getSegmentZKMetadata(_propertyStore,
realtimeTableName, segmentName);
Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata
for segment: %s, table: %s", segmentName,
realtimeTableName);
- _tableDataManagerMap.computeIfAbsent(realtimeTableName, k ->
createTableDataManager(k, tableConfig))
+ _tableDataManagerMap.computeIfAbsent(realtimeTableName, k ->
createTableDataManager(tableConfig))
.addSegment(segmentName, new
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema),
zkMetadata);
LOGGER.info("Added segment: {} to table: {}", segmentName,
realtimeTableName);
}
- private TableDataManager createTableDataManager(String tableNameWithType,
TableConfig tableConfig) {
+ private TableDataManager createTableDataManager(TableConfig tableConfig) {
+ String tableNameWithType = tableConfig.getTableName();
LOGGER.info("Creating table data manager for table: {}",
tableNameWithType);
- TableDataManagerConfig tableDataManagerConfig = new
TableDataManagerConfig(_instanceDataManagerConfig, tableConfig);
TableDataManager tableDataManager =
- TableDataManagerProvider.getTableDataManager(tableDataManagerConfig,
_instanceId, _propertyStore,
- _serverMetrics, _helixManager, _segmentPreloadExecutor,
_errorCache, _isServerReadyToServeQueries);
+ _tableDataManagerProvider.getTableDataManager(tableConfig,
_helixManager, _segmentPreloadExecutor, _errorCache,
+ _isServerReadyToServeQueries);
tableDataManager.start();
LOGGER.info("Created table data manager for table: {}", tableNameWithType);
return tableDataManager;
@@ -501,7 +498,7 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
// is set to null. Then, addOrReplaceSegment method will load the
segment accordingly.
SegmentMetadata localMetadata = getSegmentMetadata(tableNameWithType,
segmentName);
- _tableDataManagerMap.computeIfAbsent(tableNameWithType, k ->
createTableDataManager(k, tableConfig))
+ _tableDataManagerMap.computeIfAbsent(tableNameWithType, k ->
createTableDataManager(tableConfig))
.addOrReplaceSegment(segmentName, new
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema),
zkMetadata, localMetadata);
LOGGER.info("Added or replaced segment: {} of table: {}", segmentName,
tableNameWithType);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index c7e15fe106..1b5940ff41 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -18,6 +18,11 @@
*/
package org.apache.pinot.server.starter.helix;
+import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
@@ -37,8 +42,6 @@ import static
org.apache.pinot.spi.utils.CommonConstants.Server.DEFAULT_READ_MOD
/**
* The config used for HelixInstanceDataManager.
- *
- *
*/
public class HelixInstanceDataManagerConfig implements
InstanceDataManagerConfig {
private static final Logger LOGGER =
LoggerFactory.getLogger(HelixInstanceDataManagerConfig.class);
@@ -56,10 +59,6 @@ public class HelixInstanceDataManagerConfig implements
InstanceDataManagerConfig
public static final String INSTANCE_SEGMENT_TAR_DIR = "segmentTarDir";
// Key of segment directory
public static final String INSTANCE_BOOTSTRAP_SEGMENT_DIR =
"bootstrap.segment.dir";
- // Key of table data directory
- public static final String kEY_OF_TABLE_DATA_DIRECTORY = "directory";
- // Key of table data directory
- public static final String kEY_OF_TABLE_NAME = "name";
// Key of instance level segment read mode
public static final String READ_MODE = "readMode";
// Key of the segment format this server can read
@@ -68,6 +67,14 @@ public class HelixInstanceDataManagerConfig implements
InstanceDataManagerConfig
public static final String INSTANCE_RELOAD_CONSUMING_SEGMENT =
"reload.consumingSegment";
// Key of segment directory loader
public static final String SEGMENT_DIRECTORY_LOADER =
"segment.directory.loader";
+ // Prefix for upsert config
+ public static final String UPSERT_CONFIG_PREFIX = "upsert";
+ // Prefix for auth config
+ public static final String AUTH_CONFIG_PREFIX = "auth";
+ // Prefix for tier configs
+ public static final String TIER_CONFIGS_PREFIX = "tierConfigs";
+ // Key of tier names
+ public static final String TIER_NAMES = "tierNames";
// Key of how many parallel realtime segments can be built.
// A value of <= 0 indicates unlimited.
@@ -86,10 +93,10 @@ public class HelixInstanceDataManagerConfig implements
InstanceDataManagerConfig
// Key of server segment download rate limit
// limit the rate to write download-untar stream to disk, in bytes
// -1 for no disk write limit, 0 for limit the writing to min(untar,
download) rate
- private static final String STREAM_SEGMENT_DOWNLOAD_UNTAR_RATE_LIMIT
- = "segment.stream.download.untar.rate.limit.bytes.per.sec";
- private static final long DEFAULT_STREAM_SEGMENT_DOWNLOAD_UNTAR_RATE_LIMIT
- = TarGzCompressionUtils.NO_DISK_WRITE_RATE_LIMIT;
+ private static final String STREAM_SEGMENT_DOWNLOAD_UNTAR_RATE_LIMIT =
+ "segment.stream.download.untar.rate.limit.bytes.per.sec";
+ private static final long DEFAULT_STREAM_SEGMENT_DOWNLOAD_UNTAR_RATE_LIMIT =
+ TarGzCompressionUtils.NO_DISK_WRITE_RATE_LIMIT;
// Key of whether to use streamed server segment download-untar
private static final String ENABLE_STREAM_SEGMENT_DOWNLOAD_UNTAR =
"segment.stream.download.untar";
@@ -131,8 +138,6 @@ public class HelixInstanceDataManagerConfig implements
InstanceDataManagerConfig
private static final String EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS =
"external.view.dropped.max.wait.ms";
private static final String EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS =
"external.view.dropped.check.interval.ms";
- public static final String PREFIX_OF_CONFIG_OF_UPSERT = "upsert";
-
private final static String[] REQUIRED_KEYS = {INSTANCE_ID};
private static final long DEFAULT_ERROR_CACHE_SIZE = 100L;
private static final int DEFAULT_DELETED_SEGMENTS_CACHE_SIZE = 10_000;
@@ -140,176 +145,187 @@ public class HelixInstanceDataManagerConfig implements
InstanceDataManagerConfig
public static final long DEFAULT_EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS = 20 *
60_000L;
public static final long DEFAULT_EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS =
1_000L;
- private final PinotConfiguration _instanceDataManagerConfiguration;
+ private final PinotConfiguration _serverConfig;
+ private final PinotConfiguration _upsertConfig;
+ private final PinotConfiguration _authConfig;
+ private final Map<String, Map<String, String>> _tierConfigs;
public HelixInstanceDataManagerConfig(PinotConfiguration serverConfig)
throws ConfigurationException {
- _instanceDataManagerConfiguration = serverConfig;
+ _serverConfig = serverConfig;
for (String key : serverConfig.getKeys()) {
LOGGER.info("InstanceDataManagerConfig, key: {} , value: {}", key,
serverConfig.getProperty(key));
}
checkRequiredKeys();
+
+ _authConfig = serverConfig.subset(AUTH_CONFIG_PREFIX);
+ _upsertConfig = serverConfig.subset(UPSERT_CONFIG_PREFIX);
+
+ PinotConfiguration tierConfigs = getConfig().subset(TIER_CONFIGS_PREFIX);
+ List<String> tierNames = tierConfigs.getProperty(TIER_NAMES,
Collections.emptyList());
+ if (tierNames.isEmpty()) {
+ _tierConfigs = Collections.emptyMap();
+ } else {
+ _tierConfigs = Maps.newHashMapWithExpectedSize(tierNames.size());
+ for (String tierName : tierNames) {
+ Map<String, String> tierConfigMap = new HashMap<>();
+ tierConfigs.subset(tierName).toMap().forEach((k, v) ->
tierConfigMap.put(k, String.valueOf(v)));
+ _tierConfigs.put(tierName, tierConfigMap);
+ }
+ }
}
private void checkRequiredKeys()
throws ConfigurationException {
for (String keyString : REQUIRED_KEYS) {
- Optional.ofNullable(_instanceDataManagerConfiguration.getProperty(keyString))
+ Optional.ofNullable(_serverConfig.getProperty(keyString))
.orElseThrow(() -> new ConfigurationException("Cannot find required
key : " + keyString));
}
}
@Override
public PinotConfiguration getConfig() {
- return _instanceDataManagerConfiguration;
+ return _serverConfig;
}
@Override
public String getInstanceId() {
- return _instanceDataManagerConfiguration.getProperty(INSTANCE_ID);
+ return _serverConfig.getProperty(INSTANCE_ID);
}
@Override
public String getInstanceDataDir() {
- return _instanceDataManagerConfiguration.getProperty(INSTANCE_DATA_DIR,
DEFAULT_INSTANCE_DATA_DIR);
+ return _serverConfig.getProperty(INSTANCE_DATA_DIR,
DEFAULT_INSTANCE_DATA_DIR);
}
@Override
public String getConsumerDir() {
- return _instanceDataManagerConfiguration.getProperty(CONSUMER_DIR);
+ return _serverConfig.getProperty(CONSUMER_DIR);
}
@Override
public String getInstanceSegmentTarDir() {
- return
_instanceDataManagerConfiguration.getProperty(INSTANCE_SEGMENT_TAR_DIR,
DEFAULT_INSTANCE_SEGMENT_TAR_DIR);
+ return _serverConfig.getProperty(INSTANCE_SEGMENT_TAR_DIR,
DEFAULT_INSTANCE_SEGMENT_TAR_DIR);
}
@Override
public String getInstanceBootstrapSegmentDir() {
- return
_instanceDataManagerConfiguration.getProperty(INSTANCE_BOOTSTRAP_SEGMENT_DIR);
+ return _serverConfig.getProperty(INSTANCE_BOOTSTRAP_SEGMENT_DIR);
}
@Override
public String getSegmentStoreUri() {
- return
_instanceDataManagerConfiguration.getProperty(CONFIG_OF_SEGMENT_STORE_URI);
+ return _serverConfig.getProperty(CONFIG_OF_SEGMENT_STORE_URI);
}
@Override
public ReadMode getReadMode() {
- return
ReadMode.valueOf(_instanceDataManagerConfiguration.getProperty(READ_MODE,
DEFAULT_READ_MODE));
+ return ReadMode.valueOf(_serverConfig.getProperty(READ_MODE,
DEFAULT_READ_MODE));
}
@Override
public String getSegmentFormatVersion() {
- return
_instanceDataManagerConfiguration.getProperty(SEGMENT_FORMAT_VERSION);
+ return _serverConfig.getProperty(SEGMENT_FORMAT_VERSION);
}
@Override
public boolean isRealtimeOffHeapAllocation() {
- return
_instanceDataManagerConfiguration.getProperty(REALTIME_OFFHEAP_ALLOCATION,
true);
+ return _serverConfig.getProperty(REALTIME_OFFHEAP_ALLOCATION, true);
}
@Override
public boolean isDirectRealtimeOffHeapAllocation() {
- return
_instanceDataManagerConfiguration.getProperty(DIRECT_REALTIME_OFFHEAP_ALLOCATION,
false);
+ return _serverConfig.getProperty(DIRECT_REALTIME_OFFHEAP_ALLOCATION,
false);
}
public boolean shouldReloadConsumingSegment() {
- return _instanceDataManagerConfiguration
- .getProperty(INSTANCE_RELOAD_CONSUMING_SEGMENT,
Server.DEFAULT_RELOAD_CONSUMING_SEGMENT);
+ return _serverConfig.getProperty(INSTANCE_RELOAD_CONSUMING_SEGMENT,
Server.DEFAULT_RELOAD_CONSUMING_SEGMENT);
}
@Override
public String getAvgMultiValueCount() {
- return _instanceDataManagerConfiguration.getProperty(AVERAGE_MV_COUNT);
+ return _serverConfig.getProperty(AVERAGE_MV_COUNT);
}
public int getMaxParallelRefreshThreads() {
- return
_instanceDataManagerConfiguration.getProperty(MAX_PARALLEL_REFRESH_THREADS, 1);
+ return _serverConfig.getProperty(MAX_PARALLEL_REFRESH_THREADS, 1);
}
public int getMaxSegmentPreloadThreads() {
- return
_instanceDataManagerConfiguration.getProperty(MAX_SEGMENT_PRELOAD_THREADS, 0);
+ return _serverConfig.getProperty(MAX_SEGMENT_PRELOAD_THREADS, 0);
}
public int getMaxParallelSegmentBuilds() {
- return _instanceDataManagerConfiguration
- .getProperty(MAX_PARALLEL_SEGMENT_BUILDS,
DEFAULT_MAX_PARALLEL_SEGMENT_BUILDS);
+ return _serverConfig.getProperty(MAX_PARALLEL_SEGMENT_BUILDS,
DEFAULT_MAX_PARALLEL_SEGMENT_BUILDS);
}
@Override
public int getMaxParallelSegmentDownloads() {
- return
_instanceDataManagerConfiguration.getProperty(MAX_PARALLEL_SEGMENT_DOWNLOADS,
- DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS);
+ return _serverConfig.getProperty(MAX_PARALLEL_SEGMENT_DOWNLOADS,
DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS);
}
public String getSegmentDirectoryLoader() {
- return
_instanceDataManagerConfiguration.getProperty(SEGMENT_DIRECTORY_LOADER,
+ return _serverConfig.getProperty(SEGMENT_DIRECTORY_LOADER,
SegmentDirectoryLoaderRegistry.DEFAULT_SEGMENT_DIRECTORY_LOADER_NAME);
}
@Override
public long getErrorCacheSize() {
- return _instanceDataManagerConfiguration.getProperty(ERROR_CACHE_SIZE,
DEFAULT_ERROR_CACHE_SIZE);
+ return _serverConfig.getProperty(ERROR_CACHE_SIZE,
DEFAULT_ERROR_CACHE_SIZE);
}
@Override
public boolean isStreamSegmentDownloadUntar() {
- return
_instanceDataManagerConfiguration.getProperty(ENABLE_STREAM_SEGMENT_DOWNLOAD_UNTAR,
+ return _serverConfig.getProperty(ENABLE_STREAM_SEGMENT_DOWNLOAD_UNTAR,
DEFAULT_ENABLE_STREAM_SEGMENT_DOWNLOAD_UNTAR);
}
@Override
public long getStreamSegmentDownloadUntarRateLimit() {
- return
_instanceDataManagerConfiguration.getProperty(STREAM_SEGMENT_DOWNLOAD_UNTAR_RATE_LIMIT,
+ return _serverConfig.getProperty(STREAM_SEGMENT_DOWNLOAD_UNTAR_RATE_LIMIT,
DEFAULT_STREAM_SEGMENT_DOWNLOAD_UNTAR_RATE_LIMIT);
}
@Override
public int getDeletedSegmentsCacheSize() {
- return
_instanceDataManagerConfiguration.getProperty(DELETED_SEGMENTS_CACHE_SIZE,
- DEFAULT_DELETED_SEGMENTS_CACHE_SIZE);
+ return _serverConfig.getProperty(DELETED_SEGMENTS_CACHE_SIZE,
DEFAULT_DELETED_SEGMENTS_CACHE_SIZE);
}
@Override
public int getDeletedSegmentsCacheTtlMinutes() {
- return
_instanceDataManagerConfiguration.getProperty(DELETED_SEGMENTS_CACHE_TTL_MINUTES,
- DEFAULT_DELETED_SEGMENTS_CACHE_TTL_MINUTES);
+ return _serverConfig.getProperty(DELETED_SEGMENTS_CACHE_TTL_MINUTES,
DEFAULT_DELETED_SEGMENTS_CACHE_TTL_MINUTES);
}
@Override
public String getSegmentPeerDownloadScheme() {
- return _instanceDataManagerConfiguration.getProperty(PEER_DOWNLOAD_SCHEME);
+ return _serverConfig.getProperty(PEER_DOWNLOAD_SCHEME);
}
@Override
public long getExternalViewDroppedMaxWaitMs() {
- return
_instanceDataManagerConfiguration.getProperty(EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS,
- DEFAULT_EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS);
+ return _serverConfig.getProperty(EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS,
DEFAULT_EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS);
}
@Override
public long getExternalViewDroppedCheckIntervalMs() {
- return
_instanceDataManagerConfiguration.getProperty(EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS,
+ return _serverConfig.getProperty(EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS,
DEFAULT_EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS);
}
@Override
- public PinotConfiguration getUpsertConfigs() {
- return
_instanceDataManagerConfiguration.subset(PREFIX_OF_CONFIG_OF_UPSERT);
+ public PinotConfiguration getUpsertConfig() {
+ return _upsertConfig;
+ }
+
+ @Override
+ public PinotConfiguration getAuthConfig() {
+ return _authConfig;
}
@Override
- public String toString() {
- String configString = "";
- configString += "Instance Id: " + getInstanceId();
- configString += "\n\tInstance Data Dir: " + getInstanceDataDir();
- configString += "\n\tInstance Segment Tar Dir: " +
getInstanceSegmentTarDir();
- configString += "\n\tBootstrap Segment Dir: " +
getInstanceBootstrapSegmentDir();
- configString += "\n\tRead Mode: " + getReadMode();
- configString += "\n\tSegment format version: " + getSegmentFormatVersion();
- return configString;
+ public Map<String, Map<String, String>> getTierConfigs() {
+ return _tierConfigs;
}
}
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
index 3723f7a259..e57371f72d 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
@@ -30,7 +30,6 @@ import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
@@ -40,8 +39,6 @@ import
org.apache.pinot.core.data.manager.realtime.SegmentUploader;
import org.apache.pinot.core.transport.HttpServerThreadPoolConfig;
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
-import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -50,6 +47,9 @@ import
org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
import org.apache.pinot.server.access.AllowAllAccessFactory;
import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.NetUtils;
@@ -69,16 +69,16 @@ import static org.mockito.Mockito.when;
public abstract class BaseResourceTest {
private static final String AVRO_DATA_PATH = "data/test_data-mv.avro";
- private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
"BaseResourceTest");
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"BaseResourceTest");
protected static final String TABLE_NAME = "testTable";
protected static final String LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS =
- new
LLCSegmentName(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 1, 0,
System.currentTimeMillis())
- .getSegmentName();
+ new
LLCSegmentName(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 1, 0,
+ System.currentTimeMillis()).getSegmentName();
protected static final String LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE =
- new
LLCSegmentName(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 2, 0,
System.currentTimeMillis())
- .getSegmentName();
- protected static final String SEGMENT_DOWNLOAD_URL = StringUtil
- .join("/", "hdfs://root", TABLE_NAME,
LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS);
+ new
LLCSegmentName(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), 2, 0,
+ System.currentTimeMillis()).getSegmentName();
+ protected static final String SEGMENT_DOWNLOAD_URL =
+ StringUtil.join("/", "hdfs://root", TABLE_NAME,
LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS);
private final Map<String, TableDataManager> _tableDataManagerMap = new
HashMap<>();
protected final List<ImmutableSegment> _realtimeIndexSegments = new
ArrayList<>();
@@ -92,32 +92,34 @@ public abstract class BaseResourceTest {
@BeforeClass
public void setUp()
throws Exception {
- FileUtils.deleteQuietly(INDEX_DIR);
- Assert.assertTrue(INDEX_DIR.mkdirs());
+ ServerMetrics.register(mock(ServerMetrics.class));
+
+ FileUtils.deleteQuietly(TEMP_DIR);
+ Assert.assertTrue(TEMP_DIR.mkdirs());
URL resourceUrl = getClass().getClassLoader().getResource(AVRO_DATA_PATH);
Assert.assertNotNull(resourceUrl);
_avroFile = new File(resourceUrl.getFile());
// Mock the instance data manager
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
- when(instanceDataManager.getTableDataManager(anyString()))
- .thenAnswer(invocation ->
_tableDataManagerMap.get(invocation.getArguments()[0]));
+ when(instanceDataManager.getTableDataManager(anyString())).thenAnswer(
+ invocation -> _tableDataManagerMap.get(invocation.getArguments()[0]));
when(instanceDataManager.getAllTables()).thenReturn(_tableDataManagerMap.keySet());
// Mock the server instance
ServerInstance serverInstance = mock(ServerInstance.class);
when(serverInstance.getServerMetrics()).thenReturn(mock(ServerMetrics.class));
when(serverInstance.getInstanceDataManager()).thenReturn(instanceDataManager);
- when(serverInstance.getInstanceDataManager().getSegmentFileDirectory())
- .thenReturn(FileUtils.getTempDirectoryPath());
+
when(serverInstance.getInstanceDataManager().getSegmentFileDirectory()).thenReturn(
+ FileUtils.getTempDirectoryPath());
when(serverInstance.getHelixManager()).thenReturn(mock(HelixManager.class));
// Mock the segment uploader
SegmentUploader segmentUploader = mock(SegmentUploader.class);
- when(segmentUploader.uploadSegment(any(File.class), eq(new
LLCSegmentName(LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS))))
- .thenReturn(new URI(SEGMENT_DOWNLOAD_URL));
- when(segmentUploader.uploadSegment(any(File.class), eq(new
LLCSegmentName(LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE))))
- .thenReturn(null);
+ when(segmentUploader.uploadSegment(any(File.class),
+ eq(new
LLCSegmentName(LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS)))).thenReturn(new
URI(SEGMENT_DOWNLOAD_URL));
+ when(segmentUploader.uploadSegment(any(File.class),
+ eq(new
LLCSegmentName(LLC_SEGMENT_NAME_FOR_UPLOAD_FAILURE)))).thenReturn(null);
when(instanceDataManager.getSegmentUploader()).thenReturn(segmentUploader);
// Add the default tables and segments.
@@ -155,7 +157,7 @@ public abstract class BaseResourceTest {
for (ImmutableSegment immutableSegment : _offlineIndexSegments) {
immutableSegment.destroy();
}
- FileUtils.deleteQuietly(INDEX_DIR);
+ FileUtils.deleteQuietly(TEMP_DIR);
}
protected List<ImmutableSegment> setUpSegments(String tableNameWithType, int
numSegments,
@@ -163,8 +165,8 @@ public abstract class BaseResourceTest {
throws Exception {
List<ImmutableSegment> immutableSegments = new ArrayList<>();
for (int i = 0; i < numSegments; i++) {
- immutableSegments
- .add(setUpSegment(tableNameWithType, null,
Integer.toString(_realtimeIndexSegments.size()), segments));
+ immutableSegments.add(
+ setUpSegment(tableNameWithType, null,
Integer.toString(_realtimeIndexSegments.size()), segments));
}
return immutableSegments;
}
@@ -172,30 +174,31 @@ public abstract class BaseResourceTest {
protected ImmutableSegment setUpSegment(String tableNameWithType, String
segmentName, String segmentNamePostfix,
List<ImmutableSegment> segments)
throws Exception {
+ File tableDataDir = new File(TEMP_DIR, tableNameWithType);
SegmentGeneratorConfig config =
- SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(_avroFile,
INDEX_DIR, tableNameWithType);
+ SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(_avroFile,
tableDataDir, tableNameWithType);
config.setSegmentName(segmentName);
config.setSegmentNamePostfix(segmentNamePostfix);
SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
driver.init(config);
driver.build();
ImmutableSegment immutableSegment =
- ImmutableSegmentLoader.load(new File(INDEX_DIR,
driver.getSegmentName()), ReadMode.mmap);
+ ImmutableSegmentLoader.load(new File(tableDataDir,
driver.getSegmentName()), ReadMode.mmap);
segments.add(immutableSegment);
_tableDataManagerMap.get(tableNameWithType).addSegment(immutableSegment);
return immutableSegment;
}
- @SuppressWarnings("unchecked")
protected void addTable(String tableNameWithType) {
- TableDataManagerConfig tableDataManagerConfig =
mock(TableDataManagerConfig.class);
- when(tableDataManagerConfig.getTableName()).thenReturn(tableNameWithType);
-
when(tableDataManagerConfig.getDataDir()).thenReturn(INDEX_DIR.getAbsolutePath());
+ InstanceDataManagerConfig instanceDataManagerConfig =
mock(InstanceDataManagerConfig.class);
+
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
+ TableConfig tableConfig = mock(TableConfig.class);
+ when(tableConfig.getTableName()).thenReturn(tableNameWithType);
+
when(tableConfig.getValidationConfig()).thenReturn(mock(SegmentsValidationAndRetentionConfig.class));
// NOTE: Use OfflineTableDataManager for both OFFLINE and REALTIME table
because RealtimeTableDataManager requires
// table config.
TableDataManager tableDataManager = new OfflineTableDataManager();
- tableDataManager.init(tableDataManagerConfig, "testInstance",
mock(ZkHelixPropertyStore.class),
- mock(ServerMetrics.class), mock(HelixManager.class), null, null, new
TableDataManagerParams(0, false, -1));
+ tableDataManager.init(instanceDataManagerConfig, tableConfig,
mock(HelixManager.class), null, null);
tableDataManager.start();
_tableDataManagerMap.put(tableNameWithType, tableDataManager);
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
index 6d035890c9..4247f76269 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
@@ -18,11 +18,13 @@
*/
package org.apache.pinot.spi.config.instance;
+import java.util.Map;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.ReadMode;
public interface InstanceDataManagerConfig {
+
PinotConfiguration getConfig();
String getInstanceId();
@@ -69,5 +71,9 @@ public interface InstanceDataManagerConfig {
long getExternalViewDroppedCheckIntervalMs();
- PinotConfiguration getUpsertConfigs();
+ PinotConfiguration getUpsertConfig();
+
+ PinotConfiguration getAuthConfig();
+
+ Map<String, Map<String, String>> getTierConfigs();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]