npawar commented on a change in pull request #6740:
URL: https://github.com/apache/incubator-pinot/pull/6740#discussion_r610161478
##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java
##########
@@ -185,38 +177,107 @@ public static String buildSegment(SegmentGeneratorConfig segmentGeneratorConfig)
}
/**
- * Uploads the segment tar files to the provided controller
+ * Uploads the segments from the provided segmentTar URIs to the table, using push details from the batchConfig
+ * @param tableNameWithType name of the table to upload the segment
+ * @param batchConfig batchConfig with details about push such as controllerURI, pushAttempts, pushParallelism, etc
+ * @param segmentTarURIs list of URI for the segment tar files
+ * @param authContext auth details required to upload the Pinot segment to controller
*/
- public static void uploadSegment(String tableNameWithType, List<File>
tarFiles, URI controllerUri,
- final String authToken)
- throws RetriableOperationException, AttemptsExceededException {
- for (File tarFile : tarFiles) {
- String fileName = tarFile.getName();
-
Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT));
- String segmentName = fileName.substring(0, fileName.length() -
Constants.TAR_GZ_FILE_EXT.length());
-
- RetryPolicies.exponentialBackoffRetryPolicy(DEFAULT_ATTEMPTS,
DEFAULT_RETRY_WAIT_MS, 5).attempt(() -> {
- try (InputStream inputStream = new FileInputStream(tarFile)) {
- SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
-
.uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerUri),
segmentName, inputStream,
- FileUploadDownloadClient.makeAuthHeader(authToken),
- FileUploadDownloadClient.makeTableParam(tableNameWithType),
- FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
- LOGGER.info("Response for pushing table {} segment {} - {}: {}",
tableNameWithType, segmentName,
- response.getStatusCode(), response.getResponse());
- return true;
- } catch (HttpErrorStatusException e) {
- int statusCode = e.getStatusCode();
- if (statusCode >= 500) {
- LOGGER.warn("Caught temporary exception while pushing table: {}
segment: {}, will retry", tableNameWithType,
- segmentName, e);
- return false;
- } else {
- throw e;
+ public static void uploadSegment(String tableNameWithType, BatchConfig
batchConfig, List<URI> segmentTarURIs,
+ @Nullable AuthContext authContext)
+ throws Exception {
+
+ SegmentGenerationJobSpec segmentUploadSpec =
generateSegmentUploadSpec(tableNameWithType, batchConfig, authContext);
+
+ String pushMode = batchConfig.getPushMode();
+ switch
(BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+ case TAR:
+ try {
+ SegmentPushUtils.pushSegments(segmentUploadSpec, LOCAL_PINOT_FS,
+
segmentTarURIs.stream().map(URI::toString).collect(Collectors.toList()));
+ } catch (RetriableOperationException | AttemptsExceededException e) {
+ throw new RuntimeException(e);
+ }
+ break;
+ case URI:
+ try {
+ URI outputSegmentDirURI = null;
+ if (StringUtils.isNotBlank(batchConfig.getOutputSegmentDirURI())) {
+ outputSegmentDirURI =
URI.create(batchConfig.getOutputSegmentDirURI());
}
+ List<String> segmentUris = new ArrayList<>();
+ for (URI segmentTarURI : segmentTarURIs) {
+ URI updatedURI =
SegmentPushUtils.generateSegmentTarURI(outputSegmentDirURI, segmentTarURI,
+ segmentUploadSpec.getPushJobSpec().getSegmentUriPrefix(),
+ segmentUploadSpec.getPushJobSpec().getSegmentUriSuffix());
+ segmentUris.add(updatedURI.toString());
+ }
+ SegmentPushUtils.sendSegmentUris(segmentUploadSpec, segmentUris);
+ } catch (RetriableOperationException | AttemptsExceededException e) {
+ throw new RuntimeException(e);
+ }
+ break;
+ case METADATA:
+ try {
+ URI outputSegmentDirURI = null;
+ if (StringUtils.isNotBlank(batchConfig.getOutputSegmentDirURI())) {
+ outputSegmentDirURI =
URI.create(batchConfig.getOutputSegmentDirURI());
+ }
+ PinotFS outputFileFS = getOutputPinotFS(batchConfig,
outputSegmentDirURI);
+ Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
+ .getSegmentUriToTarPathMap(outputSegmentDirURI,
segmentUploadSpec.getPushJobSpec().getSegmentUriPrefix(),
+ segmentUploadSpec.getPushJobSpec().getSegmentUriSuffix(),
new String[]{segmentTarURIs.toString()});
+ SegmentPushUtils.sendSegmentUriAndMetadata(segmentUploadSpec,
outputFileFS, segmentUriToTarPathMap);
+ } catch (RetriableOperationException | AttemptsExceededException e) {
+ throw new RuntimeException(e);
}
- });
+ break;
+ default:
+ throw new UnsupportedOperationException("Unrecognized push mode - " +
pushMode);
+ }
+ }
+
+ private static SegmentGenerationJobSpec generateSegmentUploadSpec(String
tableName, BatchConfig batchConfig,
+ @Nullable AuthContext authContext) {
+
+ TableSpec tableSpec = new TableSpec();
+ tableSpec.setTableName(tableName);
+
+ PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
+ pinotClusterSpec.setControllerURI(batchConfig.getPushControllerURI());
+ PinotClusterSpec[] pinotClusterSpecs = new
PinotClusterSpec[]{pinotClusterSpec};
+
+ PushJobSpec pushJobSpec = new PushJobSpec();
+ pushJobSpec.setPushAttempts(batchConfig.getPushAttempts());
+ pushJobSpec.setPushParallelism(batchConfig.getPushParallelism());
+
pushJobSpec.setPushRetryIntervalMillis(batchConfig.getPushIntervalRetryMillis());
+ pushJobSpec.setSegmentUriPrefix(batchConfig.getPushSegmentURIPrefix());
+ pushJobSpec.setSegmentUriSuffix(batchConfig.getPushSegmentURISuffix());
+
+ SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
+ spec.setPushJobSpec(pushJobSpec);
+ spec.setTableSpec(tableSpec);
+ spec.setPinotClusterSpecs(pinotClusterSpecs);
+ if (authContext != null &&
StringUtils.isNotBlank(authContext.getAuthToken())) {
+ spec.setAuthToken(authContext.getAuthToken());
+ }
+ return spec;
+ }
+
+ /**
+ * Creates an instance of the PinotFS using the fileURI and fs properties
from BatchConfig
+ */
+ public static PinotFS getOutputPinotFS(BatchConfig batchConfig, URI fileURI)
{
+ String fileURIScheme = (fileURI == null) ? null : fileURI.getScheme();
+ if (fileURIScheme == null) {
+ fileURIScheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME;
+ }
+ if (!PinotFSFactory.isSchemeSupported(fileURIScheme)) {
Review comment:
Separated this into its own method, registerPinotFS.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]