This is an automated email from the ASF dual-hosted git repository.
mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2a70bb9 Pinot controller side change to enhance LLC segment metadata
upload. (#3877)
2a70bb9 is described below
commit 2a70bb913f006d52a55aa686a86464b9acf4473f
Author: Ting Chen <[email protected]>
AuthorDate: Wed Mar 13 15:24:17 2019 -0700
Pinot controller side change to enhance LLC segment metadata upload. (#3877)
* Pinot controller side change to enhance LLC segment metadata upload.
* Remove an unused lib.
* Revise based on reviews.
* Let the LLCSegmentCompletionHandlers handle the metadata extraction.
* Clean up error case behaviors for metadata extraction based on review.
* Add more logging for error cases based on review feedback.
* Fix integration failure by passing the right segment location in
controller for metadata extraction.
* Use PinotFS to copy segment files for metadata extraction.
* Construct segment file URI directly.
* Further revision to address reviews.
* (1)Remove the prefix java.net. (2) Add test to show the diff between
Apache URI and java.net.URI.
* Fix a comment typo.
* Revise logging and lib inclusion based on reviews.
* Return failure directly if there is no metadata file in the input form.
* Refector the upload function based on Subu's comments.
* Fix the log error level.
* Apply Subba's refactoring and a few of my twigs.
* Address reviewer feedbacks about format and error logs.
* Fix an compilation error.
* Fix style issues and eliminate a local method.
* Fix redundant line.
---
.../protocols/SegmentCompletionProtocol.java | 1 +
.../resources/LLCSegmentCompletionHandlers.java | 364 ++++++++++++++++-----
.../realtime/PinotLLCRealtimeSegmentManager.java | 69 +---
.../core/realtime/SegmentCompletionManager.java | 13 +-
.../segment/CommittingSegmentDescriptor.java | 18 +
.../PinotLLCRealtimeSegmentManagerTest.java | 61 ++--
.../helix/core/realtime/SegmentCompletionTest.java | 30 +-
7 files changed, 382 insertions(+), 174 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index cedc8d6..60ee1e0 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -112,6 +112,7 @@ public class SegmentCompletionProtocol {
public static final String MSG_TYPE_COMMIT_START = "segmentCommitStart";
public static final String MSG_TYPE_SEGMENT_UPLOAD = "segmentUpload";
public static final String MSG_TYPE_COMMIT_END = "segmentCommitEnd";
+ public static final String MSG_TYPE_COMMIT_END_METADATA =
"segmentCommitEndWithMetadata";
public static final String MSG_TYPE_STOPPED_CONSUMING =
"segmentStoppedConsuming";
public static final String MSG_TYPE_EXTEND_BUILD_TIME = "extendBuildTime";
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index 0d5de21..19e2123 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -20,14 +20,18 @@ package org.apache.pinot.controller.api.resources;
import com.google.common.annotations.VisibleForTesting;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
@@ -36,15 +40,20 @@ import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
-import org.apache.commons.httpclient.URI;
+
+import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.controller.ControllerConf;
import
org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
+import
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.controller.util.SegmentCompletionUtils;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
import org.apache.pinot.filesystem.PinotFS;
import org.apache.pinot.filesystem.PinotFSFactory;
import org.glassfish.jersey.media.multipart.FormDataBodyPart;
@@ -58,8 +67,10 @@ import org.slf4j.LoggerFactory;
@Path("/")
public class LLCSegmentCompletionHandlers {
+ private static final String SEGMENT_TMP_DIR = "segment.tmp";
private static Logger LOGGER =
LoggerFactory.getLogger(LLCSegmentCompletionHandlers.class);
private static final String SCHEME = "file://";
+ private static final String METADATA_TEMP_DIR_SUFFIX = ".metadata.tmp";
@Inject
ControllerConf _controllerConf;
@@ -197,6 +208,16 @@ public class LLCSegmentCompletionHandlers {
return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
}
+ SegmentMetadataImpl segmentMetadata;
+ try {
+ segmentMetadata = extractMetadataFromSegmentFile(segmentName, new
URI(segmentLocation));
+ } catch (URISyntaxException e) {
+ LOGGER.error("Invalid segment location: {} for segment {}",
segmentLocation, segmentName);
+ return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+ }
+ if (segmentMetadata == null) {
+ return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+ }
SegmentCompletionProtocol.Request.Params requestParams = new
SegmentCompletionProtocol.Request.Params();
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset)
.withSegmentLocation(segmentLocation).withSegmentSizeBytes(segmentSizeBytes)
@@ -207,8 +228,10 @@ public class LLCSegmentCompletionHandlers {
final boolean isSuccess = true;
final boolean isSplitCommit = true;
- SegmentCompletionProtocol.Response response =
- SegmentCompletionManager.getInstance().segmentCommitEnd(requestParams,
isSuccess, isSplitCommit);
+ CommittingSegmentDescriptor committingSegmentDescriptor =
+
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams,
segmentMetadata);
+ SegmentCompletionProtocol.Response response =
SegmentCompletionManager.getInstance()
+ .segmentCommitEnd(requestParams, isSuccess, isSplitCommit,
committingSegmentDescriptor);
final String responseStr = response.toJsonString();
LOGGER.info("Response to segmentCommitEnd:{}", responseStr);
return responseStr;
@@ -234,13 +257,68 @@ public class LLCSegmentCompletionHandlers {
final SegmentCompletionManager segmentCompletionManager =
SegmentCompletionManager.getInstance();
SegmentCompletionProtocol.Response response =
segmentCompletionManager.segmentCommitStart(requestParams);
- if (response.equals(SegmentCompletionProtocol.RESP_COMMIT_CONTINUE)) {
- // Get the segment and put it in the right place.
- boolean success = uploadSegment(multiPart, instanceId, segmentName,
false) != null;
- response = segmentCompletionManager.segmentCommitEnd(requestParams,
success, false);
+ CommittingSegmentDescriptor committingSegmentDescriptor =
+
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(requestParams);
+ boolean success = false;
+
+ if (response.equals(SegmentCompletionProtocol.RESP_COMMIT_CONTINUE)) {
+ File localTmpFile = null;
+ try {
+ // Get the segment from the form input and put it in a tmp area in the
local file system.
+ localTmpFile = uploadFileToLocalTmpFile(multiPart, instanceId,
segmentName);
+ if (localTmpFile == null) {
+ LOGGER.error("Unable to get the segment file from multipart input to
local file {}", segmentName);
+ } else {
+ // Extract the segment metadata from the segment file.
+ SegmentMetadataImpl segmentMetadata =
+ getSegmentMetadataFromLocalFile(new LLCSegmentName(segmentName),
localTmpFile);
+ if (segmentMetadata == null) {
+ LOGGER.error("Unable to extract segment metadata from segment
data: {}", segmentName);
+ } else {
+ // Store the segment file to Pinot FS.
+ try {
+ FileUploadPathProvider provider = new
FileUploadPathProvider(_controllerConf);
+ final String rawTableName = new
LLCSegmentName(segmentName).getTableName();
+ URI segmentFileURI = ControllerConf.getUriFromPath(
+ StringUtil.join("/",
provider.getBaseDataDirURI().toString(), rawTableName, segmentName));
+ PinotFS pinotFS =
PinotFSFactory.create(provider.getBaseDataDirURI().getScheme());
+ // Multiple threads can reach this point at the same time, if
the following scenario happens
+ // The server that was asked to commit did so very slowly (due
to network speeds). Meanwhile the FSM in
+ // SegmentCompletionManager timed out, and allowed another
server to commit, which did so very quickly (somehow
+ // the network speeds changed). The second server made it
through the FSM and reached this point.
+ // The synchronization below takes care that exactly one file
gets moved in place.
+ // There are still corner conditions that are not handled
correctly. For example,
+ // 1. What if the offset of the faster server was different?
+ // 2. We know that only the faster server will get to complete
the COMMIT call successfully. But it is possible
+ // that the race to this statement is won by the slower
server, and so the real segment that is in there is that
+ // of the slower server.
+ // In order to overcome controller restarts after the segment is
moved to PinotFS, but before it is committed, we DO need to
+ // check for existing segment file and remove it. So, the block
cannot be removed altogether.
+ // For now, we live with these corner cases. Once we have
split-commit enabled and working, this code will no longer
+ // be used.
+ synchronized (SegmentCompletionManager.getInstance()) {
+ if (pinotFS.exists(segmentFileURI)) {
+ LOGGER.warn("Segment file {} exists. Replacing with upload
from {} for segment {}",
+ segmentFileURI.toString(), instanceId, segmentName);
+ pinotFS.delete(segmentFileURI, true);
+ }
+ pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI);
+ }
+ committingSegmentDescriptor =
+
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams,
segmentMetadata);
+ success = true;
+ } catch (Exception e) {
+ LOGGER.error("Could not save segment {} to PinotFS",
segmentName, e);
+ }
+ }
+ }
+ } finally {
+ FileUtils.deleteQuietly(localTmpFile);
+ }
}
+ response = segmentCompletionManager.segmentCommitEnd(requestParams,
success, false, committingSegmentDescriptor);
LOGGER.info("Response to segmentCommit: instance={} segment={} status={}
offset={}", requestParams.getInstanceId(),
requestParams.getSegmentName(), response.getStatus(),
response.getOffset());
@@ -260,25 +338,202 @@ public class LLCSegmentCompletionHandlers {
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset);
LOGGER.info("Processing segmentUpload:{}", requestParams.toString());
- final String segmentLocation = uploadSegment(multiPart, instanceId,
segmentName, true);
- if (segmentLocation == null) {
+ // Get the segment from the form input and put it in the right place.
+ File localTmpFile = uploadFileToLocalTmpFile(multiPart, instanceId,
segmentName);
+ if (localTmpFile == null) {
+ LOGGER.error("Unable to get the segment file from multipart input to
local file {}", segmentName);
return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
}
- SegmentCompletionProtocol.Response.Params responseParams =
- new
SegmentCompletionProtocol.Response.Params().withOffset(requestParams.getOffset())
- .withSegmentLocation(segmentLocation)
-
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS);
+ try {
+ FileUploadPathProvider provider = new
FileUploadPathProvider(_controllerConf);
+ URI uri = localSegementFileToPinotFsTmpLocation(provider, localTmpFile,
segmentName);
+ if (uri == null) {
+ LOGGER.error("Unable to upload local segment file {} to Pinot storage
for segment ", localTmpFile.toPath(),
+ segmentName);
+ return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+ }
+ SegmentCompletionProtocol.Response.Params responseParams =
+ new
SegmentCompletionProtocol.Response.Params().withOffset(requestParams.getOffset())
+ .withSegmentLocation(uri.toString())
+
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS);
+
+ String response = new
SegmentCompletionProtocol.Response(responseParams).toJsonString();
- String response = new
SegmentCompletionProtocol.Response(responseParams).toJsonString();
+ LOGGER.info("Response to segmentUpload:{}", response);
- LOGGER.info("Response to segmentUpload:{}", response);
+ return response;
+ } catch (Exception e) {
- return response;
+ return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+ } finally {
+ FileUtils.deleteQuietly(localTmpFile);
+ }
}
- @Nullable
- private String uploadSegment(FormDataMultiPart multiPart, String instanceId,
String segmentName,
- boolean isSplitCommit) {
+ @POST
+ @Path(SegmentCompletionProtocol.MSG_TYPE_COMMIT_END_METADATA)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.MULTIPART_FORM_DATA)
+ public String
segmentCommitEndWithMetadata(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID)
String instanceId,
+ @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String
segmentName,
+ @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String
segmentLocation,
+ @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+ @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long
memoryUsedBytes,
+ @QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long
buildTimeMillis,
+ @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long
waitTimeMillis,
+ @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
+ @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long
segmentSizeBytes,
+ FormDataMultiPart metadataFiles) {
+ if (instanceId == null || segmentName == null || offset == -1 ||
segmentLocation == null || metadataFiles == null) {
+ LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={},
segmentLocation={}", offset, segmentName,
+ instanceId, segmentLocation);
+ // TODO: memoryUsedInBytes = 0 if not present in params. Add validation
when we start using it
+ return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+ }
+
+ SegmentCompletionProtocol.Request.Params requestParams = new
SegmentCompletionProtocol.Request.Params();
+
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset)
+
.withSegmentLocation(segmentLocation).withSegmentSizeBytes(segmentSizeBytes)
+
.withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows)
+ .withMemoryUsedBytes(memoryUsedBytes);
+ LOGGER.info("Processing segmentCommitEnd:{}", requestParams.toString());
+
+ final boolean isSuccess = true;
+ final boolean isSplitCommit = true;
+ SegmentMetadataImpl segmentMetadata =
extractMetadataFromInput(metadataFiles, segmentName);
+ // If it fails to extract metadata from the input form, return failure.
+ if (segmentMetadata == null) {
+ LOGGER.error("Segment metadata extraction failure for segment {}",
segmentName);
+ return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+ }
+ SegmentCompletionProtocol.Response response =
SegmentCompletionManager.getInstance()
+ .segmentCommitEnd(requestParams, isSuccess, isSplitCommit,
+
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams,
segmentMetadata));
+ final String responseStr = response.toJsonString();
+ LOGGER.info("Response to segmentCommitEnd:{}", responseStr);
+ return responseStr;
+ }
+
+ /**
+ * Extract and return the segment metadata from the two input form data
files (metadata file and creation meta).
+ * Return null if any of the two files is missing or there is exception
during parsing and extraction.
+ */
+ private SegmentMetadataImpl extractMetadataFromInput(FormDataMultiPart
metadataFiles, String segmentNameStr) {
+ String tempMetadataDirStr = StringUtil.join("/",
_controllerConf.getLocalTempDir(),
+ segmentNameStr + METADATA_TEMP_DIR_SUFFIX +
String.valueOf(System.currentTimeMillis()));
+ File tempMetadataDir = new File(tempMetadataDirStr);
+ try {
+ Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create
directory: %s", tempMetadataDirStr);
+ // Extract metadata.properties from the metadataFiles.
+ if (!extractMetadataFromInputField(metadataFiles, tempMetadataDirStr,
+ V1Constants.MetadataKeys.METADATA_FILE_NAME, segmentNameStr)) {
+ return null;
+ }
+ // Extract creation.meta from the metadataFiles.
+ if (!extractMetadataFromInputField(metadataFiles, tempMetadataDirStr,
V1Constants.SEGMENT_CREATION_META,
+ segmentNameStr)) {
+ return null;
+ }
+ // Load segment metadata
+ return new SegmentMetadataImpl(tempMetadataDir);
+ } catch (Exception e) {
+ LOGGER.error("Exception extracting and reading segment metadata for {}",
segmentNameStr, e);
+ return null;
+ } finally {
+ FileUtils.deleteQuietly(tempMetadataDir);
+ }
+ }
+
+ /**
+ *
+ * Extract a single file with name metaFileName from the input
FormDataMultiPart and put it under the path
+ * tempMetadataDirStr + metaFileName.
+ * Return true iff the extraction and copy is successful.
+ */
+ private boolean extractMetadataFromInputField(FormDataMultiPart
metadataFiles, String tempMetadataDirStr,
+ String metaFileName, String segmentName) {
+ FormDataBodyPart metadataFilesField = metadataFiles.getField(metaFileName);
+ Preconditions.checkNotNull(metadataFilesField, "The metadata input field
%s does not exist.", metaFileName);
+
+ try (InputStream metadataPropertiesInputStream =
metadataFilesField.getValueAs(InputStream.class)) {
+ Preconditions.checkNotNull(metadataPropertiesInputStream, "Unable to
parse %s from input.", metaFileName);
+ java.nio.file.Path metadataPropertiesPath =
FileSystems.getDefault().getPath(tempMetadataDirStr, metaFileName);
+ Files.copy(metadataPropertiesInputStream, metadataPropertiesPath);
+ return true;
+ } catch (IOException e) {
+ LOGGER.error("Failed to extract metadata property file: {} for segment
{}", metaFileName, segmentName, e);
+ }
+ return false;
+ }
+
+ /**
+ * Extract metadata from a segment found in a URI (i.e., segmentLocation) in
PinotFS.
+ * <p>We extract the metadata.properties and creation.meta into a temporary
metadata directory:
+ * DATADIR/rawTableName/segmentName.metadata.tmp, and load metadata from
there.
+ *
+ * @param segmentNameStr Name of the segment
+ * @param segmentLocation the location of the segment file in PinotFS.
+ * @return SegmentMetadataImpl if it is able to extract the metadata file
from the tar-zipped segment file.
+ */
+ private SegmentMetadataImpl extractMetadataFromSegmentFile(final String
segmentNameStr, final URI segmentLocation) {
+ LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
+ String baseDirStr = StringUtil.join("/", _controllerConf.getDataDir(),
segmentName.getTableName());
+ String tempSegmentDataDirStr =
+ StringUtil.join("/", baseDirStr, segmentNameStr + SEGMENT_TMP_DIR +
String.valueOf(System.currentTimeMillis()));
+ File tempSegmentDataDir = new File(tempSegmentDataDirStr);
+ File segDstFile = new File(StringUtil.join("/", tempSegmentDataDirStr,
segmentNameStr));
+ // Use PinotFS to copy the segment file to local fs for metadata
extraction.
+ PinotFS pinotFS =
PinotFSFactory.create(ControllerConf.getUriFromPath(_controllerConf.getDataDir()).getScheme());
+ try {
+ Preconditions.checkState(tempSegmentDataDir.mkdirs(), "Failed to create
directory: %s", tempSegmentDataDir);
+ pinotFS.copyToLocalFile(segmentLocation, segDstFile);
+ return getSegmentMetadataFromLocalFile(segmentName, segDstFile);
+ } catch (Exception e) {
+ LOGGER.error("Exception in extracting segment file to local {}",
segmentNameStr, e);
+ return null;
+ } finally {
+ FileUtils.deleteQuietly(tempSegmentDataDir);
+ }
+ }
+
+ private SegmentMetadataImpl getSegmentMetadataFromLocalFile(LLCSegmentName
segmentName, File segmentFile) {
+ String baseDirStr = StringUtil.join("/", _controllerConf.getDataDir(),
segmentName.getTableName());
+ String tempMetadataDirStr = StringUtil.join("/", baseDirStr,
+ segmentName.getSegmentName() + METADATA_TEMP_DIR_SUFFIX +
String.valueOf(System.currentTimeMillis()));
+ File tempMetadataDir = new File(tempMetadataDirStr);
+ try (// Extract metadata.properties
+ InputStream metadataPropertiesInputStream = TarGzCompressionUtils
+ .unTarOneFile(new FileInputStream(segmentFile),
V1Constants.MetadataKeys.METADATA_FILE_NAME);
+ // Extract creation.meta
+ InputStream creationMetaInputStream = TarGzCompressionUtils
+ .unTarOneFile(new FileInputStream(segmentFile),
V1Constants.SEGMENT_CREATION_META)) {
+ Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create
directory: %s", tempMetadataDirStr);
+ Preconditions.checkNotNull(metadataPropertiesInputStream, "%s does not
exist",
+ V1Constants.MetadataKeys.METADATA_FILE_NAME);
+ java.nio.file.Path metadataPropertiesPath =
+ FileSystems.getDefault().getPath(tempMetadataDirStr,
V1Constants.MetadataKeys.METADATA_FILE_NAME);
+ Files.copy(metadataPropertiesInputStream, metadataPropertiesPath);
+
+ Preconditions.checkNotNull(creationMetaInputStream, "%s does not exist",
V1Constants.SEGMENT_CREATION_META);
+ java.nio.file.Path creationMetaPath =
+ FileSystems.getDefault().getPath(tempMetadataDirStr,
V1Constants.SEGMENT_CREATION_META);
+ Files.copy(creationMetaInputStream, creationMetaPath);
+ // Load segment metadata
+ return new SegmentMetadataImpl(tempMetadataDir);
+ } catch (Exception e) {
+ LOGGER.error("Exception extracting and reading segment metadata for {}",
segmentName.getSegmentName(), e);
+ return null;
+ } finally {
+ FileUtils.deleteQuietly(tempMetadataDir);
+ }
+ }
+
+ /**
+ *
+ * Copy the uploaded segment file in the input form to a local tmp file and
return the tmp file.
+ * Return null when there is any error during the process.
+ */
+ private File uploadFileToLocalTmpFile(FormDataMultiPart multiPart, String
instanceId, String segmentName) {
try {
Map<String, List<FormDataBodyPart>> map = multiPart.getFields();
if (!PinotSegmentUploadRestletResource.validateMultiPart(map,
segmentName)) {
@@ -297,59 +552,7 @@ public class LLCSegmentCompletionHandlers {
OutputStream outputStream = new FileOutputStream(localTmpFile)) {
IOUtils.copyLarge(inputStream, outputStream);
}
-
- LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
- final String rawTableName = llcSegmentName.getTableName();
- final java.net.URI tableDirURI =
- ControllerConf.getUriFromPath(StringUtil.join("/",
provider.getBaseDataDirURI().toString(), rawTableName));
- java.net.URI segmentFileURI;
- if (isSplitCommit) {
- // We only clean up tmp segment file under table dir, so don't create
any sub-dir under table dir.
- // See PinotLLCRealtimeSegmentManager.commitSegmentFile().
- // TODO: move tmp file logic into SegmentCompletionUtils.
- String uniqueSegmentFileName =
SegmentCompletionUtils.generateSegmentFileName(segmentName);
- segmentFileURI =
- ControllerConf.getUriFromPath(StringUtil.join("/",
tableDirURI.toString(), uniqueSegmentFileName));
- } else {
- segmentFileURI = ControllerConf.getUriFromPath(StringUtil.join("/",
tableDirURI.toString(), segmentName));
- }
-
- PinotFS pinotFS =
PinotFSFactory.create(provider.getBaseDataDirURI().getScheme());
- try {
- if (isSplitCommit) {
- pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI);
- } else {
- // Multiple threads can reach this point at the same time, if the
following scenario happens
- // The server that was asked to commit did so very slowly (due to
network speeds). Meanwhile the FSM in
- // SegmentCompletionManager timed out, and allowed another server to
commit, which did so very quickly (somehow
- // the network speeds changed). The second server made it through
the FSM and reached this point.
- // The synchronization below takes care that exactly one file gets
moved in place.
- // There are still corner conditions that are not handled correctly.
For example,
- // 1. What if the offset of the faster server was different?
- // 2. We know that only the faster server will get to complete the
COMMIT call successfully. But it is possible
- // that the race to this statement is won by the slower server,
and so the real segment that is in there is that
- // of the slower server.
- // In order to overcome controller restarts after the segment is
renamed, but before it is committed, we DO need to
- // check for existing segment file and remove it. So, the block
cannot be removed altogether.
- // For now, we live with these corner cases. Once we have
split-commit enabled and working, this code will no longer
- // be used.
- synchronized (SegmentCompletionManager.getInstance()) {
- if (pinotFS.exists(segmentFileURI)) {
- LOGGER
- .warn("Segment file {} exists. Replacing with upload from
{}", segmentFileURI.toString(), instanceId);
- pinotFS.delete(segmentFileURI, true);
- }
- pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI);
- }
- }
- } catch (Exception e) {
- LOGGER.error("Could not copy from {} to {}",
localTmpFile.getAbsolutePath(), segmentFileURI.toString());
- } finally {
- FileUtils.deleteQuietly(localTmpFile);
- }
-
- LOGGER.info("Moved file {} to {}", localTmpFile.getAbsolutePath(),
segmentFileURI.toString());
- return new URI(SCHEME + segmentFileURI.toString(), /* boolean escaped */
false).toString();
+ return localTmpFile;
} catch (InvalidControllerConfigException e) {
LOGGER.error("Invalid controller config exception from instance {} for
segment {}", instanceId, segmentName, e);
return null;
@@ -360,4 +563,19 @@ public class LLCSegmentCompletionHandlers {
multiPart.cleanup();
}
}
+
+ private URI localSegementFileToPinotFsTmpLocation(FileUploadPathProvider
provider, File localTmpFile,
+ String segmentName)
+ throws Exception {
+ final String rawTableName = new LLCSegmentName(segmentName).getTableName();
+ // We only clean up tmp segment file under table dir, so don't create any
sub-dir under table dir.
+ // See PinotLLCRealtimeSegmentManager.commitSegmentFile().
+ // TODO: move tmp file logic into SegmentCompletionUtils.
+ String uniqueSegmentFileName =
SegmentCompletionUtils.generateSegmentFileName(segmentName);
+ URI segmentFileURI = ControllerConf.getUriFromPath(
+ StringUtil.join("/", provider.getBaseDataDirURI().toString(),
rawTableName, uniqueSegmentFileName));
+ PinotFS pinotFS =
PinotFSFactory.create(provider.getBaseDataDirURI().getScheme());
+ pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI);
+ return segmentFileURI;
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 82d2e31..7a883ca 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -23,13 +23,7 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
import java.net.URI;
-import java.nio.file.FileSystems;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -45,7 +39,6 @@ import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.io.FileUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
@@ -70,7 +63,6 @@ import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.common.utils.StringUtil;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.retry.RetryPolicies;
import org.apache.pinot.controller.ControllerConf;
@@ -89,7 +81,6 @@ import org.apache.pinot.core.realtime.stream.OffsetCriteria;
import org.apache.pinot.core.realtime.stream.PartitionOffsetFetcher;
import org.apache.pinot.core.realtime.stream.StreamConfig;
import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
-import org.apache.pinot.core.segment.creator.impl.V1Constants;
import org.apache.pinot.core.segment.index.ColumnMetadata;
import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
import org.apache.pinot.filesystem.PinotFS;
@@ -486,7 +477,8 @@ public class PinotLLCRealtimeSegmentManager {
}
// Step-1
- boolean success = updateOldSegmentMetadataZNRecord(realtimeTableName,
committingLLCSegmentName, nextOffset);
+ boolean success = updateOldSegmentMetadataZNRecord(realtimeTableName,
committingLLCSegmentName, nextOffset,
+ committingSegmentDescriptor);
if (!success) {
return false;
}
@@ -532,10 +524,12 @@ public class PinotLLCRealtimeSegmentManager {
* @param realtimeTableName - table name for which segment is being committed
* @param committingLLCSegmentName - name of the segment being committed
* @param nextOffset - the end offset for this committing segment
+ * @param committingSegmentDescriptor - the metadata of the commit segment.
* @return
*/
protected boolean updateOldSegmentMetadataZNRecord(String realtimeTableName,
LLCSegmentName committingLLCSegmentName,
- long nextOffset) {
+ long nextOffset,
+
CommittingSegmentDescriptor committingSegmentDescriptor) {
String committingSegmentNameStr =
committingLLCSegmentName.getSegmentName();
Stat stat = new Stat();
@@ -547,6 +541,12 @@ public class PinotLLCRealtimeSegmentManager {
committingSegmentNameStr, realtimeTableName,
committingSegmentMetadata.getStatus());
return false;
}
+ if (committingSegmentDescriptor.getSegmentMetadata() == null) {
+ LOGGER.error("No segment metadata found in descriptor for committing
segment {} for table {}", committingLLCSegmentName,
+ realtimeTableName);
+ return false;
+ }
+ SegmentMetadataImpl segmentMetadata =
committingSegmentDescriptor.getSegmentMetadata();
// TODO: set number of rows to end consumption in new segment metadata,
based on memory used and number of rows from old segment
committingSegmentMetadata.setEndOffset(nextOffset);
@@ -554,8 +554,6 @@ public class PinotLLCRealtimeSegmentManager {
String rawTableName =
TableNameBuilder.extractRawTableName(realtimeTableName);
committingSegmentMetadata.setDownloadUrl(
ControllerConf.constructDownloadUrl(rawTableName,
committingSegmentNameStr, _controllerConf.generateVipUrl()));
- // Pull segment metadata from incoming segment and set it in zk segment
metadata
- SegmentMetadataImpl segmentMetadata = extractSegmentMetadata(rawTableName,
committingSegmentNameStr);
committingSegmentMetadata.setCrc(Long.valueOf(segmentMetadata.getCrc()));
committingSegmentMetadata.setStartTime(segmentMetadata.getTimeInterval().getStartMillis());
committingSegmentMetadata.setEndTime(segmentMetadata.getTimeInterval().getEndMillis());
@@ -683,51 +681,6 @@ public class PinotLLCRealtimeSegmentManager {
return commitTimeoutMS;
}
- /**
- * Extract the segment metadata files from the tar-zipped segment file that
is expected to be in the directory for the
- * table.
- * <p>Segment tar-zipped file path: DATADIR/rawTableName/segmentName.
- * <p>We extract the metadata.properties and creation.meta into a temporary
metadata directory:
- * DATADIR/rawTableName/segmentName.metadata.tmp, and load metadata from
there.
- *
- * @param rawTableName Name of the table (not including the REALTIME
extension)
- * @param segmentNameStr Name of the segment
- * @return SegmentMetadataImpl if it is able to extract the metadata file
from the tar-zipped segment file.
- */
- protected SegmentMetadataImpl extractSegmentMetadata(final String
rawTableName, final String segmentNameStr) {
- String baseDirStr = StringUtil.join("/", _controllerConf.getDataDir(),
rawTableName);
- String segFileStr = StringUtil.join("/", baseDirStr, segmentNameStr);
- String tempMetadataDirStr = StringUtil.join("/", baseDirStr,
segmentNameStr + METADATA_TEMP_DIR_SUFFIX);
- File tempMetadataDir = new File(tempMetadataDirStr);
-
- try {
- Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create
directory: %s", tempMetadataDirStr);
-
- // Extract metadata.properties
- InputStream metadataPropertiesInputStream = TarGzCompressionUtils
- .unTarOneFile(new FileInputStream(new File(segFileStr)),
V1Constants.MetadataKeys.METADATA_FILE_NAME);
- Preconditions.checkNotNull(metadataPropertiesInputStream, "%s does not
exist",
- V1Constants.MetadataKeys.METADATA_FILE_NAME);
- Path metadataPropertiesPath =
- FileSystems.getDefault().getPath(tempMetadataDirStr,
V1Constants.MetadataKeys.METADATA_FILE_NAME);
- Files.copy(metadataPropertiesInputStream, metadataPropertiesPath);
-
- // Extract creation.meta
- InputStream creationMetaInputStream = TarGzCompressionUtils
- .unTarOneFile(new FileInputStream(new File(segFileStr)),
V1Constants.SEGMENT_CREATION_META);
- Preconditions.checkNotNull(creationMetaInputStream, "%s does not exist",
V1Constants.SEGMENT_CREATION_META);
- Path creationMetaPath =
FileSystems.getDefault().getPath(tempMetadataDirStr,
V1Constants.SEGMENT_CREATION_META);
- Files.copy(creationMetaInputStream, creationMetaPath);
-
- // Load segment metadata
- return new SegmentMetadataImpl(tempMetadataDir);
- } catch (Exception e) {
- throw new RuntimeException("Exception extracting and reading segment
metadata for " + segmentNameStr, e);
- } finally {
- FileUtils.deleteQuietly(tempMetadataDir);
- }
- }
-
public LLCRealtimeSegmentZKMetadata getRealtimeSegmentZKMetadata(String
realtimeTableName, String segmentName,
Stat stat) {
ZNRecord znRecord = _propertyStore
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index e3e3003..b499e3e 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -290,7 +290,7 @@ public class SegmentCompletionManager {
* @return
*/
public SegmentCompletionProtocol.Response
segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
- boolean success, boolean isSplitCommit) {
+ boolean success, boolean isSplitCommit, CommittingSegmentDescriptor
committingSegmentDescriptor) {
if (!isLeader() || !_helixManager.isConnected()) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER,
1L);
return SegmentCompletionProtocol.RESP_NOT_LEADER;
@@ -301,7 +301,7 @@ public class SegmentCompletionManager {
SegmentCompletionProtocol.Response response =
SegmentCompletionProtocol.RESP_FAILED;
try {
fsm = lookupOrCreateFsm(segmentName,
SegmentCompletionProtocol.MSG_TYPE_COMMIT);
- response = fsm.segmentCommitEnd(reqParams, success, isSplitCommit);
+ response = fsm.segmentCommitEnd(reqParams, success, isSplitCommit,
committingSegmentDescriptor);
} catch (Exception e) {
LOGGER.error("Caught exception in segmentCommitEnd for segment {}",
segmentNameStr, e);
}
@@ -605,7 +605,7 @@ public class SegmentCompletionManager {
* the _winner.
*/
public SegmentCompletionProtocol.Response
segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
- boolean success, boolean isSplitCommit) {
+ boolean success, boolean isSplitCommit, CommittingSegmentDescriptor
committingSegmentDescriptor) {
String instanceId = reqParams.getInstanceId();
long offset = reqParams.getOffset();
synchronized (this) {
@@ -624,7 +624,7 @@ public class SegmentCompletionManager {
LOGGER.error("Segment upload failed");
return abortAndReturnFailed();
}
- SegmentCompletionProtocol.Response response = commitSegment(reqParams,
isSplitCommit);
+ SegmentCompletionProtocol.Response response = commitSegment(reqParams,
isSplitCommit, committingSegmentDescriptor);
if (!response.equals(SegmentCompletionProtocol.RESP_COMMIT_SUCCESS)) {
return abortAndReturnFailed();
} else {
@@ -1005,7 +1005,8 @@ public class SegmentCompletionManager {
}
private SegmentCompletionProtocol.Response
commitSegment(SegmentCompletionProtocol.Request.Params reqParams,
- boolean isSplitCommit) {
+ boolean
isSplitCommit,
+
CommittingSegmentDescriptor committingSegmentDescriptor) {
boolean success;
String instanceId = reqParams.getInstanceId();
long offset = reqParams.getOffset();
@@ -1019,8 +1020,6 @@ public class SegmentCompletionManager {
_state = State.COMMITTING;
// In case of splitCommit, the segment is uploaded to a unique file name
indicated by segmentLocation,
// so we need to move the segment file to its permanent location first
before committing the metadata.
- CommittingSegmentDescriptor committingSegmentDescriptor =
-
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
if (isSplitCommit) {
if (!_segmentManager.commitSegmentFile(_segmentName.getTableName(),
committingSegmentDescriptor)) {
return SegmentCompletionProtocol.RESP_FAILED;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java
index 82304d3..50c4e3f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java
@@ -19,6 +19,7 @@
package org.apache.pinot.controller.helix.core.realtime.segment;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
/**
@@ -29,6 +30,7 @@ public class CommittingSegmentDescriptor {
private long _segmentSizeBytes;
private String _segmentLocation;
private long _nextOffset;
+ private SegmentMetadataImpl _segmentMetadata;
public static CommittingSegmentDescriptor fromSegmentCompletionReqParams(
SegmentCompletionProtocol.Request.Params reqParams) {
@@ -39,6 +41,14 @@ public class CommittingSegmentDescriptor {
return committingSegmentDescriptor;
}
+ public static CommittingSegmentDescriptor
fromSegmentCompletionReqParamsAndMetadata(
+ SegmentCompletionProtocol.Request.Params reqParams,
SegmentMetadataImpl metadata) {
+ CommittingSegmentDescriptor committingSegmentDescriptor =
+ fromSegmentCompletionReqParams(reqParams);
+ committingSegmentDescriptor.setSegmentMetadata(metadata);
+ return committingSegmentDescriptor;
+ }
+
public CommittingSegmentDescriptor(String segmentName, long nextOffset, long
segmentSizeBytes) {
_segmentName = segmentName;
_nextOffset = nextOffset;
@@ -82,4 +92,12 @@ public class CommittingSegmentDescriptor {
public void setNextOffset(long nextOffset) {
_nextOffset = nextOffset;
}
+
+ public SegmentMetadataImpl getSegmentMetadata() {
+ return _segmentMetadata;
+ }
+
+ public void setSegmentMetadata(SegmentMetadataImpl segmentMetadata) {
+ _segmentMetadata = segmentMetadata;
+ }
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 1e68bbc..8153559 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -347,11 +347,11 @@ public class PinotLLCRealtimeSegmentManagerTest {
String tableName = tableConfig.getTableName();
String rawTableName = TableNameBuilder.extractRawTableName(tableName);
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
- segmentManager.updateOldSegmentMetadataZNRecord(tableName, llcSegmentName,
nextOffset);
LLCSegmentName newLlcSegmentName =
new LLCSegmentName(rawTableName, partition, nextSeqNum,
System.currentTimeMillis());
CommittingSegmentDescriptor committingSegmentDescriptor =
new CommittingSegmentDescriptor(segmentName, nextOffset, 0);
+ segmentManager.updateOldSegmentMetadataZNRecord(tableName, llcSegmentName,
nextOffset, committingSegmentDescriptor);
segmentManager.createNewSegmentMetadataZNRecord(tableConfig,
llcSegmentName, newLlcSegmentName, partitionAssignment,
committingSegmentDescriptor, false);
segmentManager.updateIdealStateOnSegmentCompletion(idealState,
segmentName, newLlcSegmentName.getSegmentName(),
@@ -552,13 +552,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
LLCSegmentName latestSegment =
partitionToLatestSegments.get(String.valueOf(randomlySelectedPartition));
LLCRealtimeSegmentZKMetadata latestMetadata =
segmentManager.getRealtimeSegmentZKMetadata(tableName,
latestSegment.getSegmentName(), null);
- segmentManager
- .updateOldSegmentMetadataZNRecord(tableName, latestSegment,
latestMetadata.getStartOffset() + 100);
LLCSegmentName newLlcSegmentName =
new LLCSegmentName(rawTableName, randomlySelectedPartition,
latestSegment.getSequenceNumber() + 1,
System.currentTimeMillis());
CommittingSegmentDescriptor committingSegmentDescriptor =
new CommittingSegmentDescriptor(latestSegment.getSegmentName(),
latestMetadata.getStartOffset() + 100, 0);
+ segmentManager.updateOldSegmentMetadataZNRecord(tableName,
latestSegment,
+ latestMetadata.getStartOffset() + 100,
committingSegmentDescriptor);
segmentManager.createNewSegmentMetadataZNRecord(tableConfig,
latestSegment, newLlcSegmentName,
expectedPartitionAssignment, committingSegmentDescriptor, false);
@@ -632,7 +632,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
LLCRealtimeSegmentZKMetadata latestMetadata =
segmentManager.getRealtimeSegmentZKMetadata(tableName,
latestSegment.getSegmentName(), null);
segmentManager
- .updateOldSegmentMetadataZNRecord(tableName, latestSegment,
latestMetadata.getStartOffset() + 100);
+ .updateOldSegmentMetadataZNRecord(tableName, latestSegment,
latestMetadata.getStartOffset() + 100,
+ new
CommittingSegmentDescriptor(latestSegment.getSegmentName(),
latestMetadata.getStartOffset() + 100, 0));
idealState =
idealStateBuilder.setSegmentState(latestSegment.getSegmentName(),
"ONLINE").build();
// get old state
@@ -666,7 +667,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
LLCRealtimeSegmentZKMetadata latestMetadata =
segmentManager.getRealtimeSegmentZKMetadata(tableName,
latestSegment.getSegmentName(), null);
segmentManager
- .updateOldSegmentMetadataZNRecord(tableName, latestSegment,
latestMetadata.getStartOffset() + 100);
+ .updateOldSegmentMetadataZNRecord(tableName, latestSegment,
latestMetadata.getStartOffset() + 100,
+ new
CommittingSegmentDescriptor(latestSegment.getSegmentName(),
latestMetadata.getStartOffset() + 100, 0));
// get old state
nPartitions = expectedPartitionAssignment.getNumPartitions();
@@ -894,7 +896,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
segmentManager.IS_CONNECTED = false;
reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
CommittingSegmentDescriptor committingSegmentDescriptor =
- CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
+ segmentManager.newMockSegmentMetadata());
boolean status = segmentManager.commitSegmentMetadata(rawTableName,
committingSegmentDescriptor);
Assert.assertFalse(status);
Assert.assertEquals(segmentManager._nCallsToUpdateHelix, 0); //
Idealstate not updated
@@ -950,7 +953,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
Set<String> prevInstances =
idealState.getInstanceSet(committingSegmentMetadata.getSegmentName());
reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
CommittingSegmentDescriptor committingSegmentDescriptor =
- CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
segmentManager.newMockSegmentMetadata());
boolean status = segmentManager.commitSegmentMetadata(rawTableName,
committingSegmentDescriptor);
segmentManager.verifyMetadataInteractions();
Assert.assertTrue(status);
@@ -973,7 +976,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
segmentManager._records.clear();
prevInstances =
idealState.getInstanceSet(committingSegmentMetadata.getSegmentName());
reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
- committingSegmentDescriptor =
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+ committingSegmentDescriptor =
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
+ segmentManager.newMockSegmentMetadata());
status = segmentManager.commitSegmentMetadata(rawTableName,
committingSegmentDescriptor);
segmentManager.verifyMetadataInteractions();
Assert.assertTrue(status);
@@ -991,7 +995,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
segmentManager._paths.clear();
segmentManager._records.clear();
reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
- committingSegmentDescriptor =
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+ // We do not expect the segment metadata to be used. Thus reuse the
current metadata.
+ committingSegmentDescriptor =
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
+ segmentManager.getMockSegmentMetadata());
status = segmentManager.commitSegmentMetadata(rawTableName,
committingSegmentDescriptor);
segmentManager.verifyMetadataInteractions();
Assert.assertFalse(status);
@@ -1006,7 +1012,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
segmentManager._paths.clear();
segmentManager._records.clear();
reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
- committingSegmentDescriptor =
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+ // We do not expect the segment metadata to be used. Thus reuse the
current metadata.
+ committingSegmentDescriptor =
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
+ segmentManager.getMockSegmentMetadata());
status = segmentManager.commitSegmentMetadata(rawTableName,
committingSegmentDescriptor);
segmentManager.verifyMetadataInteractions();
Assert.assertFalse(status);
@@ -1021,7 +1029,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
committingSegmentMetadata = new LLCRealtimeSegmentZKMetadata(newZnRec);
prevInstances =
idealState.getInstanceSet(committingSegmentMetadata.getSegmentName());
reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
- committingSegmentDescriptor =
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+ committingSegmentDescriptor =
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
+ segmentManager.newMockSegmentMetadata());
status = segmentManager.commitSegmentMetadata(rawTableName,
committingSegmentDescriptor);
segmentManager.verifyMetadataInteractions();
Assert.assertTrue(status);
@@ -1091,7 +1100,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
CommittingSegmentDescriptor committingSegmentDescriptor =
- CommittingSegmentDescriptor.fromSegmentCompletionReqParams(reqParams);
+
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
+ segmentManager1.newMockSegmentMetadata());
boolean status = segmentManager1.commitSegmentMetadata(rawTableName,
committingSegmentDescriptor);
Assert.assertTrue(status); // Committing segment metadata succeeded.
@@ -1365,6 +1375,19 @@ public class PinotLLCRealtimeSegmentManagerTest {
_tableConfigStore = new TableConfigStore();
}
+ private SegmentMetadataImpl newMockSegmentMetadata() {
+ segmentMetadata = mock(SegmentMetadataImpl.class);
+
when(segmentMetadata.getCrc()).thenReturn(FakePinotLLCRealtimeSegmentManager.CRC);
+
when(segmentMetadata.getTimeInterval()).thenReturn(FakePinotLLCRealtimeSegmentManager.INTERVAL);
+
when(segmentMetadata.getVersion()).thenReturn(FakePinotLLCRealtimeSegmentManager.SEGMENT_VERSION);
+
when(segmentMetadata.getTotalRawDocs()).thenReturn(FakePinotLLCRealtimeSegmentManager.NUM_DOCS);
+ return segmentMetadata;
+ }
+
+ private SegmentMetadataImpl getMockSegmentMetadata() {
+ return segmentMetadata;
+ }
+
void addTableToStore(String tableName, TableConfig tableConfig, int
nStreamPartitions) {
_tableConfigStore.addTable(tableName, tableConfig, nStreamPartitions);
}
@@ -1457,11 +1480,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
partitionAssignment, committingSegmentDescriptor, isNewTableSetup);
}
- @Override
- protected boolean updateOldSegmentMetadataZNRecord(String
realtimeTableName,
- LLCSegmentName committingLLCSegmentName, long nextOffset) {
- return super.updateOldSegmentMetadataZNRecord(realtimeTableName,
committingLLCSegmentName, nextOffset);
- }
@Override
public LLCRealtimeSegmentZKMetadata getRealtimeSegmentZKMetadata(String
realtimeTableName, String segmentName,
@@ -1482,15 +1500,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
return metadata;
}
- @Override
- protected SegmentMetadataImpl extractSegmentMetadata(final String
rawTableName, final String segmentNameStr) {
- segmentMetadata = mock(SegmentMetadataImpl.class);
- when(segmentMetadata.getCrc()).thenReturn(CRC);
- when(segmentMetadata.getTimeInterval()).thenReturn(INTERVAL);
- when(segmentMetadata.getVersion()).thenReturn(SEGMENT_VERSION);
- when(segmentMetadata.getTotalRawDocs()).thenReturn(NUM_DOCS);
- return segmentMetadata;
- }
public void verifyMetadataInteractions() {
verify(segmentMetadata, times(1)).getCrc();
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index 6ebd34f..3b8503e 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -151,7 +151,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
- response = segmentCompletionMgr.segmentCommitEnd(params, true, false);
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(),
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// Now the FSM should have disappeared from the map
@@ -220,7 +221,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
- response = segmentCompletionMgr.segmentCommitEnd(params, true, false);
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(),
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// Now the FSM should have disappeared from the map
@@ -330,7 +332,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
.withSegmentLocation("doNotCommitMe");
- response = segmentCompletionMgr.segmentCommitEnd(params, true, true);
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
+
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(),
SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
// Now the FSM should have aborted
@@ -363,7 +366,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new
Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr)
.withSegmentLocation("location");
- response = segmentCompletionMgr.segmentCommitEnd(params, true, true);
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
+
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(),
ControllerResponseStatus.COMMIT_SUCCESS);
// And the FSM should be removed.
Assert.assertFalse(fsmMap.containsKey(segmentNameStr));
@@ -415,7 +419,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
.withSegmentLocation("location");
- response = segmentCompletionMgr.segmentCommitEnd(params, true, true);
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
+
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(),
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// Now the FSM should have disappeared from the map
@@ -464,7 +469,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new
Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr)
.withSegmentLocation("location");
- response = segmentCompletionMgr.segmentCommitEnd(params, true, true);
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
+
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(),
SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
// Now the FSM should have disappeared from the map
@@ -523,7 +529,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
- response = segmentCompletionMgr.segmentCommitEnd(params, true, false);
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(),
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// Now the FSM should have disappeared from the map
@@ -595,7 +602,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new
Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
- response = segmentCompletionMgr.segmentCommitEnd(params, true, false);
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(),
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// We ask S2 to keep the segment
params = new
Request.Params().withInstanceId(s2).withOffset(s1Offset).withSegmentName(segmentNameStr)
@@ -661,7 +669,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new
Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
.withSegmentLocation("location");
- response = segmentCompletionMgr.segmentCommitEnd(params, true,
isSplitCommit);
+ response = segmentCompletionMgr.segmentCommitEnd(params, true,
isSplitCommit,
+
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(),
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// Now the FSM should have disappeared from the map
Assert.assertFalse(fsmMap.containsKey(segmentNameStr));
@@ -855,7 +864,8 @@ public class SegmentCompletionTest {
long commitTimeMs = (segmentCompletionMgr._secconds - startTime) * 1000;
Assert.assertEquals(commitTimeMap.get(tableName).longValue(),
commitTimeMs);
segmentCompletionMgr._secconds += 55;
- response = segmentCompletionMgr.segmentCommitEnd(params, true, false);
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(),
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// now FSM should be out of the map.
Assert.assertFalse((fsmMap.containsKey(segmentNameStr)));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]