This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f65b401dc7 Set max number of parallel segment downloads per table in
pinot-server (#8694)
f65b401dc7 is described below
commit f65b401dc7f29c66687b14ba24b04bca45cfa91a
Author: Jialiang Li <[email protected]>
AuthorDate: Fri May 13 10:20:39 2022 -0700
Set max number of parallel segment downloads per table in pinot-server
(#8694)
Co-authored-by: Jack Li(Analytics Engineering) <[email protected]>
---
.../core/data/manager/BaseTableDataManager.java | 25 +++++++++++++++++++++-
.../manager/offline/TableDataManagerProvider.java | 5 ++++-
.../BaseTableDataManagerAcquireSegmentTest.java | 2 +-
.../data/manager/BaseTableDataManagerTest.java | 2 +-
.../offline/DimensionTableDataManagerTest.java | 2 +-
.../local/data/manager/TableDataManager.java | 2 +-
.../helix/HelixInstanceDataManagerConfig.java | 13 +++++++++++
.../apache/pinot/server/api/BaseResourceTest.java | 2 +-
.../config/instance/InstanceDataManagerConfig.java | 2 ++
9 files changed, 48 insertions(+), 7 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 79d1741fb6..9abd4164f9 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -72,6 +73,8 @@ public abstract class BaseTableDataManager implements
TableDataManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(BaseTableDataManager.class);
protected final ConcurrentHashMap<String, SegmentDataManager>
_segmentDataManagerMap = new ConcurrentHashMap<>();
+ // Semaphore to restrict the maximum number of parallel segment downloads for a table.
+ private Semaphore _segmentDownloadSemaphore;
protected TableDataManagerConfig _tableDataManagerConfig;
protected String _instanceId;
@@ -92,7 +95,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
@Override
public void init(TableDataManagerConfig tableDataManagerConfig, String
instanceId,
ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics
serverMetrics, HelixManager helixManager,
- @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo>
errorCache) {
+ @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo>
errorCache, int maxParallelSegmentDownloads) {
LOGGER.info("Initializing table data manager for table: {}",
tableDataManagerConfig.getTableName());
_tableDataManagerConfig = tableDataManagerConfig;
@@ -119,6 +122,14 @@ public abstract class BaseTableDataManager implements
TableDataManager {
_resourceTmpDir);
}
_errorCache = errorCache;
+ if (maxParallelSegmentDownloads > 0) {
+ LOGGER.info(
+ "Construct segment download semaphore for Table: {}. Maximum number
of parallel segment downloads: {}",
+ _tableNameWithType, maxParallelSegmentDownloads);
+ _segmentDownloadSemaphore = new Semaphore(maxParallelSegmentDownloads,
true);
+ } else {
+ _segmentDownloadSemaphore = null;
+ }
_logger = LoggerFactory.getLogger(_tableNameWithType + "-" +
getClass().getSimpleName());
doInit();
@@ -403,6 +414,14 @@ public abstract class BaseTableDataManager implements
TableDataManager {
File tarFile = new File(tempRootDir, segmentName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
String uri = zkMetadata.getDownloadUrl();
try {
+ if (_segmentDownloadSemaphore != null) {
+ long startTime = System.currentTimeMillis();
+ LOGGER.info("Trying to acquire segment download semaphore for: {}.
queue-length: {} ", segmentName,
+ _segmentDownloadSemaphore.getQueueLength());
+ _segmentDownloadSemaphore.acquire();
+ LOGGER.info("Acquired segment download semaphore for: {}
(lock-time={}ms, queue-length={}).", segmentName,
+ System.currentTimeMillis() - startTime,
_segmentDownloadSemaphore.getQueueLength());
+ }
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(uri, tarFile,
zkMetadata.getCrypterName());
LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to:
{}, file length: {}", segmentName,
_tableNameWithType, uri, tarFile, tarFile.length());
@@ -412,6 +431,10 @@ public abstract class BaseTableDataManager implements
TableDataManager {
_tableNameWithType, uri, tarFile);
_serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1L);
throw e;
+ } finally {
+ if (_segmentDownloadSemaphore != null) {
+ _segmentDownloadSemaphore.release();
+ }
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
index 42994edcfd..e673c9618d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
@@ -38,6 +38,7 @@ import org.apache.pinot.spi.utils.Pair;
*/
public class TableDataManagerProvider {
private static Semaphore _segmentBuildSemaphore;
+ private static int _maxParallelSegmentDownloads;
private TableDataManagerProvider() {
}
@@ -47,6 +48,7 @@ public class TableDataManagerProvider {
if (maxParallelBuilds > 0) {
_segmentBuildSemaphore = new Semaphore(maxParallelBuilds, true);
}
+ _maxParallelSegmentDownloads =
instanceDataManagerConfig.getMaxParallelSegmentDownloads();
}
public static TableDataManager getTableDataManager(TableDataManagerConfig
tableDataManagerConfig, String instanceId,
@@ -67,7 +69,8 @@ public class TableDataManagerProvider {
default:
throw new IllegalStateException();
}
- tableDataManager.init(tableDataManagerConfig, instanceId, propertyStore,
serverMetrics, helixManager, errorCache);
+ tableDataManager.init(tableDataManagerConfig, instanceId, propertyStore,
serverMetrics, helixManager, errorCache,
+ _maxParallelSegmentDownloads);
return tableDataManager;
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
index 86299bfb92..7adc732b61 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
@@ -119,7 +119,7 @@ public class BaseTableDataManagerAcquireSegmentTest {
when(config.getDataDir()).thenReturn(_tmpDir.getAbsolutePath());
}
tableDataManager.init(config, "dummyInstance",
mock(ZkHelixPropertyStore.class),
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
mock(HelixManager.class), null);
+ new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
mock(HelixManager.class), null, 0);
tableDataManager.start();
Field segsMapField =
BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap");
segsMapField.setAccessible(true);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index fac8977c5f..97362fc48c 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -529,7 +529,7 @@ public class BaseTableDataManagerTest {
OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
tableDataManager.init(config, "dummyInstance",
mock(ZkHelixPropertyStore.class),
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
mock(HelixManager.class), null);
+ new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
mock(HelixManager.class), null, 0);
tableDataManager.start();
return tableDataManager;
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
index 7eb74e5fc7..922ce7b49d 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
@@ -124,7 +124,7 @@ public class DimensionTableDataManagerTest {
when(config.getDataDir()).thenReturn(INDEX_DIR.getAbsolutePath());
}
tableDataManager.init(config, "dummyInstance", mockPropertyStore(),
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
helixManager, null);
+ new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
helixManager, null, 0);
tableDataManager.start();
return tableDataManager;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 3ae95a6452..e6efe563c8 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -49,7 +49,7 @@ public interface TableDataManager {
*/
void init(TableDataManagerConfig tableDataManagerConfig, String instanceId,
ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics
serverMetrics, HelixManager helixManager,
- LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache);
+ LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache, int
maxParallelSegmentDownloads);
/**
* Starts the table data manager. Should be called only once after table
data manager gets initialized but before
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index 809a86642d..b9ea877404 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -74,6 +74,13 @@ public class HelixInstanceDataManagerConfig implements
InstanceDataManagerConfig
private static final String MAX_PARALLEL_SEGMENT_BUILDS =
"realtime.max.parallel.segment.builds";
private static final int DEFAULT_MAX_PARALLEL_SEGMENT_BUILDS = 4;
+ // Key of how many parallel segment downloads can be made per table.
+ // A value of <= 0 indicates unlimited.
+ // Unlimited parallel downloads can cause Pinot controllers to receive a high burst of download requests,
+ // making the controllers unavailable for that period of time.
+ private static final String MAX_PARALLEL_SEGMENT_DOWNLOADS =
"table.level.max.parallel.segment.downloads";
+ private static final int DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS = -1;
+
// Key of whether to enable split commit
private static final String ENABLE_SPLIT_COMMIT = "enable.split.commit";
// Key of whether to enable split commit end with segment metadata files.
@@ -211,6 +218,12 @@ public class HelixInstanceDataManagerConfig implements
InstanceDataManagerConfig
.getProperty(MAX_PARALLEL_SEGMENT_BUILDS,
DEFAULT_MAX_PARALLEL_SEGMENT_BUILDS);
}
+ @Override
+ public int getMaxParallelSegmentDownloads() {
+ return
_instanceDataManagerConfiguration.getProperty(MAX_PARALLEL_SEGMENT_DOWNLOADS,
+ DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS);
+ }
+
@Override
public String getAuthToken() {
return _instanceDataManagerConfiguration.getProperty(AUTH_TOKEN);
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
index 2708373a08..9b38dfd63b 100644
---
a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
+++
b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
@@ -193,7 +193,7 @@ public abstract class BaseResourceTest {
TableDataManager tableDataManager = new OfflineTableDataManager();
tableDataManager
.init(tableDataManagerConfig, "testInstance",
mock(ZkHelixPropertyStore.class), mock(ServerMetrics.class),
- mock(HelixManager.class), null);
+ mock(HelixManager.class), null, 0);
tableDataManager.start();
_tableDataManagerMap.put(tableNameWithType, tableDataManager);
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
index 7718930657..baa29861ca 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
@@ -53,6 +53,8 @@ public interface InstanceDataManagerConfig {
int getMaxParallelSegmentBuilds();
+ int getMaxParallelSegmentDownloads();
+
String getAuthToken();
String getSegmentDirectoryLoader();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]