This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch SegmentProcessorFrameworkImprovement
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to
refs/heads/SegmentProcessorFrameworkImprovement by this push:
new 202216b5b7 Added support to upload segments in batch mode with
METADATA upload type (#13690)
202216b5b7 is described below
commit 202216b5b7b813c03544930defaa352ae9c819a6
Author: Ragesh Rajagopalan <[email protected]>
AuthorDate: Thu Jul 25 11:16:08 2024 -0700
Added support to upload segments in batch mode with METADATA upload type
(#13690)
---
.../common/utils/FileUploadDownloadClient.java | 7 +
.../PinotSegmentUploadDownloadRestletResource.java | 355 ++++++++++++++++++++-
.../api/upload/SegmentUploadMetadata.java | 117 +++++++
.../pinot/controller/api/upload/ZKOperator.java | 321 +++++++++++++++++++
.../helix/core/PinotHelixResourceManager.java | 78 +++++
.../SegmentUploadIntegrationTest.java | 104 +++++-
.../BaseMultipleSegmentsConversionExecutor.java | 212 +++++++-----
.../segment/local/utils/SegmentPushUtils.java | 130 ++++++++
.../spi/ingestion/batch/spec/PushJobSpec.java | 15 +
9 files changed, 1255 insertions(+), 84 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 4123e3157e..4a2cb33be2 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -113,6 +113,7 @@ public class FileUploadDownloadClient implements
AutoCloseable {
private static final String SCHEMA_PATH = "/schemas";
private static final String OLD_SEGMENT_PATH = "/segments";
private static final String SEGMENT_PATH = "/v2/segments";
+ private static final String SEGMENT_UPLOAD_BATCH_PATH = "/v3/segments";
private static final String TABLES_PATH = "/tables";
private static final String TYPE_DELIMITER = "type=";
private static final String START_REPLACE_SEGMENTS_PATH =
"/startReplaceSegments";
@@ -365,6 +366,12 @@ public class FileUploadDownloadClient implements
AutoCloseable {
return getURI(controllerURI.getScheme(), controllerURI.getHost(),
controllerURI.getPort(), SEGMENT_PATH);
}
+ public static URI getUploadSegmentBatchURI(URI controllerURI)
+ throws URISyntaxException {
+ return getURI(controllerURI.getScheme(), controllerURI.getHost(),
controllerURI.getPort(),
+ SEGMENT_UPLOAD_BATCH_PATH);
+ }
+
public static URI getStartReplaceSegmentsURI(URI controllerURI, String
rawTableName, String tableType,
boolean forceCleanup)
throws URISyntaxException {
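For orientation, a minimal sketch of how a push client might resolve the new batch endpoint. The controller address below is a hypothetical placeholder; only getUploadSegmentBatchURI comes from this change:

import java.net.URI;
import org.apache.pinot.common.utils.FileUploadDownloadClient;

public class BatchUploadUriExample {
  public static void main(String[] args) throws Exception {
    URI controllerURI = new URI("http://localhost:9000"); // hypothetical controller address
    // The helper added above appends the new /v3/segments path to the controller URI.
    URI batchUploadURI = FileUploadDownloadClient.getUploadSegmentBatchURI(controllerURI);
    System.out.println(batchUploadURI); // expected: http://localhost:9000/v3/segments
  }
}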
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 156a3e9095..2f5082e5b2 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -29,14 +29,20 @@ import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
+import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -59,6 +65,7 @@ import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
+import org.apache.commons.io.Charsets;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
@@ -73,6 +80,7 @@ import
org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.FileUploadDownloadClient.FileUploadType;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.controller.ControllerConf;
@@ -81,6 +89,7 @@ import
org.apache.pinot.controller.api.access.AccessControlFactory;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.api.access.Authenticate;
import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.api.upload.SegmentUploadMetadata;
import org.apache.pinot.controller.api.upload.SegmentValidationUtils;
import org.apache.pinot.controller.api.upload.ZKOperator;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
@@ -101,6 +110,7 @@ import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.glassfish.grizzly.http.server.Request;
+import org.glassfish.jersey.media.multipart.BodyPart;
import org.glassfish.jersey.media.multipart.FormDataBodyPart;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
import org.glassfish.jersey.server.ManagedAsync;
@@ -295,13 +305,18 @@ public class PinotSegmentUploadDownloadRestletResource {
extractHttpHeader(headers,
FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE);
copySegmentToFinalLocation =
Boolean.parseBoolean(copySegmentToDeepStore);
createSegmentFileFromMultipart(multiPart, destFile);
+ PinotFS pinotFS = null;
try {
URI segmentURI = new URI(sourceDownloadURIStr);
- PinotFS pinotFS = PinotFSFactory.create(segmentURI.getScheme());
+ pinotFS = PinotFSFactory.create(segmentURI.getScheme());
segmentSizeInBytes = pinotFS.length(segmentURI);
} catch (Exception e) {
segmentSizeInBytes = -1;
LOGGER.warn("Could not fetch segment size for metadata push", e);
+ } finally {
+ if (pinotFS != null) {
+ pinotFS.close();
+ }
}
break;
default:
@@ -403,6 +418,234 @@ public class PinotSegmentUploadDownloadRestletResource {
}
}
+  // Method used to upload a list of segments in batch mode with the METADATA upload type.
+ private SuccessResponse uploadSegments(String tableName, TableType
tableType, FormDataMultiPart multiParts,
+ boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders
headers, Request request) {
+ long segmentsUploadStartTimeMs = System.currentTimeMillis();
+ String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+ String tableNameWithType = tableType == TableType.OFFLINE ?
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName)
+ : TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ throw new ControllerApplicationException(LOGGER, "Failed to fetch table
config for table: " + tableNameWithType,
+ Response.Status.BAD_REQUEST);
+ }
+
+ String clientAddress;
+ try {
+ clientAddress =
InetAddress.getByName(request.getRemoteAddr()).getHostName();
+ } catch (UnknownHostException e) {
+ throw new ControllerApplicationException(LOGGER, "Failed to resolve
hostname from input request",
+ Response.Status.BAD_REQUEST, e);
+ }
+
+ String uploadTypeStr = extractHttpHeader(headers,
FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE);
+ FileUploadType uploadType = getUploadType(uploadTypeStr);
+ if (!FileUploadType.METADATA.equals(uploadType)) {
+ throw new ControllerApplicationException(LOGGER, "Unsupported upload
type: " + uploadTypeStr,
+ Response.Status.BAD_REQUEST);
+ }
+
+ String crypterClassNameInHeader = extractHttpHeader(headers,
FileUploadDownloadClient.CustomHeaders.CRYPTER);
+ String ingestionDescriptor = extractHttpHeader(headers,
CommonConstants.Controller.INGESTION_DESCRIPTOR);
+ ControllerFilePathProvider provider =
ControllerFilePathProvider.getInstance();
+ List<SegmentUploadMetadata> segmentUploadMetadataList = new ArrayList<>();
+ List<File> tempFiles = new ArrayList<>();
+ List<String> segmentNames = new ArrayList<>();
+ List<BodyPart> bodyParts = multiParts.getBodyParts();
+ LOGGER.info("Uploading segments in batch mode of size: {}",
bodyParts.size());
+
+    // There is expected to be exactly one body part, carrying the tarred metadata for all segments
+ FormDataBodyPart bodyPartFromReq = (FormDataBodyPart) bodyParts.get(0);
+ String uuid = UUID.randomUUID().toString();
+ File allSegmentsMetadataTarFile = new File(FileUtils.getTempDirectory(),
"allSegmentsMetadataTar-" + uuid
+ + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ try {
+ createSegmentFileFromBodyPart(bodyPartFromReq,
allSegmentsMetadataTarFile);
+ } catch (IOException e) {
+ throw new ControllerApplicationException(LOGGER, "Failed to extract
segment metadata files from the input "
+ + "request. ", Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+
+ List<File> segmentMetadataFiles = new ArrayList<>();
+ File allSegmentsMetadataDir = new File(FileUtils.getTempDirectory(),
"allSegmentsMetadataDir-" + uuid);
+ try {
+ FileUtils.forceMkdir(allSegmentsMetadataDir);
+ List<File> metadataFiles =
TarGzCompressionUtils.untar(allSegmentsMetadataTarFile, allSegmentsMetadataDir);
+ if (!metadataFiles.isEmpty()) {
+ segmentMetadataFiles.addAll(metadataFiles);
+ }
+ } catch (IOException e) {
+ throw new ControllerApplicationException(LOGGER, "Failed to unzip the
segment metadata files. ",
+ Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+
+ // segmentName, (creation.meta, metadata.properties)
+ Map<String, SegmentMetadataInfo> segmentMetadataFileMap = new HashMap<>();
+ // segmentName, segmentDownloadURI
+ Map<String, String> segmentURIMap = new HashMap<>();
+ for (File file: segmentMetadataFiles) {
+ String fileName = file.getName();
+ if (fileName.equalsIgnoreCase("all_segments_metadata")) {
+ try (InputStream inputStream = FileUtils.openInputStream(file)) {
+ final InputStreamReader reader = new InputStreamReader(inputStream,
Charsets.toCharset(
+ StandardCharsets.UTF_8));
+ try (BufferedReader bufReader = IOUtils.toBufferedReader(reader)) {
+ String segmentNameLine;
+ String segmentURILine;
+ while ((segmentNameLine = bufReader.readLine()) != null) {
+ segmentURILine = bufReader.readLine();
+ segmentURIMap.put(segmentNameLine, segmentURILine);
+ }
+ }
+ } catch (IOException e) {
+ throw new ControllerApplicationException(LOGGER, "Failed to read the
all_segment_metadata file. ",
+ Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ } else if (fileName.endsWith(".creation.meta")) {
+ int suffixLength = ".creation.meta".length();
+ String segmentName = fileName.substring(0, fileName.length() -
suffixLength);
+ SegmentMetadataInfo segmentMetadataInfo =
segmentMetadataFileMap.getOrDefault(segmentName,
+ new SegmentMetadataInfo());
+ segmentMetadataInfo.setSegmentCreationMetaFile(file);
+ segmentMetadataFileMap.put(segmentName, segmentMetadataInfo);
+ } else if (fileName.endsWith(".metadata.properties")) {
+ int suffixLength = ".metadata.properties".length();
+ String segmentName = fileName.substring(0, fileName.length() -
suffixLength);
+ SegmentMetadataInfo segmentMetadataInfo =
segmentMetadataFileMap.getOrDefault(segmentName,
+ new SegmentMetadataInfo());
+ segmentMetadataInfo.setSegmentPropertiesFile(file);
+ segmentMetadataFileMap.put(segmentName, segmentMetadataInfo);
+ }
+ }
+
+ try {
+ int entryCount = 0;
+ for (Map.Entry<String, SegmentMetadataInfo> entry:
segmentMetadataFileMap.entrySet()) {
+ String segmentName = entry.getKey();
+ SegmentMetadataInfo segmentMetadataInfo = entry.getValue();
+ segmentNames.add(segmentName);
+ File tempEncryptedFile;
+ File tempDecryptedFile;
+ File tempSegmentDir;
+ String sourceDownloadURIStr = segmentURIMap.get(segmentName);
+ if (StringUtils.isEmpty(sourceDownloadURIStr)) {
+ throw new ControllerApplicationException(LOGGER,
+ "'DOWNLOAD_URI' is required as a field within the multipart
object for METADATA batch upload mode.",
+ Response.Status.BAD_REQUEST);
+ }
+ // The downloadUri for putting into segment zk metadata
+ String segmentDownloadURIStr = sourceDownloadURIStr;
+
+ String tempFileName = TMP_DIR_PREFIX + UUID.randomUUID();
+ tempEncryptedFile = new File(provider.getFileUploadTempDir(),
tempFileName + ENCRYPTED_SUFFIX);
+ tempFiles.add(tempEncryptedFile);
+ tempDecryptedFile = new File(provider.getFileUploadTempDir(),
tempFileName);
+ tempFiles.add(tempDecryptedFile);
+ tempSegmentDir = new File(provider.getUntarredFileTempDir(),
tempFileName);
+ tempFiles.add(tempSegmentDir);
+ boolean encryptSegment =
StringUtils.isNotEmpty(crypterClassNameInHeader);
+ File destFile = encryptSegment ? tempEncryptedFile : tempDecryptedFile;
+        // Override copySegmentToFinalLocation if the COPY_SEGMENT_TO_DEEP_STORE header is provided;
+        // otherwise default to false for backward compatibility
+ String copySegmentToDeepStore =
+ extractHttpHeader(headers,
FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE);
+ boolean copySegmentToFinalLocation =
Boolean.parseBoolean(copySegmentToDeepStore);
+ createSegmentFileFromSegmentMetadataInfo(segmentMetadataInfo,
destFile);
+ // TODO: Include the untarred segment size when using the METADATA
push rest API. Currently we can only use the
+ // tarred segment size as an approximation.
+ long segmentSizeInBytes = getSegmentSizeFromFile(sourceDownloadURIStr);
+
+ if (encryptSegment) {
+ decryptFile(crypterClassNameInHeader, tempEncryptedFile,
tempDecryptedFile);
+ }
+
+ String metadataProviderClass =
DefaultMetadataExtractor.class.getName();
+ SegmentMetadata segmentMetadata =
getSegmentMetadata(tempDecryptedFile, tempSegmentDir, metadataProviderClass);
+ LOGGER.info("Processing upload request for segment: {} of table: {}
with upload type: {} from client: {}, "
+ + "ingestion descriptor: {}", segmentName, tableNameWithType,
uploadType, clientAddress,
+ ingestionDescriptor);
+
+ // Validate segment
+ if (tableConfig.getIngestionConfig() == null ||
tableConfig.getIngestionConfig().isSegmentTimeValueCheck()) {
+ SegmentValidationUtils.validateTimeInterval(segmentMetadata,
tableConfig);
+ }
+
+ // Encrypt segment
+ String crypterNameInTableConfig =
tableConfig.getValidationConfig().getCrypterClassName();
+ Pair<String, File> encryptionInfo =
+ encryptSegmentIfNeeded(tempDecryptedFile, tempEncryptedFile,
encryptSegment, crypterClassNameInHeader,
+ crypterNameInTableConfig, segmentName, tableNameWithType);
+ File segmentFile = encryptionInfo.getRight();
+
+ // Update download URI if controller is responsible for moving the
segment to the deep store
+ URI finalSegmentLocationURI = null;
+ if (copySegmentToFinalLocation) {
+ URI dataDirURI = provider.getDataDirURI();
+ String dataDirPath = dataDirURI.toString();
+ String encodedSegmentName = URIUtils.encode(segmentName);
+ String finalSegmentLocationPath = URIUtils.getPath(dataDirPath,
rawTableName, encodedSegmentName);
+ if
(dataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME))
{
+ segmentDownloadURIStr = URIUtils.getPath(provider.getVip(),
"segments", rawTableName, encodedSegmentName);
+ } else {
+ segmentDownloadURIStr = finalSegmentLocationPath;
+ }
+ finalSegmentLocationURI = URIUtils.getUri(finalSegmentLocationPath);
+ }
+ SegmentUploadMetadata segmentUploadMetadata =
+ new SegmentUploadMetadata(segmentDownloadURIStr,
sourceDownloadURIStr, finalSegmentLocationURI,
+ segmentSizeInBytes, segmentMetadata, encryptionInfo);
+ segmentUploadMetadataList.add(segmentUploadMetadata);
+ LOGGER.info("Using segment download URI: {} for segment: {} of table:
{} (move segment: {})",
+ segmentDownloadURIStr, segmentFile, tableNameWithType,
copySegmentToFinalLocation);
+        // Once every segment in the batch has been prepared, complete the segment operations in one shot
+ if (++entryCount == segmentMetadataFileMap.size()) {
+ ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager,
_controllerConf, _controllerMetrics);
+ zkOperator.completeSegmentsOperations(tableNameWithType, uploadType,
enableParallelPushProtection,
+ allowRefresh, headers, segmentUploadMetadataList);
+ }
+ }
+ } catch (Exception e) {
+
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_SEGMENT_UPLOAD_ERROR,
+ segmentUploadMetadataList.size());
+ throw new ControllerApplicationException(LOGGER,
+ "Exception while processing segments to upload: " + e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e);
+ } finally {
+ cleanupTempFiles(tempFiles);
+ }
+
+ return new SuccessResponse(String.format("Successfully uploaded segments:
%s of table: %s in %s ms",
+ segmentNames, tableNameWithType, System.currentTimeMillis() -
segmentsUploadStartTimeMs));
+ }
+
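The loop above expects the untarred payload to contain, per segment, a <segmentName>.creation.meta file and a <segmentName>.metadata.properties file, plus a single all_segments_metadata file whose lines alternate between a segment name and that segment's download URI. A hedged writer-side sketch of that last file (the real producer lives in SegmentPushUtils, which is not shown in this hunk, so the class and method names here are illustrative):

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Map;

public final class AllSegmentsMetadataWriterSketch {
  // Writes the alternating name/URI lines consumed by the reader loop above.
  public static File write(File dir, Map<String, String> segmentNameToDownloadURI)
      throws IOException {
    StringBuilder content = new StringBuilder();
    for (Map.Entry<String, String> entry : segmentNameToDownloadURI.entrySet()) {
      content.append(entry.getKey()).append('\n');   // odd line: segment name
      content.append(entry.getValue()).append('\n'); // even line: segment download URI
    }
    File outputFile = new File(dir, "all_segments_metadata");
    Files.write(outputFile.toPath(), content.toString().getBytes(StandardCharsets.UTF_8));
    return outputFile;
  }
}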
+ private static class SegmentMetadataInfo {
+ private File _segmentCreationMetaFile;
+ private File _segmentPropertiesFile;
+
+ public File getSegmentCreationMetaFile() {
+ return _segmentCreationMetaFile;
+ }
+
+ public File getSegmentPropertiesFile() {
+ return _segmentPropertiesFile;
+ }
+
+ public void setSegmentCreationMetaFile(File file) {
+ _segmentCreationMetaFile = file;
+ }
+
+ public void setSegmentPropertiesFile(File file) {
+ _segmentPropertiesFile = file;
+ }
+ }
+
+ private void cleanupTempFiles(List<File> tempFiles) {
+ for (File tempFile : tempFiles) {
+ FileUtils.deleteQuietly(tempFile);
+ }
+ }
+
@Nullable
private String extractHttpHeader(HttpHeaders headers, String name) {
String value = headers.getHeaderString(name);
@@ -555,6 +798,65 @@ public class PinotSegmentUploadDownloadRestletResource {
}
}
+ @POST
+ @ManagedAsync
+ @Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.MULTIPART_FORM_DATA)
+ @Path("/v3/segments")
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Cluster.UPLOAD_SEGMENT)
+ @Authenticate(AccessType.CREATE)
+ @ApiOperation(value = "Upload a batch of segments", notes = "Upload a batch
of segments with METADATA upload type")
+ @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Successfully uploaded segments"),
+ @ApiResponse(code = 400, message = "Bad Request"),
+ @ApiResponse(code = 403, message = "Segment validation fails"),
+ @ApiResponse(code = 409, message = "Segment already exists or another
parallel push in progress"),
+ @ApiResponse(code = 410, message = "Segment to refresh does not exist"),
+ @ApiResponse(code = 412, message = "CRC check fails"),
+ @ApiResponse(code = 500, message = "Internal error")
+ })
+ @TrackInflightRequestMetrics
+ @TrackedByGauge(gauge = ControllerGauge.SEGMENT_UPLOADS_IN_PROGRESS)
+ // This multipart based endpoint is used to upload a list of segments in
batch mode.
+ public void uploadSegmentsAsMultiPart(FormDataMultiPart multiPart,
+ @ApiParam(value = "Name of the table", required = true)
+ @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME)
+ String tableName,
+ @ApiParam(value = "Type of the table", required = true)
+ @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_TYPE)
+ String tableType,
+ @ApiParam(value = "Whether to enable parallel push protection")
+ @DefaultValue("false")
+
@QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION)
+ boolean enableParallelPushProtection,
+ @ApiParam(value = "Whether to refresh if the segment already exists")
+ @DefaultValue("true")
+ @QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH)
+ boolean allowRefresh,
+ @Context HttpHeaders headers,
+ @Context Request request,
+ @Suspended final AsyncResponse asyncResponse) {
+ if (StringUtils.isEmpty(tableName)) {
+ throw new ControllerApplicationException(LOGGER,
+ "tableName is a required field while uploading segments in batch
mode.", Response.Status.BAD_REQUEST);
+ }
+ if (StringUtils.isEmpty(tableType)) {
+ throw new ControllerApplicationException(LOGGER,
+ "tableType is a required field while uploading segments in batch
mode.", Response.Status.BAD_REQUEST);
+ }
+ if (multiPart == null) {
+ throw new ControllerApplicationException(LOGGER,
+ "multiPart is a required field while uploading segments in batch
mode.", Response.Status.BAD_REQUEST);
+ }
+ try {
+ asyncResponse.resume(
+ uploadSegments(tableName,
TableType.valueOf(tableType.toUpperCase()), multiPart,
enableParallelPushProtection,
+ allowRefresh, headers, request));
+ } catch (Throwable t) {
+ asyncResponse.resume(t);
+ }
+ }
+
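A hedged sketch of invoking this endpoint from a client, using Apache HttpClient's multipart support. The host, table name, and tar path are hypothetical; the query parameters, the UPLOAD_TYPE header, and the METADATA value mirror the endpoint definition above:

import java.io.File;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

public class V3SegmentsUploadSketch {
  public static void main(String[] args) throws Exception {
    File metadataTar = new File("/tmp/allSegmentsMetadata.tar.gz"); // hypothetical path
    String url = "http://localhost:9000/v3/segments?tableName=myTable&tableType=OFFLINE"
        + "&enableParallelPushProtection=true&allowRefresh=true";
    HttpPost post = new HttpPost(url);
    // FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE; only METADATA is accepted by this endpoint.
    post.setHeader("UPLOAD_TYPE", "METADATA");
    // Single multipart body part carrying the tarred metadata for all segments.
    post.setEntity(MultipartEntityBuilder.create()
        .addBinaryBody("file", metadataTar, ContentType.DEFAULT_BINARY, metadataTar.getName())
        .build());
    try (CloseableHttpClient client = HttpClients.createDefault()) {
      HttpResponse response = client.execute(post);
      System.out.println(response.getStatusLine());
    }
  }
}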
@POST
@ManagedAsync
@Produces(MediaType.APPLICATION_JSON)
@@ -752,6 +1054,38 @@ public class PinotSegmentUploadDownloadRestletResource {
}
}
+ @VisibleForTesting
+ static void createSegmentFileFromBodyPart(FormDataBodyPart
segmentMetadataBodyPart, File destFile)
+ throws IOException {
+ try (InputStream inputStream =
segmentMetadataBodyPart.getValueAs(InputStream.class);
+ OutputStream outputStream = new FileOutputStream(destFile)) {
+ IOUtils.copyLarge(inputStream, outputStream);
+ } finally {
+ segmentMetadataBodyPart.cleanup();
+ }
+ }
+
+ static void createSegmentFileFromSegmentMetadataInfo(SegmentMetadataInfo
metadataInfo, File destFile)
+ throws IOException {
+ File creationMetaFile = metadataInfo.getSegmentCreationMetaFile();
+ File metadataPropertiesFile = metadataInfo.getSegmentPropertiesFile();
+ String uuid = UUID.randomUUID().toString();
+ File segmentMetadataDir = new File(FileUtils.getTempDirectory(),
"segmentMetadataDir-" + uuid);
+ FileUtils.copyFile(creationMetaFile, new File(segmentMetadataDir,
"creation.meta"));
+ FileUtils.copyFile(metadataPropertiesFile, new File(segmentMetadataDir,
"metadata.properties"));
+ File segmentMetadataTarFile = new File(FileUtils.getTempDirectory(),
"segmentMetadataTar-" + uuid
+ + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ if (segmentMetadataTarFile.exists()) {
+ FileUtils.forceDelete(segmentMetadataTarFile);
+ }
+ TarGzCompressionUtils.createTarGzFile(segmentMetadataDir,
segmentMetadataTarFile);
+ try {
+ FileUtils.copyFile(segmentMetadataTarFile, destFile);
+ } finally {
+ FileUtils.forceDelete(segmentMetadataTarFile);
+ }
+ }
+
private FileUploadType getUploadType(String uploadTypeStr) {
if (uploadTypeStr != null) {
return FileUploadType.valueOf(uploadTypeStr);
@@ -760,6 +1094,25 @@ public class PinotSegmentUploadDownloadRestletResource {
}
}
+ @VisibleForTesting
+ long getSegmentSizeFromFile(String sourceDownloadURIStr)
+ throws IOException {
+ long segmentSizeInBytes = -1;
+ PinotFS pinotFS = null;
+ try {
+ URI segmentURI = new URI(sourceDownloadURIStr);
+ pinotFS = PinotFSFactory.create(segmentURI.getScheme());
+ segmentSizeInBytes = pinotFS.length(segmentURI);
+ } catch (Exception e) {
+      LOGGER.warn(String.format("Exception while fetching segment size for URI: %s", sourceDownloadURIStr), e);
+ } finally {
+ if (pinotFS != null) {
+ pinotFS.close();
+ }
+ }
+ return segmentSizeInBytes;
+ }
+
// Validate that there is one file that is in the input.
public static boolean validateMultiPart(Map<String, List<FormDataBodyPart>>
map, String segmentName) {
boolean isGood = true;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentUploadMetadata.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentUploadMetadata.java
new file mode 100644
index 0000000000..d46891572b
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentUploadMetadata.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.upload;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Objects;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+
+
+/**
+ * Data object used while adding or updating segments. It comprises the following fields:
+ * <ol>
+ * <li>segmentDownloadURIStr – The segment download URI persisted into the
ZK metadata.</li>
+ * <li>sourceDownloadURIStr – The URI from where the segment could be
downloaded.</li>
+ * <li>finalSegmentLocationURI – The final location of the segment in the
deep-store.</li>
+ * <li>segmentSizeInBytes – The segment size in bytes.</li>
+ * <li>segmentMetadata – The segment metadata as defined in {@link
org.apache.pinot.segment.spi.SegmentMetadata}.</li>
+ * <li>encryptionInfo – A pair consisting of the crypter class used to
encrypt the segment, and the encrypted segment
+ * file.</li>
+ * <li>segmentMetadataZNRecord – The segment metadata represented as a helix
+ * {@link org.apache.helix.zookeeper.datamodel.ZNRecord}.</li>
+ * </ol>
+ */
+public class SegmentUploadMetadata {
+ private final String _segmentDownloadURIStr;
+ private final String _sourceDownloadURIStr;
+ private final URI _finalSegmentLocationURI;
+ private final Long _segmentSizeInBytes;
+ private final SegmentMetadata _segmentMetadata;
+ private final Pair<String, File> _encryptionInfo;
+ private ZNRecord _segmentMetadataZNRecord;
+
+ public SegmentUploadMetadata(String segmentDownloadURIStr, String
sourceDownloadURIStr, URI finalSegmentLocationURI,
+ Long segmentSizeInBytes, SegmentMetadata segmentMetadata, Pair<String,
File> encryptionInfo) {
+ _segmentDownloadURIStr = segmentDownloadURIStr;
+ _sourceDownloadURIStr = sourceDownloadURIStr;
+ _segmentSizeInBytes = segmentSizeInBytes;
+ _segmentMetadata = segmentMetadata;
+ _encryptionInfo = encryptionInfo;
+ _finalSegmentLocationURI = finalSegmentLocationURI;
+ }
+
+ public String getSegmentDownloadURIStr() {
+ return _segmentDownloadURIStr;
+ }
+
+ public String getSourceDownloadURIStr() {
+ return _sourceDownloadURIStr;
+ }
+
+ public URI getFinalSegmentLocationURI() {
+ return _finalSegmentLocationURI;
+ }
+
+ public Long getSegmentSizeInBytes() {
+ return _segmentSizeInBytes;
+ }
+
+ public SegmentMetadata getSegmentMetadata() {
+ return _segmentMetadata;
+ }
+
+ public Pair<String, File> getEncryptionInfo() {
+ return _encryptionInfo;
+ }
+
+ public void setSegmentMetadataZNRecord(ZNRecord segmentMetadataZNRecord) {
+ _segmentMetadataZNRecord = segmentMetadataZNRecord;
+ }
+
+ public ZNRecord getSegmentMetadataZNRecord() {
+ return _segmentMetadataZNRecord;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SegmentUploadMetadata that = (SegmentUploadMetadata) o;
+ return Objects.equals(_segmentDownloadURIStr, that._segmentDownloadURIStr)
+ && Objects.equals(_sourceDownloadURIStr, that._sourceDownloadURIStr)
+ && Objects.equals(_finalSegmentLocationURI,
that._finalSegmentLocationURI)
+ && Objects.equals(_segmentSizeInBytes, that._segmentSizeInBytes)
+ && Objects.equals(_segmentMetadata, that._segmentMetadata)
+ && Objects.equals(_encryptionInfo, that._encryptionInfo)
+ && Objects.equals(_segmentMetadataZNRecord,
that._segmentMetadataZNRecord);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(_segmentDownloadURIStr, _sourceDownloadURIStr,
_finalSegmentLocationURI,
+ _segmentSizeInBytes, _segmentMetadata, _encryptionInfo,
_segmentMetadataZNRecord);
+ }
+}
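A minimal construction sketch for the data object above. Every value is a hypothetical stand-in for what PinotSegmentUploadDownloadRestletResource computes per segment, and a null crypter name in the pair denotes an unencrypted segment:

import java.io.File;
import java.net.URI;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.controller.api.upload.SegmentUploadMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;

public class SegmentUploadMetadataExample {
  public static SegmentUploadMetadata build(SegmentMetadata segmentMetadata) {
    String sourceDownloadURIStr = "s3://bucket/path/mySegment.tar.gz"; // where the segment can be fetched from
    String segmentDownloadURIStr = sourceDownloadURIStr;               // what gets persisted into ZK metadata
    URI finalSegmentLocationURI = null;                                // null: controller does not copy to deep store
    long segmentSizeInBytes = 1024L;
    Pair<String, File> encryptionInfo = Pair.of(null, new File("/tmp/mySegment.tar.gz"));
    return new SegmentUploadMetadata(segmentDownloadURIStr, sourceDownloadURIStr, finalSegmentLocationURI,
        segmentSizeInBytes, segmentMetadata, encryptionInfo);
  }
}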
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index b0aee83d0d..32249320b9 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -21,9 +21,14 @@ package org.apache.pinot.controller.api.upload;
import com.google.common.base.Preconditions;
import java.io.File;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import javax.annotation.Nullable;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.model.IdealState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -110,6 +115,61 @@ public class ZKOperator {
}
}
+ // Complete segment operations for a list of segments in batch mode
+ public void completeSegmentsOperations(String tableNameWithType,
FileUploadType uploadType,
+ boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders
headers,
+ List<SegmentUploadMetadata> segmentUploadMetadataList)
+ throws Exception {
+ boolean refreshOnly =
+
Boolean.parseBoolean(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY));
+ List<SegmentUploadMetadata> newSegmentsList = new ArrayList<>();
+ List<SegmentUploadMetadata> existingSegmentsList = new ArrayList<>();
+ for (SegmentUploadMetadata segmentUploadMetadata:
segmentUploadMetadataList) {
+ SegmentMetadata segmentMetadata =
segmentUploadMetadata.getSegmentMetadata();
+ String segmentName = segmentMetadata.getName();
+
+ ZNRecord existingSegmentMetadataZNRecord =
+
_pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType,
segmentName);
+ if (existingSegmentMetadataZNRecord != null &&
shouldProcessAsNewSegment(tableNameWithType, segmentName,
+ existingSegmentMetadataZNRecord, enableParallelPushProtection)) {
+ LOGGER.warn("Removing segment ZK metadata (recovering from previous
upload failure) for table: {}, segment: {}",
+ tableNameWithType, segmentName);
+
Preconditions.checkState(_pinotHelixResourceManager.removeSegmentZKMetadata(tableNameWithType,
segmentName),
+ "Failed to remove segment ZK metadata for table: %s, segment: %s",
tableNameWithType, segmentName);
+ existingSegmentMetadataZNRecord = null;
+ }
+
+ if (existingSegmentMetadataZNRecord == null) {
+ // Add a new segment
+ if (refreshOnly) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Cannot refresh non-existing segment: %s for
table: %s", segmentName, tableNameWithType),
+ Response.Status.GONE);
+ }
+ LOGGER.info("Adding new segment: {} to table: {}", segmentName,
tableNameWithType);
+ newSegmentsList.add(segmentUploadMetadata);
+ } else {
+ // Refresh an existing segment
+ if (!allowRefresh) {
+          // We cannot perform this check up-front in the UploadSegment API call. If a segment doesn't exist during
+          // the up-front check but ends up getting created before the check here, we could incorrectly refresh an
+          // existing segment.
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Segment: %s already exists in table: %s. Refresh
not permitted.", segmentName,
+ tableNameWithType), Response.Status.CONFLICT);
+ }
+ LOGGER.info("Segment: {} already exists in table: {}, refreshing it",
segmentName, tableNameWithType);
+
segmentUploadMetadata.setSegmentMetadataZNRecord(existingSegmentMetadataZNRecord);
+ existingSegmentsList.add(segmentUploadMetadata);
+ }
+ }
+ // process new segments
+ processNewSegments(tableNameWithType, uploadType,
enableParallelPushProtection, headers, newSegmentsList);
+
+ // process existing segments
+ processExistingSegments(tableNameWithType, uploadType,
enableParallelPushProtection, headers, existingSegmentsList);
+ }
+
/**
* Returns {@code true} when the segment should be processed as new segment.
* <p>When segment ZK metadata exists, check if segment exists in the ideal
state. If the previous upload failed after
@@ -276,6 +336,144 @@ public class ZKOperator {
}
}
+ // process a batch of existing segments
+ private void processExistingSegments(String tableNameWithType,
FileUploadType uploadType,
+ boolean enableParallelPushProtection, HttpHeaders headers,
List<SegmentUploadMetadata> segmentUploadMetadataList)
+ throws Exception {
+ for (SegmentUploadMetadata segmentUploadMetadata:
segmentUploadMetadataList) {
+ SegmentMetadata segmentMetadata =
segmentUploadMetadata.getSegmentMetadata();
+ String segmentDownloadURIStr =
segmentUploadMetadata.getSegmentDownloadURIStr();
+ String sourceDownloadURIStr =
segmentUploadMetadata.getSourceDownloadURIStr();
+ URI finalSegmentLocationURI =
segmentUploadMetadata.getFinalSegmentLocationURI();
+ Pair<String, File> encryptionInfo =
segmentUploadMetadata.getEncryptionInfo();
+ String crypterName = encryptionInfo.getLeft();
+ File segmentFile = encryptionInfo.getRight();
+ String segmentName = segmentMetadata.getName();
+ ZNRecord existingSegmentMetadataZNRecord =
segmentUploadMetadata.getSegmentMetadataZNRecord();
+ long segmentSizeInBytes = segmentUploadMetadata.getSegmentSizeInBytes();
+ int expectedVersion = existingSegmentMetadataZNRecord.getVersion();
+
+      // Check if the CRC matches when the IF-MATCH header is set
+ SegmentZKMetadata segmentZKMetadata = new
SegmentZKMetadata(existingSegmentMetadataZNRecord);
+ long existingCrc = segmentZKMetadata.getCrc();
+ checkCRC(headers, tableNameWithType, segmentName, existingCrc);
+
+      // Check the segment upload start time when parallel push protection is enabled
+ if (enableParallelPushProtection) {
+ // When segment upload start time is larger than 0, that means another
upload is in progress
+ long segmentUploadStartTime =
segmentZKMetadata.getSegmentUploadStartTime();
+ if (segmentUploadStartTime > 0) {
+ handleParallelPush(tableNameWithType, segmentName,
segmentUploadStartTime);
+ }
+
+ // Lock the segment by setting the upload start time in ZK
+
segmentZKMetadata.setSegmentUploadStartTime(System.currentTimeMillis());
+ if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType,
segmentZKMetadata, expectedVersion)) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Failed to lock the segment: %s of table: %s,
retry later", segmentName, tableNameWithType),
+ Response.Status.CONFLICT);
+ } else {
+ // The version will increment if the zk metadata update is successful
+ expectedVersion++;
+ }
+ }
+
+      // Reset the segment upload start time to unlock the segment later
+      // NOTE: reset this value even if parallel push protection is not enabled, so that the segment can recover in
+      // case the previous segment upload did not finish properly and parallel push protection is turned off
+ segmentZKMetadata.setSegmentUploadStartTime(-1);
+
+ try {
+ // Construct the segment ZK metadata custom map modifier
+ String customMapModifierStr =
+
headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER);
+ SegmentZKMetadataCustomMapModifier customMapModifier =
+ customMapModifierStr != null ? new
SegmentZKMetadataCustomMapModifier(customMapModifierStr) : null;
+
+ // Update ZK metadata and refresh the segment if necessary
+ long newCrc = Long.parseLong(segmentMetadata.getCrc());
+ if (newCrc == existingCrc) {
+ LOGGER.info(
+ "New segment crc '{}' is the same as existing segment crc for
segment '{}'. Updating ZK metadata without "
+ + "refreshing the segment.", newCrc, segmentName);
+ // NOTE: Even though we don't need to refresh the segment, we should
still update the following fields:
+ // - Creation time (not included in the crc)
+ // - Refresh time
+ // - Custom map
+
segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
+ segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
+ if (customMapModifier != null) {
+
segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap()));
+ } else {
+ // If no modifier is provided, use the custom map from the segment
metadata
+ segmentZKMetadata.setCustomMap(segmentMetadata.getCustomMap());
+ }
+ if
(!segmentZKMetadata.getDownloadUrl().equals(segmentDownloadURIStr)) {
+            // For offline ingestion, it is quite common that the download URI changes but the crc stays the same,
+            // e.g. when a user re-runs the job that processes the same data and the segments are stored/pushed from
+            // a different deep-store path. Read more: https://github.com/apache/pinot/issues/11535
+ LOGGER.info("Updating segment download url from: {} to: {} even
though crc is the same",
+ segmentZKMetadata.getDownloadUrl(), segmentDownloadURIStr);
+ segmentZKMetadata.setDownloadUrl(segmentDownloadURIStr);
+            // When the download URI changes, we also need to copy the segment to the final location if it exists.
+            // This typically means users changed the push type from METADATA to SEGMENT or SEGMENT to METADATA.
+            // Note that switching the push type from SEGMENT to METADATA may leave orphan segments in the
+            // controller-managed directory. Read more: https://github.com/apache/pinot/pull/11720
+ if (finalSegmentLocationURI != null) {
+ copySegmentToDeepStore(tableNameWithType, segmentName,
uploadType, segmentFile, sourceDownloadURIStr,
+ finalSegmentLocationURI);
+ }
+ }
+ if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType,
segmentZKMetadata, expectedVersion)) {
+ throw new RuntimeException(
+ String.format("Failed to update ZK metadata for segment: %s,
table: %s, expected version: %d",
+ segmentName, tableNameWithType, expectedVersion));
+ }
+ } else {
+          // The new segment is different from the existing one; update ZK metadata and refresh the segment
+          LOGGER.info(
+              "New segment crc {} is different from the existing segment crc {}. Updating ZK metadata and refreshing "
+                  + "segment {}", newCrc, existingCrc, segmentName);
+ if (finalSegmentLocationURI != null) {
+ copySegmentToDeepStore(tableNameWithType, segmentName, uploadType,
segmentFile, sourceDownloadURIStr,
+ finalSegmentLocationURI);
+ }
+
+ // NOTE: Must first set the segment ZK metadata before trying to
refresh because servers and brokers rely on
+ // segment ZK metadata to refresh the segment (server will compare
the segment ZK metadata with the local
+ // metadata to decide whether to download the new segment; broker
will update the segment partition info &
+ // time boundary based on the segment ZK metadata)
+ if (customMapModifier == null) {
+ // If no modifier is provided, use the custom map from the segment
metadata
+ segmentZKMetadata.setCustomMap(null);
+ ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType,
segmentZKMetadata, segmentMetadata,
+ segmentDownloadURIStr, crypterName, segmentSizeInBytes);
+ } else {
+ // If modifier is provided, first set the custom map from the
segment metadata, then apply the modifier
+ ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType,
segmentZKMetadata, segmentMetadata,
+ segmentDownloadURIStr, crypterName, segmentSizeInBytes);
+
segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap()));
+ }
+ if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType,
segmentZKMetadata, expectedVersion)) {
+ throw new RuntimeException(
+ String.format("Failed to update ZK metadata for segment: %s,
table: %s, expected version: %d",
+ segmentName, tableNameWithType, expectedVersion));
+ }
+ LOGGER.info("Updated segment: {} of table: {} to property store",
segmentName, tableNameWithType);
+
+ // Send a message to servers and brokers hosting the table to
refresh the segment
+
_pinotHelixResourceManager.sendSegmentRefreshMessage(tableNameWithType,
segmentName, true, true);
+ }
+ } catch (Exception e) {
+ if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType,
segmentZKMetadata, expectedVersion)) {
+ LOGGER.error("Failed to update ZK metadata for segment: {}, table:
{}, expected version: {}", segmentName,
+ tableNameWithType, expectedVersion);
+ }
+ throw e;
+ }
+ }
+ }
+
private void checkCRC(HttpHeaders headers, String tableNameWithType, String
segmentName, long existingCrc) {
String expectedCrcStr = headers.getHeaderString(HttpHeaders.IF_MATCH);
if (expectedCrcStr != null) {
@@ -374,6 +572,102 @@ public class ZKOperator {
}
}
+ // process a batch of new segments
+ private void processNewSegments(String tableNameWithType, FileUploadType
uploadType,
+ boolean enableParallelPushProtection, HttpHeaders headers,
List<SegmentUploadMetadata> segmentUploadMetadataList)
+ throws Exception {
+ Map<String, SegmentZKMetadata> segmentZKMetadataMap = new HashMap<>();
+ List<String> segmentNames = new ArrayList<>();
+ long segmentUploadStartTime = System.currentTimeMillis();
+ for (SegmentUploadMetadata segmentUploadMetadata:
segmentUploadMetadataList) {
+ SegmentMetadata segmentMetadata =
segmentUploadMetadata.getSegmentMetadata();
+ String segmentName = segmentMetadata.getName();
+ SegmentZKMetadata newSegmentZKMetadata;
+ URI finalSegmentLocationURI =
segmentUploadMetadata.getFinalSegmentLocationURI();
+ String segmentDownloadURIStr =
segmentUploadMetadata.getSegmentDownloadURIStr();
+ String sourceDownloadURIStr =
segmentUploadMetadata.getSourceDownloadURIStr();
+ String crypterName = segmentUploadMetadata.getEncryptionInfo().getLeft();
+ long segmentSizeInBytes = segmentUploadMetadata.getSegmentSizeInBytes();
+ File segmentFile = segmentUploadMetadata.getEncryptionInfo().getRight();
+ try {
+ newSegmentZKMetadata =
ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, segmentMetadata,
+ segmentDownloadURIStr, crypterName, segmentSizeInBytes);
+ segmentZKMetadataMap.put(segmentName, newSegmentZKMetadata);
+ segmentNames.add(segmentName);
+ } catch (IllegalArgumentException e) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Got invalid segment metadata when adding segment:
%s for table: %s, reason: %s", segmentName,
+ tableNameWithType, e.getMessage()),
Response.Status.BAD_REQUEST);
+ }
+
+ // Lock if enableParallelPushProtection is true.
+ if (enableParallelPushProtection) {
+ newSegmentZKMetadata.setSegmentUploadStartTime(segmentUploadStartTime);
+ }
+
+ // Update zk metadata custom map
+ String segmentZKMetadataCustomMapModifierStr = headers != null ?
headers.getHeaderString(
+
FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER)
: null;
+ if (segmentZKMetadataCustomMapModifierStr != null) {
+ SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier
= new SegmentZKMetadataCustomMapModifier(
+ segmentZKMetadataCustomMapModifierStr);
+
newSegmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(
+ newSegmentZKMetadata.getCustomMap()));
+ }
+ if
(!_pinotHelixResourceManager.createSegmentZkMetadata(tableNameWithType,
newSegmentZKMetadata)) {
+ throw new RuntimeException(String.format("Failed to create ZK metadata
for segment: %s of table: %s",
+ segmentName, tableNameWithType));
+ }
+
+ if (finalSegmentLocationURI != null) {
+ try {
+ copySegmentToDeepStore(tableNameWithType, segmentName, uploadType,
segmentFile, sourceDownloadURIStr,
+ finalSegmentLocationURI);
+ } catch (Exception e) {
+          // Clean up the ZK entry and remove the segment from the permanent directory if it exists.
+ LOGGER.error("Could not move segment {} from table {} to permanent
directory",
+ segmentName, tableNameWithType, e);
+ // Delete all segments that are getting processed as we are in batch
mode
+ deleteSegmentsIfNeeded(tableNameWithType, segmentNames,
segmentUploadStartTime, enableParallelPushProtection);
+ throw e;
+ }
+ }
+ }
+
+ try {
+ _pinotHelixResourceManager.assignTableSegments(tableNameWithType,
segmentNames);
+ } catch (Exception e) {
+      // assignTableSegments removes the ZK entries on failure.
+      // Call deleteSegmentsIfNeeded to remove the segments from the permanent location if needed.
+ LOGGER.error("Caught exception while calling assignTableSegments for
adding segments: {} to table: {}",
+ segmentZKMetadataMap.keySet(), tableNameWithType, e);
+ deleteSegmentsIfNeeded(tableNameWithType, segmentNames,
segmentUploadStartTime, enableParallelPushProtection);
+ throw e;
+ }
+
+ for (Map.Entry<String, SegmentZKMetadata> segmentZKMetadataEntry:
segmentZKMetadataMap.entrySet()) {
+ SegmentZKMetadata newSegmentZKMetadata =
segmentZKMetadataEntry.getValue();
+ String segmentName = segmentZKMetadataEntry.getKey();
+ if (enableParallelPushProtection) {
+ // Release lock. Expected version will be 0 as we hold a lock and no
updates could take place meanwhile.
+ newSegmentZKMetadata.setSegmentUploadStartTime(-1);
+ if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType,
newSegmentZKMetadata, 0)) {
+ // There is a race condition when it took too much time for the 1st
segment upload to process (due to slow
+ // PinotFS access), which leads to the 2nd attempt of segment
upload, and the 2nd segment upload succeeded.
+ // In this case, when the 1st upload comes back, it shouldn't
blindly delete the segment when it failed to
+ // update the zk metadata. Instead, the 1st attempt should validate
the upload start time one more time.
+ // If the start time doesn't match with the one persisted in zk
metadata, segment deletion should be skipped.
+ String errorMsg = String.format("Failed to update ZK metadata for
segment: %s of table: %s", segmentName,
+ tableNameWithType);
+ LOGGER.error(errorMsg);
+ // Delete all segments that are getting processed as we are in batch
mode
+ deleteSegmentsIfNeeded(tableNameWithType, segmentNames,
segmentUploadStartTime, true);
+ throw new RuntimeException(errorMsg);
+ }
+ }
+ }
+ }
+
/**
* Deletes the segment to be uploaded if either one of the criteria is
qualified:
* 1) the uploadStartTime matches with the one persisted in ZK metadata.
@@ -397,6 +691,33 @@ public class ZKOperator {
}
}
+ /**
+   * Deletes the segments being uploaded if either of the following criteria is met:
+   * 1) the uploadStartTime matches the one persisted in ZK metadata.
+   * 2) enableParallelPushProtection is not enabled.
+ */
+ private void deleteSegmentsIfNeeded(String tableNameWithType, List<String>
segmentNames,
+ long currentSegmentUploadStartTime, boolean
enableParallelPushProtection) {
+ List<String> segmentsToDelete = new ArrayList<>();
+ for (String segmentName: segmentNames) {
+ ZNRecord existingSegmentMetadataZNRecord =
+
_pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType,
segmentName);
+ if (existingSegmentMetadataZNRecord == null) {
+ continue;
+ }
+      // Check if the upload start time was set by this thread itself; if so, delete the segment.
+ SegmentZKMetadata segmentZKMetadata = new
SegmentZKMetadata(existingSegmentMetadataZNRecord);
+ long existingSegmentUploadStartTime =
segmentZKMetadata.getSegmentUploadStartTime();
+ LOGGER.info("Parallel push protection is {} for segment: {}.",
+ (enableParallelPushProtection ? "enabled" : "disabled"),
segmentName);
+ if (!enableParallelPushProtection || currentSegmentUploadStartTime ==
existingSegmentUploadStartTime) {
+ segmentsToDelete.add(segmentName);
+ }
+ }
+ _pinotHelixResourceManager.deleteSegments(tableNameWithType,
segmentsToDelete);
+ LOGGER.info("Deleted zk entry and segments {} for table {}.",
segmentsToDelete, tableNameWithType);
+ }
+
private void copySegmentToDeepStore(String tableNameWithType, String
segmentName, FileUploadType uploadType,
File segmentFile, String sourceDownloadURIStr, URI
finalSegmentLocationURI)
throws Exception {
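Both the refresh path and the new-segment path above rely on ZooKeeper's version-based compare-and-set: read the ZNRecord version, write conditionally against it, and bump the expected version after each successful write. A distilled sketch of the locking step, reusing the names from processExistingSegments as stand-ins:

import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;

final class ZkCompareAndSetSketch {
  // Returns the expected version to use for the next conditional write.
  static int lockSegment(PinotHelixResourceManager helixResourceManager, String tableNameWithType,
      ZNRecord existingSegmentMetadataZNRecord) {
    int expectedVersion = existingSegmentMetadataZNRecord.getVersion();
    SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(existingSegmentMetadataZNRecord);
    segmentZKMetadata.setSegmentUploadStartTime(System.currentTimeMillis());
    // Conditional write: fails if another writer bumped the ZNRecord version first.
    if (!helixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) {
      throw new IllegalStateException("Concurrent update detected; segment is locked by another upload");
    }
    // A successful conditional write increments the version, so the next write
    // in the same flow must expect the incremented value.
    return expectedVersion + 1;
  }
}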
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 2b835faaae..8748a021b3 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2327,6 +2327,84 @@ public class PinotHelixResourceManager {
}
}
+ // Assign a list of segments in batch mode
+ public void assignTableSegments(String tableNameWithType, List<String>
segmentNames) {
+ Map<String, String> segmentZKMetadataPathMap = new HashMap<>();
+ for (String segmentName: segmentNames) {
+ String segmentZKMetadataPath =
ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType,
+ segmentName);
+ segmentZKMetadataPathMap.put(segmentName, segmentZKMetadataPath);
+ }
+    // Assign instances for the segments and add them into the IdealState
+ try {
+ TableConfig tableConfig = getTableConfig(tableNameWithType);
+ Preconditions.checkState(tableConfig != null, "Failed to find table
config for table: " + tableNameWithType);
+
+ Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+ fetchOrComputeInstancePartitions(tableNameWithType, tableConfig);
+
+ // Initialize tier information only in case direct tier assignment is
configured
+ if (_enableTieredSegmentAssignment &&
CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
+ List<Tier> sortedTiers =
TierConfigUtils.getSortedTiersForStorageType(tableConfig.getTierConfigsList(),
+ TierFactory.PINOT_SERVER_STORAGE_TYPE, _helixZkManager);
+ for (String segmentName: segmentNames) {
+ // Update segment tier to support direct assignment for multiple
data directories
+ updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers);
+ InstancePartitions tierInstancePartitions =
TierConfigUtils.getTieredInstancePartitionsForSegment(
+ tableNameWithType, segmentName, sortedTiers, _helixZkManager);
+ if (tierInstancePartitions != null &&
TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+ // Override instance partitions for offline table
+ LOGGER.info("Overriding with tiered instance partitions: {} for
segment: {} of table: {}",
+ tierInstancePartitions, segmentName, tableNameWithType);
+ instancePartitionsMap =
Collections.singletonMap(InstancePartitionsType.OFFLINE,
tierInstancePartitions);
+ }
+ }
+ }
+
+ SegmentAssignment segmentAssignment =
+ SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager,
tableConfig, _controllerMetrics);
+ synchronized (getTableUpdaterLock(tableNameWithType)) {
+ long segmentAssignmentStartTs = System.currentTimeMillis();
+ Map<InstancePartitionsType, InstancePartitions>
finalInstancePartitionsMap = instancePartitionsMap;
+ HelixHelper.updateIdealState(_helixZkManager, tableNameWithType,
idealState -> {
+ assert idealState != null;
+ for (String segmentName: segmentNames) {
+ Map<String, Map<String, String>> currentAssignment =
idealState.getRecord().getMapFields();
+ if (currentAssignment.containsKey(segmentName)) {
+ LOGGER.warn("Segment: {} already exists in the IdealState for
table: {}, do not update", segmentName,
+ tableNameWithType);
+ } else {
+ List<String> assignedInstances =
+ segmentAssignment.assignSegment(segmentName,
currentAssignment, finalInstancePartitionsMap);
+ LOGGER.info("Assigning segment: {} to instances: {} for table:
{}", segmentName, assignedInstances,
+ tableNameWithType);
+ currentAssignment.put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(assignedInstances,
+ SegmentStateModel.ONLINE));
+ }
+ }
+ return idealState;
+ });
+ LOGGER.info("Added segments: {} to IdealState for table: {} in {} ms",
segmentNames, tableNameWithType,
+ System.currentTimeMillis() - segmentAssignmentStartTs);
+ }
+ } catch (Exception e) {
+ LOGGER.error(
+ "Caught exception while adding segments: {} to IdealState for table:
{}, deleting segments ZK metadata",
+ segmentNames, tableNameWithType, e);
+ for (Map.Entry<String, String> segmentZKMetadataPathEntry:
segmentZKMetadataPathMap.entrySet()) {
+ String segmentName = segmentZKMetadataPathEntry.getKey();
+ String segmentZKMetadataPath = segmentZKMetadataPathEntry.getValue();
+ if (_propertyStore.remove(segmentZKMetadataPath,
AccessOption.PERSISTENT)) {
+ LOGGER.info("Deleted segment ZK metadata for segment: {} of table:
{}", segmentName, tableNameWithType);
+ } else {
+ LOGGER.error("Failed to delete segment ZK metadata for segment: {}
of table: {}", segmentName,
+ tableNameWithType);
+ }
+ }
+ throw e;
+ }
+ }
+
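One design note on the updater lambda above: HelixHelper.updateIdealState may, to our understanding, re-read the IdealState and re-apply the updater when the ZK write hits a version conflict, so the lambda must be idempotent; the containsKey guard ensures a segment already assigned on a previous attempt is skipped rather than assigned twice. A distilled shape of that guard:

import java.util.List;
import java.util.Map;
import org.apache.helix.model.IdealState;

final class IdempotentUpdaterSketch {
  static IdealState assign(IdealState idealState, List<String> segmentNames) {
    Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields();
    for (String segmentName : segmentNames) {
      // Skip segments already present so a re-applied updater stays a no-op for them.
      if (!currentAssignment.containsKey(segmentName)) {
        // compute assigned instances and put the ONLINE instance-state map here (elided)
      }
    }
    return idealState;
  }
}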
private Map<InstancePartitionsType, InstancePartitions>
fetchOrComputeInstancePartitions(String tableNameWithType,
TableConfig tableConfig) {
if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/segupload/SegmentUploadIntegrationTest.java
similarity index 78%
rename from
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java
rename to
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/segupload/SegmentUploadIntegrationTest.java
index 35fdae8448..4729cd282f 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/segupload/SegmentUploadIntegrationTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.integration.tests;
+package org.apache.pinot.integration.tests.segupload;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Function;
@@ -27,7 +27,10 @@ import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
+import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
import org.apache.pinot.plugin.ingestion.batch.common.BaseSegmentPushJobRunner;
import
org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner;
import
org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner;
@@ -59,6 +62,7 @@ import org.testng.annotations.Test;
* todo: add test for URI push
*/
public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest {
+ private static String _tableNameSuffix;
@Override
protected Map<String, String> getStreamConfigs() {
@@ -93,6 +97,7 @@ public class SegmentUploadIntegrationTest extends
BaseClusterIntegrationTest {
@BeforeMethod
public void setUpTest()
throws IOException {
+ _tableNameSuffix = RandomStringUtils.randomAlphabetic(12);
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
}
@@ -136,15 +141,15 @@ public class SegmentUploadIntegrationTest extends
BaseClusterIntegrationTest {
jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec));
jobSpec.setOutputDirURI(_tarDir.getAbsolutePath());
TableSpec tableSpec = new TableSpec();
- tableSpec.setTableName(DEFAULT_TABLE_NAME);
-
tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(DEFAULT_TABLE_NAME));
+ tableSpec.setTableName(getTableName() + "_OFFLINE");
+
tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(getTableName()));
jobSpec.setTableSpec(tableSpec);
PinotClusterSpec clusterSpec = new PinotClusterSpec();
clusterSpec.setControllerURI(getControllerBaseApiUrl());
jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec});
File dataDir = new File(_controllerConfig.getDataDir());
- File dataDirSegments = new File(dataDir, DEFAULT_TABLE_NAME);
+ File dataDirSegments = new File(dataDir, getTableName());
// Not present in dataDir, only present in sourceDir
Assert.assertFalse(dataDirSegments.exists());
@@ -204,6 +209,78 @@ public class SegmentUploadIntegrationTest extends
BaseClusterIntegrationTest {
testCountStar(numDocs);
}
+ @Test
+ public void testUploadMultipleSegmentsInBatchModeAndQuery()
+ throws Exception {
+ // Create and upload the schema and table config
+ Schema schema = createSchema();
+ addSchema(schema);
+ TableConfig offlineTableConfig = createOfflineTableConfig();
+ waitForEVToDisappear(offlineTableConfig.getTableName());
+ addTableConfig(offlineTableConfig);
+
+ List<File> avroFiles = getAllAvroFiles();
+ int numSegments = 12;
+
+ // Create the list of segments
+ for (int segNum = 0; segNum < numSegments; segNum++) {
+ ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(segNum %
12), offlineTableConfig, schema,
+ "_seg" + segNum, _segmentDir, _tarDir);
+ }
+
+ SegmentMetadataPushJobRunner runner = new SegmentMetadataPushJobRunner();
+ SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
+ PushJobSpec pushJobSpec = new PushJobSpec();
+ pushJobSpec.setCopyToDeepStoreForMetadataPush(true);
+ // enable batch mode
+ pushJobSpec.setBatchMode(true);
+ jobSpec.setPushJobSpec(pushJobSpec);
+ PinotFSSpec fsSpec = new PinotFSSpec();
+ fsSpec.setScheme("file");
+ fsSpec.setClassName("org.apache.pinot.spi.filesystem.LocalPinotFS");
+ jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec));
+ jobSpec.setOutputDirURI(_tarDir.getAbsolutePath());
+ TableSpec tableSpec = new TableSpec();
+ tableSpec.setTableName(getTableName() + "_OFFLINE");
+ tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(getTableName()));
+ jobSpec.setTableSpec(tableSpec);
+ PinotClusterSpec clusterSpec = new PinotClusterSpec();
+ clusterSpec.setControllerURI(getControllerBaseApiUrl());
+ jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec});
+
+ File dataDir = new File(_controllerConfig.getDataDir());
+ File dataDirSegments = new File(dataDir, getTableName());
+
+ // Not present in dataDir, only present in sourceDir
+ Assert.assertFalse(dataDirSegments.exists());
+ Assert.assertEquals(_tarDir.listFiles().length, numSegments);
+
+ runner.init(jobSpec);
+ runner.run();
+
+ // Segment should be seen in dataDir
+ Assert.assertTrue(dataDirSegments.exists());
+ Assert.assertEquals(dataDirSegments.listFiles().length, numSegments);
+ Assert.assertEquals(_tarDir.listFiles().length, numSegments);
+
+ // Test that the uploaded segments are loaded
+ JsonNode segmentsList = getSegmentsList();
+ Assert.assertEquals(segmentsList.size(), numSegments);
+ long numDocs = 0;
+ for (JsonNode segmentName: segmentsList) {
+ numDocs += getNumDocs(segmentName.asText());
+ }
+ testCountStar(numDocs);
+
+ // Clear segment and tar dir
+ for (File segment : _segmentDir.listFiles()) {
+ FileUtils.deleteQuietly(segment);
+ }
+ for (File tar : _tarDir.listFiles()) {
+ FileUtils.deleteQuietly(tar);
+ }
+ }
+
/**
* Runs both SegmentMetadataPushJobRunner and SegmentTarPushJobRunner with consistent data push enabled.
* Checks that segments are properly loaded and that segment lineage entries are in the expected states.
@@ -237,15 +314,15 @@ public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest {
jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec));
jobSpec.setOutputDirURI(_tarDir.getAbsolutePath());
TableSpec tableSpec = new TableSpec();
- tableSpec.setTableName(DEFAULT_TABLE_NAME);
- tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(DEFAULT_TABLE_NAME));
+ tableSpec.setTableName(getTableName() + "_OFFLINE");
+ tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(getTableName()));
jobSpec.setTableSpec(tableSpec);
PinotClusterSpec clusterSpec = new PinotClusterSpec();
clusterSpec.setControllerURI(getControllerBaseApiUrl());
jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec});
File dataDir = new File(_controllerConfig.getDataDir());
- File dataDirSegments = new File(dataDir, DEFAULT_TABLE_NAME);
+ File dataDirSegments = new File(dataDir, getTableName());
Assert.assertEquals(_tarDir.listFiles().length, 1);
@@ -268,7 +345,7 @@ public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest {
// Fetch segment lineage entry after running segment metadata push with consistent push enabled.
String segmentLineageResponse = ControllerTest.sendGetRequest(
ControllerRequestURLBuilder.baseUrl(getControllerBaseApiUrl())
- .forListAllSegmentLineages(DEFAULT_TABLE_NAME, TableType.OFFLINE.toString()));
+ .forListAllSegmentLineages(getTableName(), TableType.OFFLINE.toString()));
// Segment lineage should be in completed state.
Assert.assertTrue(segmentLineageResponse.contains("\"state\":\"COMPLETED\""));
// SegmentsFrom should be empty as we started with a blank table.
@@ -317,7 +394,7 @@ public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest {
// Fetch segment lineage entry after running segment tar push with consistent push enabled.
segmentLineageResponse = ControllerTest.sendGetRequest(
ControllerRequestURLBuilder.baseUrl(getControllerBaseApiUrl())
- .forListAllSegmentLineages(DEFAULT_TABLE_NAME, TableType.OFFLINE.toString()));
+ .forListAllSegmentLineages(getTableName(), TableType.OFFLINE.toString()));
// Segment lineage should be in completed state.
Assert.assertTrue(segmentLineageResponse.contains("\"state\":\"COMPLETED\""));
// SegmentsFrom should contain the previous segment
@@ -337,14 +414,14 @@ public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest {
private long getNumDocs(String segmentName)
throws IOException {
return JsonUtils.stringToJsonNode(
- sendGetRequest(_controllerRequestURLBuilder.forSegmentMetadata(DEFAULT_TABLE_NAME, segmentName)))
+ sendGetRequest(_controllerRequestURLBuilder.forSegmentMetadata(getTableName(), segmentName)))
.get("segment.total.docs").asLong();
}
private JsonNode getSegmentsList()
throws IOException {
return JsonUtils.stringToJsonNode(sendGetRequest(
- _controllerRequestURLBuilder.forSegmentListAPI(DEFAULT_TABLE_NAME, TableType.OFFLINE.toString())))
+ _controllerRequestURLBuilder.forSegmentListAPI(getTableName(), TableType.OFFLINE.toString())))
.get(0).get("OFFLINE");
}
@@ -362,6 +439,11 @@ public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest {
}, 100L, 300_000, "Failed to load " + countStarResult + " documents", true);
}
+ @Override
+ public String getTableName() {
+ return DEFAULT_TABLE_NAME + _tableNameSuffix;
+ }
+
@AfterMethod
public void tearDownTest()
throws IOException {
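
To try the new batch mode outside the integration test above, here is a minimal standalone sketch of the same wiring; the table name, controller URL, and directories are placeholders, not values from this commit:

import java.util.Arrays;
import org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;

public class BatchMetadataPushExample {
  public static void main(String[] args) throws Exception {
    // Enable the new batch mode: all segment metadata is uploaded in a single controller call
    PushJobSpec pushJobSpec = new PushJobSpec();
    pushJobSpec.setBatchMode(true);
    pushJobSpec.setCopyToDeepStoreForMetadataPush(true);

    // Local filesystem holding the segment tar files
    PinotFSSpec fsSpec = new PinotFSSpec();
    fsSpec.setScheme("file");
    fsSpec.setClassName("org.apache.pinot.spi.filesystem.LocalPinotFS");

    TableSpec tableSpec = new TableSpec();
    tableSpec.setTableName("myTable_OFFLINE"); // placeholder table name

    PinotClusterSpec clusterSpec = new PinotClusterSpec();
    clusterSpec.setControllerURI("http://localhost:9000"); // placeholder controller URL

    SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
    jobSpec.setPushJobSpec(pushJobSpec);
    jobSpec.setPinotFSSpecs(Arrays.asList(fsSpec));
    jobSpec.setOutputDirURI("/tmp/segments"); // placeholder directory of segment tars
    jobSpec.setTableSpec(tableSpec);
    jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec});

    SegmentMetadataPushJobRunner runner = new SegmentMetadataPushJobRunner();
    runner.init(jobSpec);
    runner.run();
  }
}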
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index 8b0963578e..d997f8f0cc 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -179,14 +180,14 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
_pinotTaskConfig = pinotTaskConfig;
_eventObserver =
MinionEventObservers.getInstance().getMinionEventObserver(pinotTaskConfig.getTaskId());
String taskType = pinotTaskConfig.getTaskType();
- Map<String, String> configs = pinotTaskConfig.getConfigs();
- String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
- String inputSegmentNames = configs.get(MinionConstants.SEGMENT_NAME_KEY);
+ Map<String, String> taskConfigs = pinotTaskConfig.getConfigs();
+ String tableNameWithType = taskConfigs.get(MinionConstants.TABLE_NAME_KEY);
+ String inputSegmentNames = taskConfigs.get(MinionConstants.SEGMENT_NAME_KEY);
String[] segmentNames = inputSegmentNames.split(MinionConstants.SEGMENT_NAME_SEPARATOR);
- String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
- String downloadURLString = configs.get(MinionConstants.DOWNLOAD_URL_KEY);
+ String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY);
+ String downloadURLString = taskConfigs.get(MinionConstants.DOWNLOAD_URL_KEY);
String[] downloadURLs = downloadURLString.split(MinionConstants.URL_SEPARATOR);
- AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(configs.get(MinionConstants.AUTH_TOKEN));
+ AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(taskConfigs.get(MinionConstants.AUTH_TOKEN));
LOGGER.info("Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}", taskType,
tableNameWithType, inputSegmentNames, downloadURLString, uploadURL);
File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(), taskType), "tmp-" + UUID.randomUUID());
@@ -274,6 +275,8 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
SegmentUploadContext segmentUploadContext = new SegmentUploadContext(pinotTaskConfig, segmentConversionResults);
preUploadSegments(segmentUploadContext);
+ Map<String, String> segmentUriToTarPathMap = new HashMap<>();
+ PushJobSpec pushJobSpec = getPushJobSpec(taskConfigs);
// Upload the tarred segments
for (int i = 0; i < numOutputSegments; i++) {
@@ -282,51 +285,60 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
String resultSegmentName = segmentConversionResult.getSegmentName();
_eventObserver.notifyProgress(_pinotTaskConfig,
String.format("Uploading segment: %s (%d out of %d)", resultSegmentName, (i + 1), numOutputSegments));
-
- // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
- SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
- getSegmentZKMetadataCustomMapModifier(pinotTaskConfig, segmentConversionResult);
- Header segmentZKMetadataCustomMapModifierHeader =
- new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
- segmentZKMetadataCustomMapModifier.toJsonString());
-
- String pushMode =
- configs.getOrDefault(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.TAR.name());
+ String pushMode = taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE,
+ BatchConfigProperties.SegmentPushType.TAR.name());
URI outputSegmentTarURI;
if (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())
!= BatchConfigProperties.SegmentPushType.TAR) {
- outputSegmentTarURI = moveSegmentToOutputPinotFS(configs, convertedTarredSegmentFile);
+ outputSegmentTarURI = moveSegmentToOutputPinotFS(taskConfigs, convertedTarredSegmentFile);
LOGGER.info("Moved generated segment from [{}] to location: [{}]", convertedTarredSegmentFile,
outputSegmentTarURI);
} else {
outputSegmentTarURI = convertedTarredSegmentFile.toURI();
}
- List<Header> httpHeaders = new ArrayList<>();
- httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
- httpHeaders.addAll(AuthProviderUtils.toRequestHeaders(authProvider));
-
+ // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+ List<Header> httpHeaders = getSegmentPushCommonHeaders(pinotTaskConfig, authProvider, segmentConversionResults);
// Set parameters for upload request
- NameValuePair enableParallelPushProtectionParameter =
- new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION, "true");
- NameValuePair tableNameParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
- TableNameBuilder.extractRawTableName(tableNameWithType));
- NameValuePair tableTypeParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
- TableNameBuilder.getTableTypeFromTableName(tableNameWithType).toString());
+ List<NameValuePair> parameters = getSegmentPushCommonParams(tableNameWithType);
+
// RealtimeToOfflineSegmentsTask pushes segments to the corresponding offline table
// TODO: This is not clean to put the override here, but let's think about it harder to see what is the proper
// way to override it.
if (MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE.equals(taskType)) {
- tableTypeParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
- TableType.OFFLINE.toString());
+ Iterator<NameValuePair> paramItr = parameters.iterator();
+ while (paramItr.hasNext()) {
+ NameValuePair nameValuePair = paramItr.next();
+ if (FileUploadDownloadClient.QueryParameters.TABLE_TYPE.equals(nameValuePair.getName())) {
+ paramItr.remove();
+ parameters.add(new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
+ TableType.OFFLINE.toString()));
+ break;
+ }
+ }
+ }
+
+ if (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())
+ == BatchConfigProperties.SegmentPushType.METADATA) {
+ updateSegmentUriToTarPathMap(taskConfigs, outputSegmentTarURI, segmentConversionResult,
+ segmentUriToTarPathMap, pushJobSpec);
+ } else {
+ pushSegment(taskConfigs, outputSegmentTarURI, httpHeaders, parameters, segmentConversionResult);
+ if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
+ LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath());
+ }
}
- List<NameValuePair> parameters = Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter,
- tableTypeParameter);
+ }
- pushSegment(tableNameParameter.getValue(), configs, outputSegmentTarURI, httpHeaders, parameters,
- segmentConversionResult);
- if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
- LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath());
+ if (!segmentUriToTarPathMap.isEmpty()) {
+ // For metadata push, push all segments in batch mode
+ pushJobSpec.setBatchMode(true);
+ pushSegments(tableNameWithType, taskConfigs, pinotTaskConfig, segmentUriToTarPathMap, pushJobSpec, authProvider,
+ segmentConversionResults);
+ for (File convertedTarredSegmentFile: tarredSegmentFiles) {
+ if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
+ LOGGER.warn("Failed to delete tarred converted segment: {}",
convertedTarredSegmentFile.getAbsolutePath());
+ }
}
}
@@ -335,9 +347,8 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName)
.collect(Collectors.joining(","));
postProcess(pinotTaskConfig);
- LOGGER
- .info("Done executing {} on table: {}, input segments: {}, output segments: {}", taskType, tableNameWithType,
- inputSegmentNames, outputSegmentNames);
+ LOGGER.info("Done executing {} on table: {}, input segments: {}, output segments: {}", taskType,
+ tableNameWithType, inputSegmentNames, outputSegmentNames);
return segmentConversionResults;
} finally {
@@ -345,50 +356,107 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
}
}
- private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
- List<Header> headers, List<NameValuePair> parameters, SegmentConversionResult segmentConversionResult)
- throws Exception {
- String pushMode =
- taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.TAR.name());
- LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);
+ private void updateSegmentUriToTarPathMap(Map<String, String> taskConfigs, URI outputSegmentTarURI,
+ SegmentConversionResult segmentConversionResult, Map<String, String> segmentUriToTarPathMap,
+ PushJobSpec pushJobSpec) {
+ String segmentName = segmentConversionResult.getSegmentName();
+ if (!taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
+ throw new RuntimeException(String.format("Output dir URI missing for metadata push while processing segment: %s",
+ segmentName));
+ }
+ URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+ Map<String, String> localSegmentUriToTarPathMap =
+ SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec,
+ new String[]{outputSegmentTarURI.toString()});
+ if (!localSegmentUriToTarPathMap.isEmpty()) {
+ segmentUriToTarPathMap.putAll(localSegmentUriToTarPathMap);
+ }
+ }
+ private PushJobSpec getPushJobSpec(Map<String, String> taskConfigs) {
PushJobSpec pushJobSpec = new PushJobSpec();
pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+ return pushJobSpec;
+ }
- SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec);
-
- switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
- case TAR:
- File tarFile = new File(outputSegmentTarURI);
- String segmentName = segmentConversionResult.getSegmentName();
- String tableNameWithType = segmentConversionResult.getTableNameWithType();
- String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY);
- SegmentConversionUtils.uploadSegment(taskConfigs, headers, parameters, tableNameWithType, segmentName,
- uploadURL, tarFile);
- break;
- case METADATA:
- if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
- URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
- try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
- Map<String, String> segmentUriToTarPathMap =
- SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec,
- new String[]{outputSegmentTarURI.toString()});
- SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, headers, parameters);
- }
- } else {
- throw new RuntimeException("Output dir URI missing for metadata push");
- }
- break;
- default:
- throw new UnsupportedOperationException("Unrecognized push mode - " + pushMode);
+ private List<Header> getSegmentPushCommonHeaders(PinotTaskConfig pinotTaskConfig, AuthProvider authProvider,
+ List<SegmentConversionResult> segmentConversionResults) {
+ SegmentConversionResult segmentConversionResult;
+ if (segmentConversionResults.size() == 1) {
+ segmentConversionResult = segmentConversionResults.get(0);
+ } else {
+ // Setting to null as the base method expects a single object. This is ok for now, since the
+ // segmentConversionResult is not made use of while generating the customMap.
+ segmentConversionResult = null;
+ }
+ SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
+ getSegmentZKMetadataCustomMapModifier(pinotTaskConfig, segmentConversionResult);
+ Header segmentZKMetadataCustomMapModifierHeader =
+ new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
+ segmentZKMetadataCustomMapModifier.toJsonString());
+
+ List<Header> headers = new ArrayList<>();
+ headers.add(segmentZKMetadataCustomMapModifierHeader);
+ headers.addAll(AuthProviderUtils.toRequestHeaders(authProvider));
+ return headers;
+ }
+
+ private List<NameValuePair> getSegmentPushCommonParams(String tableNameWithType) {
+ List<NameValuePair> params = new ArrayList<>();
+ params.add(new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION,
+ "true"));
+ params.add(new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
+ TableNameBuilder.extractRawTableName(tableNameWithType)));
+ TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType != null) {
+ params.add(new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, tableType.toString()));
+ } else {
+ throw new RuntimeException(String.format("Failed to determine the tableType from name: %s", tableNameWithType));
+ }
+ return params;
+ }
+
+ private void pushSegments(String tableNameWithType, Map<String, String> taskConfigs, PinotTaskConfig pinotTaskConfig,
+ Map<String, String> segmentUriToTarPathMap, PushJobSpec pushJobSpec,
+ AuthProvider authProvider, List<SegmentConversionResult> segmentConversionResults)
+ throws Exception {
+ String tableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+ SegmentGenerationJobSpec spec = generateSegmentGenerationJobSpec(tableName, taskConfigs, pushJobSpec);
+
+ List<Header> headers = getSegmentPushCommonHeaders(pinotTaskConfig, authProvider, segmentConversionResults);
+ List<NameValuePair> parameters = getSegmentPushCommonParams(tableNameWithType);
+
+ URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+ try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
+ SegmentPushUtils.sendSegmentsUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, headers, parameters);
+ }
+ }
+
+ private void pushSegment(Map<String, String> taskConfigs, URI outputSegmentTarURI,
+ List<Header> headers, List<NameValuePair> parameters, SegmentConversionResult segmentConversionResult)
+ throws Exception {
+ String pushMode = taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE,
+ BatchConfigProperties.SegmentPushType.TAR.name());
+ LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);
+
+ if (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())
+ == BatchConfigProperties.SegmentPushType.TAR) {
+ File tarFile = new File(outputSegmentTarURI);
+ String segmentName = segmentConversionResult.getSegmentName();
+ String tableNameWithType = segmentConversionResult.getTableNameWithType();
+ String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY);
+ SegmentConversionUtils.uploadSegment(taskConfigs, headers, parameters, tableNameWithType, segmentName, uploadURL,
+ tarFile);
+ } else {
+ throw new UnsupportedOperationException("Unrecognized push mode: " + pushMode);
}
}
- private SegmentGenerationJobSpec generatePushJobSpec(String tableName, Map<String, String> taskConfigs,
+ private SegmentGenerationJobSpec generateSegmentGenerationJobSpec(String tableName, Map<String, String> taskConfigs,
PushJobSpec pushJobSpec) {
TableSpec tableSpec = new TableSpec();
@@ -416,7 +484,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
if (!Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.OVERWRITE_OUTPUT)) && outputFileFS.exists(
outputSegmentTarURI)) {
throw new RuntimeException(String.format("Output file: %s already exists. "
- + "Set 'overwriteOutput' to true to ignore this error", outputSegmentTarURI));
+ + "Set 'overwriteOutput' to true to ignore this error", outputSegmentTarURI));
} else {
outputFileFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
}
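
One detail worth calling out in the executor changes above: the RealtimeToOfflineSegmentsTask branch now swaps the TABLE_TYPE entry inside the shared parameter list through its iterator instead of rebuilding the list. A self-contained sketch of that replace-in-place pattern, with hypothetical parameter values (imports assume HttpCore 5; substitute the org.apache.http classes if your build uses HttpCore 4):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.hc.core5.http.message.BasicNameValuePair;

public class TableTypeOverrideSketch {
  public static void main(String[] args) {
    List<NameValuePair> parameters = new ArrayList<>();
    parameters.add(new BasicNameValuePair("tableName", "myTable"));  // hypothetical values
    parameters.add(new BasicNameValuePair("tableType", "REALTIME"));

    // Remove the existing tableType entry through the iterator, append the override,
    // and break immediately so the live iterator is never advanced again.
    Iterator<NameValuePair> paramItr = parameters.iterator();
    while (paramItr.hasNext()) {
      if ("tableType".equals(paramItr.next().getName())) {
        paramItr.remove();
        parameters.add(new BasicNameValuePair("tableType", "OFFLINE"));
        break;
      }
    }
    parameters.forEach(p -> System.out.println(p.getName() + "=" + p.getValue()));
  }
}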
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
index 4a5dd21948..cc3f008771 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
@@ -359,6 +359,136 @@ public class SegmentPushUtils implements Serializable {
}
}
+ public static void sendSegmentsUriAndMetadata(SegmentGenerationJobSpec spec, PinotFS fileSystem,
+ Map<String, String> segmentUriToTarPathMap, List<Header> headers, List<NameValuePair> parameters)
+ throws Exception {
+ String tableName = spec.getTableSpec().getTableName();
+ Map<String, File> segmentMetadataFileMap = new HashMap<>();
+ List<String> segmentURIs = new ArrayList<>();
+ LOGGER.info("Start pushing segment metadata: {} to locations: {} for table
{}", segmentUriToTarPathMap,
+ Arrays.toString(spec.getPinotClusterSpecs()), tableName);
+ for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
+ String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
+ String fileName = new File(tarFilePath).getName();
+ // segments stored in Pinot deep store do not have .tar.gz extension
+ String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT)
+ ? fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length()) : fileName;
+ SegmentNameUtils.validatePartialOrFullSegmentName(segmentName);
+ File segmentMetadataFile;
+ // Check if there is a segment metadata tar gz file named `segmentName.metadata.tar.gz`, already in the remote
+ // directory. This is to avoid generating a new segment metadata tar gz file every time we push a segment,
+ // which requires downloading the entire segment tar gz file.
+
+ URI metadataTarGzFilePath = generateSegmentMetadataURI(tarFilePath, segmentName);
+ LOGGER.info("Checking if metadata tar gz file {} exists", metadataTarGzFilePath);
+ if (spec.getPushJobSpec().isPreferMetadataTarGz() && fileSystem.exists(metadataTarGzFilePath)) {
+ segmentMetadataFile = new File(FileUtils.getTempDirectory(),
+ "segmentMetadata-" + UUID.randomUUID() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ if (segmentMetadataFile.exists()) {
+ FileUtils.forceDelete(segmentMetadataFile);
+ }
+ fileSystem.copyToLocalFile(metadataTarGzFilePath, segmentMetadataFile);
+ } else {
+ segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath));
+ }
+ segmentMetadataFileMap.put(segmentName, segmentMetadataFile);
+ segmentURIs.add(segmentName);
+ segmentURIs.add(segmentUriPath);
+ }
+
+ // FIXME: Move this to a separate method
+ String uuid = UUID.randomUUID().toString();
+ File allSegmentsMetadataDir = new File(FileUtils.getTempDirectory(), "allSegmentsMetadataDir-" + uuid);
+ FileUtils.forceMkdir(allSegmentsMetadataDir);
+ for (Map.Entry<String, File> segmentMetadataTarFileEntry: segmentMetadataFileMap.entrySet()) {
+ String segmentName = segmentMetadataTarFileEntry.getKey();
+ File tarFile = segmentMetadataTarFileEntry.getValue();
+ TarGzCompressionUtils.untarOneFile(tarFile, V1Constants.MetadataKeys.METADATA_FILE_NAME,
+ new File(allSegmentsMetadataDir, segmentName + "." + V1Constants.MetadataKeys.METADATA_FILE_NAME));
+ TarGzCompressionUtils.untarOneFile(tarFile, V1Constants.SEGMENT_CREATION_META,
+ new File(allSegmentsMetadataDir, segmentName + "." + V1Constants.SEGMENT_CREATION_META));
+ }
+ File allSegmentsMetadataTarFile = new File(FileUtils.getTempDirectory(), "allSegmentsMetadataTar-" + uuid
+ + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ if (allSegmentsMetadataTarFile.exists()) {
+ FileUtils.forceDelete(allSegmentsMetadataTarFile);
+ }
+ // Add a file which contains the download URI of all the segments
+ File segmentsURIFile = new File(allSegmentsMetadataDir, "all_segments_metadata");
+ FileUtils.writeLines(segmentsURIFile, segmentURIs);
+
+ TarGzCompressionUtils.createTarGzFile(allSegmentsMetadataDir, allSegmentsMetadataTarFile);
+ Map<String, File> allSegmentsMetadataMap = new HashMap<>();
+ allSegmentsMetadataMap.put("allSegments", allSegmentsMetadataTarFile);
+
+ // Perform the metadata push in batch mode for every cluster
+ try {
+ for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
+ URI controllerURI;
+ try {
+ controllerURI = new URI(pinotClusterSpec.getControllerURI());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Got invalid controller uri - '" +
pinotClusterSpec.getControllerURI() + "'");
+ }
+ LOGGER.info("Pushing segments: {} to Pinot cluster: {} for table {}",
+ segmentMetadataFileMap.keySet(), controllerURI, tableName);
+ int attempts = 1;
+ if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushAttempts() > 0) {
+ attempts = spec.getPushJobSpec().getPushAttempts();
+ }
+ long retryWaitMs = 1000L;
+ if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
+ retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+ }
+ RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> {
+ List<Header> reqHttpHeaders = new ArrayList<>(headers);
+ try {
+ addHeaders(spec, reqHttpHeaders);
+ URI segmentUploadURI = getSegmentUploadURI(controllerURI);
+ SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadataFiles(segmentUploadURI,
+ allSegmentsMetadataMap, reqHttpHeaders, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
+ LOGGER.info("Response for pushing table {} segments {} to location {} - {}: {}", tableName,
+ segmentMetadataFileMap.keySet(), controllerURI, response.getStatusCode(), response.getResponse());
+ return true;
+ } catch (HttpErrorStatusException e) {
+ int statusCode = e.getStatusCode();
+ if (statusCode >= 500) {
+ // Temporary exception
+ LOGGER.warn("Caught temporary exception while pushing table: {}
segments: {} to {}, will retry",
+ tableName, segmentMetadataFileMap.keySet(), controllerURI,
e);
+ return false;
+ } else {
+ // Permanent exception
+ LOGGER.error("Caught permanent exception while pushing table: {}
segments: {} to {}, won't retry",
+ tableName, segmentMetadataFileMap.keySet(), controllerURI,
e);
+ throw e;
+ }
+ }
+ });
+ }
+ } finally {
+ for (Map.Entry<String, File> metadataFileEntry: segmentMetadataFileMap.entrySet()) {
+ FileUtils.deleteQuietly(metadataFileEntry.getValue());
+ }
+ FileUtils.deleteDirectory(allSegmentsMetadataDir);
+ FileUtils.forceDelete(allSegmentsMetadataTarFile);
+ }
+ }
+
+ private static URI getSegmentUploadURI(URI controllerURI)
+ throws URISyntaxException {
+ return FileUploadDownloadClient.getUploadSegmentBatchURI(controllerURI);
+ }
+
+ private static void addHeaders(SegmentGenerationJobSpec jobSpec, List<Header> headers) {
+ headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
+ FileUploadDownloadClient.FileUploadType.METADATA.toString()));
+ if (jobSpec.getPushJobSpec() != null) {
+ headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE,
+ String.valueOf(jobSpec.getPushJobSpec().getCopyToDeepStoreForMetadataPush())));
+ }
+ }
+
public static Map<String, String> getSegmentUriToTarPathMap(URI outputDirURI, PushJobSpec pushSpec,
String[] files) {
Map<String, String> segmentUriToTarPathMap = new HashMap<>();
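
Pieced together from sendSegmentsUriAndMetadata above: the batch upload sends a single `allSegments` tar.gz containing, per segment, the extracted metadata and creation-meta files (named after the V1Constants keys referenced above), plus an all_segments_metadata manifest whose lines alternate segment name, then download URI. A minimal sketch of pairing that manifest back up (class and method names are hypothetical; the controller-side parsing lives in PinotSegmentUploadDownloadRestletResource, which is not shown in this excerpt):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AllSegmentsMetadataManifest {
  // Lines alternate: segment name on even indexes, its download URI on odd indexes
  public static Map<String, String> read(String manifestPath) throws IOException {
    List<String> lines = Files.readAllLines(Paths.get(manifestPath));
    Map<String, String> segmentNameToDownloadUri = new LinkedHashMap<>();
    for (int i = 0; i + 1 < lines.size(); i += 2) {
      segmentNameToDownloadUri.put(lines.get(i), lines.get(i + 1));
    }
    return segmentNameToDownloadUri;
  }
}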
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
index 31d1ce8448..2b9237e5fd 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
@@ -46,6 +46,13 @@ public class PushJobSpec implements Serializable {
* If true, and if segment was not already in the deep store, move it to deep store.
*/
private boolean _copyToDeepStoreForMetadataPush;
+
+ /**
+ * Applicable for METADATA push type.
+ * If true, multiple segment metadata files are uploaded to the controller in a single call.
+ */
+ private boolean _batchMode;
+
/**
* Used in SegmentUriPushJobRunner, which composes the segment URI to send to the Pinot controller.
* The URI sent to the controller is in the format ${segmentUriPrefix}${segmentPath}${segmentUriSuffix}
@@ -148,4 +155,12 @@ public class PushJobSpec implements Serializable {
public void setCopyToDeepStoreForMetadataPush(boolean copyToDeepStoreForMetadataPush) {
_copyToDeepStoreForMetadataPush = copyToDeepStoreForMetadataPush;
}
+
+ public boolean isBatchMode() {
+ return _batchMode;
+ }
+
+ public void setBatchMode(boolean batchMode) {
+ _batchMode = batchMode;
+ }
}
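
Since PushJobSpec is typically populated from the YAML ingestion job spec, the new field should be reachable as a batchMode key under pushJobSpec, assuming the usual one-to-one binding between spec fields and YAML keys; programmatically it is just the accessor pair added above:

import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;

public class PushJobSpecBatchModeDemo {
  public static void main(String[] args) {
    PushJobSpec pushJobSpec = new PushJobSpec();
    // Only honored for METADATA push: segment metadata is sent in a single controller call
    pushJobSpec.setBatchMode(true);
    System.out.println("batchMode = " + pushJobSpec.isBatchMode());
  }
}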
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]