swaminathanmanish commented on code in PR #13597:
URL: https://github.com/apache/pinot/pull/13597#discussion_r1680255251
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2327,6 +2327,82 @@ public void assignTableSegment(String tableNameWithType, String segmentName) {
     }
   }
+  // Assign a list of segments in batch mode
+  public void assignTableSegments(String tableNameWithType, List<String> segmentNames) {
+    Map<String, String> segmentZKMetadataPathMap = new HashMap<>();
+    for (String segmentName : segmentNames) {
+      String segmentZKMetadataPath =
+          ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, segmentName);
+      segmentZKMetadataPathMap.put(segmentName, segmentZKMetadataPath);
+    }
+    // Assign instances for the segment and add it into IdealState
+    try {
+      TableConfig tableConfig = getTableConfig(tableNameWithType);
+      Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType);
+
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+          fetchOrComputeInstancePartitions(tableNameWithType, tableConfig);
+
+      // Initialize tier information only in case direct tier assignment is configured
+      if (_enableTieredSegmentAssignment && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
+        List<Tier> sortedTiers = TierConfigUtils.getSortedTiersForStorageType(tableConfig.getTierConfigsList(),
+            TierFactory.PINOT_SERVER_STORAGE_TYPE, _helixZkManager);
+        for (String segmentName : segmentNames) {
+          // Update segment tier to support direct assignment for multiple data directories
+          updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers);
+          InstancePartitions tierInstancePartitions = TierConfigUtils.getTieredInstancePartitionsForSegment(
+              tableNameWithType, segmentName, sortedTiers, _helixZkManager);
+          if (tierInstancePartitions != null && TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+            // Override instance partitions for offline table
+            LOGGER.info("Overriding with tiered instance partitions: {} for segment: {} of table: {}",
+                tierInstancePartitions, segmentName, tableNameWithType);
+            instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.OFFLINE, tierInstancePartitions);
+          }
+        }
+      }
+
+      SegmentAssignment segmentAssignment =
+          SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig, _controllerMetrics);
+      synchronized (getTableUpdaterLock(tableNameWithType)) {
Review Comment:
Can we record the time for this update (start/end)? We could add a log.info on how long it takes to add a batch of segments (with the size of the list), and also a latency metric if that would help.
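Roughly something like the following, as a minimal sketch (the timing placement and the metric idea are the suggestion here, not the actual change; LOGGER, segmentNames, and the lock come from the surrounding method):

```java
// Sketch only: time the batched IdealState update and log batch size plus duration.
long startTimeMs = System.currentTimeMillis();
synchronized (getTableUpdaterLock(tableNameWithType)) {
  // ... existing HelixHelper.updateIdealState(...) call covering all segmentNames ...
}
long durationMs = System.currentTimeMillis() - startTimeMs;
LOGGER.info("Added batch of {} segments to IdealState for table: {} in {} ms",
    segmentNames.size(), tableNameWithType, durationMs);
// A latency metric could be emitted here as well (e.g. via _controllerMetrics),
// if a suitable timer exists for this code path.
```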
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java:
##########
@@ -303,62 +306,86 @@ public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, Pino
       } else {
         segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath));
       }
-      try {
-        for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
-          URI controllerURI;
-          try {
-            controllerURI = new URI(pinotClusterSpec.getControllerURI());
-          } catch (URISyntaxException e) {
-            throw new RuntimeException("Got invalid controller uri - '" + pinotClusterSpec.getControllerURI() + "'");
-          }
-          LOGGER.info("Pushing segment: {} to location: {} for table {}", segmentName, controllerURI, tableName);
-          int attempts = 1;
-          if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushAttempts() > 0) {
-            attempts = spec.getPushJobSpec().getPushAttempts();
-          }
-          long retryWaitMs = 1000L;
-          if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
-            retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
-          }
-          RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> {
-            List<Header> reqHttpHeaders = new ArrayList<>(headers);
-            try {
-              reqHttpHeaders.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath));
-              reqHttpHeaders.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
-                  FileUploadDownloadClient.FileUploadType.METADATA.toString()));
-              if (spec.getPushJobSpec() != null) {
-                reqHttpHeaders.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE,
-                    String.valueOf(spec.getPushJobSpec().getCopyToDeepStoreForMetadataPush())));
-              }
+      segmentMetadataFileMap.put(segmentName, segmentMetadataFile);
+      segmentUriPathMap.put(segmentName, segmentUriPath);
+    }
-              SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadata(
-                  FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentName,
-                  segmentMetadataFile, reqHttpHeaders, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
-              LOGGER.info("Response for pushing table {} segment {} to location {} - {}: {}", tableName, segmentName,
-                  controllerURI, response.getStatusCode(), response.getResponse());
-              return true;
-            } catch (HttpErrorStatusException e) {
-              int statusCode = e.getStatusCode();
-              if (statusCode >= 500) {
-                // Temporary exception
-                LOGGER.warn("Caught temporary exception while pushing table: {} segment: {} to {}, will retry",
-                    tableName, segmentName, controllerURI, e);
-                return false;
-              } else {
-                // Permanent exception
-                LOGGER.error("Caught permanent exception while pushing table: {} segment: {} to {}, won't retry",
-                    tableName, segmentName, controllerURI, e);
-                throw e;
-              }
-            }
-          });
+    // perform metadata push in batch mode for every cluster
+    try {
+      for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
+        URI controllerURI;
+        try {
+          controllerURI = new URI(pinotClusterSpec.getControllerURI());
+        } catch (URISyntaxException e) {
+          throw new RuntimeException("Got invalid controller uri - '" + pinotClusterSpec.getControllerURI() + "'");
+        }
+        LOGGER.info("Pushing segments: {} to Pinot cluster: {} for table {}",
+            segmentMetadataFileMap.keySet(), controllerURI, tableName);
+        int attempts = 1;
+        if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushAttempts() > 0) {
+          attempts = spec.getPushJobSpec().getPushAttempts();
        }
-    } finally {
-      FileUtils.deleteQuietly(segmentMetadataFile);
+        long retryWaitMs = 1000L;
+        if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
+          retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+        }
+        RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> {
+          List<Header> reqHttpHeaders = new ArrayList<>(headers);
+          try {
+            addHeaders(segmentUriPathMap, spec, reqHttpHeaders);
+            URI segmentUploadURI = getSegmentUploadURI(spec.getPushJobSpec(), controllerURI);
+            SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadataFiles(segmentUploadURI,
+                segmentMetadataFileMap, reqHttpHeaders, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
+            LOGGER.info("Response for pushing table {} segments {} to location {} - {}: {}", tableName,
+                segmentMetadataFileMap.keySet(), controllerURI, response.getStatusCode(), response.getResponse());
+            return true;
+          } catch (HttpErrorStatusException e) {
+            int statusCode = e.getStatusCode();
+            if (statusCode >= 500) {
+              // Temporary exception
+              LOGGER.warn("Caught temporary exception while pushing table: {} segments: {} to {}, will retry",
+                  tableName, segmentMetadataFileMap.keySet(), controllerURI, e);
+              return false;
+            } else {
+              // Permanent exception
+              LOGGER.error("Caught permanent exception while pushing table: {} segments: {} to {}, won't retry",
Review Comment:
This will be all-or-nothing for batch mode, right?
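For reference, the control flow in question reduces to this sketch (pushWholeBatch is a hypothetical stand-in for the single uploadSegmentMetadataFiles call above); since the whole batch goes through one call, a permanent failure fails every segment at once:

```java
RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> {
  try {
    pushWholeBatch();  // hypothetical stand-in for the single batched upload call
    return true;       // success: stop retrying
  } catch (HttpErrorStatusException e) {
    if (e.getStatusCode() >= 500) {
      return false;    // transient error: retry the entire batch
    }
    throw e;           // permanent error: aborts the push for the whole batch
  }
});
```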
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java:
##########
@@ -403,6 +415,159 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl
     }
   }
+  // Method used to update a list of segments in batch mode with the METADATA upload type.
+  private SuccessResponse uploadSegments(String tableName, TableType tableType, FormDataMultiPart multiParts,
+      boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers, Request request) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    String tableNameWithType = tableType == TableType.OFFLINE ? TableNameBuilder.OFFLINE.tableNameWithType(rawTableName)
+        : TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
Review Comment:
Can we add a log somewhere (either here or in the caller) on the batch size that this API is handling?
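Something lightweight would do; a sketch (deriving the count from the multipart body parts is an assumption — it may be cleaner to log the parsed segment count later in the method):

```java
// Sketch: log the batch size this API call is handling, before processing starts.
LOGGER.info("Received metadata batch upload for table: {} with {} multipart parts",
    tableNameWithType, multiParts.getBodyParts().size());
```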
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2327,6 +2327,82 @@ public void assignTableSegment(String tableNameWithType, String segmentName) {
     }
   }
+  // Assign a list of segments in batch mode
+  public void assignTableSegments(String tableNameWithType, List<String> segmentNames) {
+    Map<String, String> segmentZKMetadataPathMap = new HashMap<>();
+    for (String segmentName : segmentNames) {
+      String segmentZKMetadataPath =
+          ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, segmentName);
+      segmentZKMetadataPathMap.put(segmentName, segmentZKMetadataPath);
+    }
+    // Assign instances for the segment and add it into IdealState
+    try {
+      TableConfig tableConfig = getTableConfig(tableNameWithType);
+      Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType);
+
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+          fetchOrComputeInstancePartitions(tableNameWithType, tableConfig);
+
+      // Initialize tier information only in case direct tier assignment is configured
+      if (_enableTieredSegmentAssignment && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
+        List<Tier> sortedTiers = TierConfigUtils.getSortedTiersForStorageType(tableConfig.getTierConfigsList(),
+            TierFactory.PINOT_SERVER_STORAGE_TYPE, _helixZkManager);
+        for (String segmentName : segmentNames) {
+          // Update segment tier to support direct assignment for multiple data directories
+          updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers);
+          InstancePartitions tierInstancePartitions = TierConfigUtils.getTieredInstancePartitionsForSegment(
+              tableNameWithType, segmentName, sortedTiers, _helixZkManager);
+          if (tierInstancePartitions != null && TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+            // Override instance partitions for offline table
+            LOGGER.info("Overriding with tiered instance partitions: {} for segment: {} of table: {}",
+                tierInstancePartitions, segmentName, tableNameWithType);
+            instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.OFFLINE, tierInstancePartitions);
+          }
+        }
+      }
+
+      SegmentAssignment segmentAssignment =
+          SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig, _controllerMetrics);
+      synchronized (getTableUpdaterLock(tableNameWithType)) {
+        Map<InstancePartitionsType, InstancePartitions> finalInstancePartitionsMap = instancePartitionsMap;
+        HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> {
+          assert idealState != null;
+          for (String segmentName : segmentNames) {
+            Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
+            if (currentAssignment.containsKey(segmentName)) {
+              LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName,
+                  tableNameWithType);
+            } else {
+              List<String> assignedInstances =
+                  segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap);
+              LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances,
+                  tableNameWithType);
+              currentAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances,
+                  SegmentStateModel.ONLINE));
+            }
+          }
+          return idealState;
+        });
+        LOGGER.info("Added segments: {} to IdealState for table: {}", segmentNames, tableNameWithType);
+      }
+    } catch (Exception e) {
+      LOGGER.error(
+          "Caught exception while adding segments: {} to IdealState for table: {}, deleting segments ZK metadata",
+          segmentNames, tableNameWithType, e);
+      for (Map.Entry<String, String> segmentZKMetadataPathEntry : segmentZKMetadataPathMap.entrySet()) {
+        String segmentName = segmentZKMetadataPathEntry.getKey();
+        String segmentZKMetadataPath = segmentZKMetadataPathEntry.getValue();
+        if (_propertyStore.remove(segmentZKMetadataPath, AccessOption.PERSISTENT)) {
+          LOGGER.info("Deleted segment ZK metadata for segment: {} of table: {}", segmentName, tableNameWithType);
+        } else {
+          LOGGER.error("Failed to delete segment ZK metadata for segment: {} of table: {}", segmentName,
Review Comment:
There's no retry for the ZK metadata deletion? I guess we did not have this in the original logic either.
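If we wanted one, a bounded retry could reuse the RetryPolicies utility already used on the push side (a sketch; the attempt count and delay are arbitrary, and the error handling is collapsed into a single catch):

```java
// Sketch: retry the ZK metadata deletion a few times before logging a failure.
try {
  RetryPolicies.fixedDelayRetryPolicy(3, 1000L)
      .attempt(() -> _propertyStore.remove(segmentZKMetadataPath, AccessOption.PERSISTENT));
  LOGGER.info("Deleted segment ZK metadata for segment: {} of table: {}", segmentName, tableNameWithType);
} catch (Exception e) {
  LOGGER.error("Failed to delete segment ZK metadata for segment: {} of table: {}", segmentName,
      tableNameWithType, e);
}
```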
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java:
##########
@@ -295,13 +302,18 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl
           extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE);
       copySegmentToFinalLocation = Boolean.parseBoolean(copySegmentToDeepStore);
       createSegmentFileFromMultipart(multiPart, destFile);
+      PinotFS pinotFS = null;
       try {
         URI segmentURI = new URI(sourceDownloadURIStr);
-        PinotFS pinotFS = PinotFSFactory.create(segmentURI.getScheme());
+        pinotFS = PinotFSFactory.create(segmentURI.getScheme());
         segmentSizeInBytes = pinotFS.length(segmentURI);
       } catch (Exception e) {
         segmentSizeInBytes = -1;
         LOGGER.warn("Could not fetch segment size for metadata push", e);
+      } finally {
+        if (pinotFS != null) {
Review Comment:
Good catch!
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java:
##########
@@ -277,6 +278,8 @@ public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, Pino
       Map<String, String> segmentUriToTarPathMap, List<Header> headers, List<NameValuePair> parameters)
       throws Exception {
     String tableName = spec.getTableSpec().getTableName();
+    Map<String, File> segmentMetadataFileMap = new HashMap<>();
+    Map<String, String> segmentUriPathMap = new HashMap<>();
Review Comment:
Should we have a limit on the batch size, or have users provide one via config? The next thing we will face is that we are pushing too large a batch :)
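As a sketch of one option (no such config exists yet, so maxBatchSize is hypothetical, and the helper assumes the usual java.util imports), the segment maps could be chunked and each chunk pushed through the existing batched call:

```java
// Sketch: split a segment map into chunks of at most maxBatchSize entries,
// preserving insertion order; each chunk would then go through one batched push.
static <K, V> List<Map<K, V>> partition(Map<K, V> map, int maxBatchSize) {
  List<Map<K, V>> batches = new ArrayList<>();
  Map<K, V> current = new LinkedHashMap<>();
  for (Map.Entry<K, V> entry : map.entrySet()) {
    current.put(entry.getKey(), entry.getValue());
    if (current.size() == maxBatchSize) {
      batches.add(current);
      current = new LinkedHashMap<>();
    }
  }
  if (!current.isEmpty()) {
    batches.add(current);
  }
  return batches;
}
```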