This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch set-max-parallel-segment-downloads-per-table
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit b915f2ef4103a4f6736bc7a16743fcd9b92d7e05 Author: Jack Li(Analytics Engineering) <[email protected]> AuthorDate: Thu May 12 16:22:07 2022 -0700 Set max number of parallel segment downloads per table in pinot-server --- .../pinot/core/data/manager/BaseTableDataManager.java | 17 ++++++++++++++++- .../data/manager/offline/TableDataManagerProvider.java | 5 ++++- .../manager/BaseTableDataManagerAcquireSegmentTest.java | 2 +- .../core/data/manager/BaseTableDataManagerTest.java | 2 +- .../manager/offline/DimensionTableDataManagerTest.java | 2 +- .../segment/local/data/manager/TableDataManager.java | 2 +- .../starter/helix/HelixInstanceDataManagerConfig.java | 13 +++++++++++++ .../org/apache/pinot/server/api/BaseResourceTest.java | 2 +- .../spi/config/instance/InstanceDataManagerConfig.java | 2 ++ 9 files changed, 40 insertions(+), 7 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 79d1741fb6..4fc8e13f76 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -72,6 +73,8 @@ public abstract class BaseTableDataManager implements TableDataManager { private static final Logger LOGGER = LoggerFactory.getLogger(BaseTableDataManager.class); protected final ConcurrentHashMap<String, SegmentDataManager> _segmentDataManagerMap = new ConcurrentHashMap<>(); + // Semaphore to restrict the maximum number of parallel segment downloads for a table. 
+ private Semaphore _segmentDownloadSemaphore; protected TableDataManagerConfig _tableDataManagerConfig; protected String _instanceId; @@ -92,7 +95,7 @@ public abstract class BaseTableDataManager implements TableDataManager { @Override public void init(TableDataManagerConfig tableDataManagerConfig, String instanceId, ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager, - @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache) { + @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache, int maxParallelSegmentDownloads) { LOGGER.info("Initializing table data manager for table: {}", tableDataManagerConfig.getTableName()); _tableDataManagerConfig = tableDataManagerConfig; @@ -119,6 +122,11 @@ public abstract class BaseTableDataManager implements TableDataManager { _resourceTmpDir); } _errorCache = errorCache; + if (maxParallelSegmentDownloads > 0) { + _segmentDownloadSemaphore = new Semaphore(maxParallelSegmentDownloads, true); + } else { + _segmentDownloadSemaphore = null; + } _logger = LoggerFactory.getLogger(_tableNameWithType + "-" + getClass().getSimpleName()); doInit(); @@ -403,6 +411,9 @@ public abstract class BaseTableDataManager implements TableDataManager { File tarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); String uri = zkMetadata.getDownloadUrl(); try { + if (_segmentDownloadSemaphore != null) { + _segmentDownloadSemaphore.acquire(); + } SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(uri, tarFile, zkMetadata.getCrypterName()); LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to: {}, file length: {}", segmentName, _tableNameWithType, uri, tarFile, tarFile.length()); @@ -412,6 +423,10 @@ public abstract class BaseTableDataManager implements TableDataManager { _tableNameWithType, uri, tarFile); _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1L); throw e; + } 
finally { + if (_segmentDownloadSemaphore != null) { + _segmentDownloadSemaphore.release(); + } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java index 42994edcfd..e673c9618d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java @@ -38,6 +38,7 @@ import org.apache.pinot.spi.utils.Pair; */ public class TableDataManagerProvider { private static Semaphore _segmentBuildSemaphore; + private static int _maxParallelSegmentDownloads; private TableDataManagerProvider() { } @@ -47,6 +48,7 @@ public class TableDataManagerProvider { if (maxParallelBuilds > 0) { _segmentBuildSemaphore = new Semaphore(maxParallelBuilds, true); } + _maxParallelSegmentDownloads = instanceDataManagerConfig.getMaxParallelSegmentDownloads(); } public static TableDataManager getTableDataManager(TableDataManagerConfig tableDataManagerConfig, String instanceId, @@ -67,7 +69,8 @@ public class TableDataManagerProvider { default: throw new IllegalStateException(); } - tableDataManager.init(tableDataManagerConfig, instanceId, propertyStore, serverMetrics, helixManager, errorCache); + tableDataManager.init(tableDataManagerConfig, instanceId, propertyStore, serverMetrics, helixManager, errorCache, + _maxParallelSegmentDownloads); return tableDataManager; } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java index 86299bfb92..7adc732b61 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java +++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java @@ -119,7 +119,7 @@ public class BaseTableDataManagerAcquireSegmentTest { when(config.getDataDir()).thenReturn(_tmpDir.getAbsolutePath()); } tableDataManager.init(config, "dummyInstance", mock(ZkHelixPropertyStore.class), - new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null); + new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, 0); tableDataManager.start(); Field segsMapField = BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap"); segsMapField.setAccessible(true); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java index fac8977c5f..97362fc48c 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java @@ -529,7 +529,7 @@ public class BaseTableDataManagerTest { OfflineTableDataManager tableDataManager = new OfflineTableDataManager(); tableDataManager.init(config, "dummyInstance", mock(ZkHelixPropertyStore.class), - new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null); + new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, 0); tableDataManager.start(); return tableDataManager; } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java index 7eb74e5fc7..922ce7b49d 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java +++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java @@ -124,7 +124,7 @@ public class DimensionTableDataManagerTest { when(config.getDataDir()).thenReturn(INDEX_DIR.getAbsolutePath()); } tableDataManager.init(config, "dummyInstance", mockPropertyStore(), - new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), helixManager, null); + new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), helixManager, null, 0); tableDataManager.start(); return tableDataManager; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java index 3ae95a6452..e6efe563c8 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java @@ -49,7 +49,7 @@ public interface TableDataManager { */ void init(TableDataManagerConfig tableDataManagerConfig, String instanceId, ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager, - LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache); + LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache, int maxParallelSegmentDownloads); /** * Starts the table data manager. 
Should be called only once after table data manager gets initialized but before diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java index 809a86642d..12f497301e 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java @@ -74,6 +74,13 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig private static final String MAX_PARALLEL_SEGMENT_BUILDS = "realtime.max.parallel.segment.builds"; private static final int DEFAULT_MAX_PARALLEL_SEGMENT_BUILDS = 4; + // Key of how many parallel segment downloads can be made per table. + // A value of <= 0 indicates unlimited. + // Unlimited parallel downloads can make Pinot controllers receive high burst of download requests, + // causing controllers unavailable for that period of time. + private static final String MAX_PARALLEL_SEGMENT_DOWNLOADS = "table.level.max.parallel.segment.downloads"; + private static final int DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS = 4; + // Key of whether to enable split commit private static final String ENABLE_SPLIT_COMMIT = "enable.split.commit"; // Key of whether to enable split commit end with segment metadata files. 
@@ -211,6 +218,12 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig .getProperty(MAX_PARALLEL_SEGMENT_BUILDS, DEFAULT_MAX_PARALLEL_SEGMENT_BUILDS); } + @Override + public int getMaxParallelSegmentDownloads() { + return _instanceDataManagerConfiguration.getProperty(MAX_PARALLEL_SEGMENT_DOWNLOADS, + DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS); + } + @Override public String getAuthToken() { return _instanceDataManagerConfiguration.getProperty(AUTH_TOKEN); diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java index 2708373a08..9b38dfd63b 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java @@ -193,7 +193,7 @@ public abstract class BaseResourceTest { TableDataManager tableDataManager = new OfflineTableDataManager(); tableDataManager .init(tableDataManagerConfig, "testInstance", mock(ZkHelixPropertyStore.class), mock(ServerMetrics.class), - mock(HelixManager.class), null); + mock(HelixManager.class), null, 0); tableDataManager.start(); _tableDataManagerMap.put(tableNameWithType, tableDataManager); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java index 7718930657..baa29861ca 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java @@ -53,6 +53,8 @@ public interface InstanceDataManagerConfig { int getMaxParallelSegmentBuilds(); + int getMaxParallelSegmentDownloads(); + String getAuthToken(); String getSegmentDirectoryLoader(); --------------------------------------------------------------------- To unsubscribe, e-mail: 
[email protected] For additional commands, e-mail: [email protected]
