This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b9171ddb25 Cleanup long offset in completion protocol (#14557)
b9171ddb25 is described below
commit b9171ddb25f8a964e9da7452867a3bfb922fb106
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Nov 29 20:01:07 2024 -0800
Cleanup long offset in completion protocol (#14557)
---
.../protocols/SegmentCompletionProtocol.java | 30 +--
.../protocols/SegmentCompletionProtocolTest.java | 6 +-
.../resources/LLCSegmentCompletionHandlers.java | 214 ++++++++++-----------
3 files changed, 111 insertions(+), 139 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index 1cad2d09f5..86d880a935 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -126,7 +126,6 @@ public class SegmentCompletionProtocol {
public static final String PARAM_SEGMENT_LOCATION = "location";
public static final String PARAM_SEGMENT_NAME = "name";
- public static final String PARAM_OFFSET = "offset";
public static final String PARAM_STREAM_PARTITION_MSG_OFFSET =
"streamPartitionMsgOffset";
public static final String PARAM_INSTANCE_ID = "instance";
public static final String PARAM_MEMORY_USED_BYTES = "memoryUsedBytes";
@@ -197,7 +196,6 @@ public class SegmentCompletionProtocol {
Map<String, String> params = new HashMap<>();
params.put(PARAM_SEGMENT_NAME, _params.getSegmentName());
- params.put(PARAM_OFFSET, String.valueOf(_params.getOffset()));
params.put(PARAM_INSTANCE_ID, _params.getInstanceId());
if (_params.getReason() != null) {
params.put(PARAM_REASON, _params.getReason());
@@ -230,7 +228,6 @@ public class SegmentCompletionProtocol {
}
public static class Params {
- private long _offset;
private String _segmentName;
private String _instanceId;
private String _reason;
@@ -244,7 +241,6 @@ public class SegmentCompletionProtocol {
private String _streamPartitionMsgOffset;
public Params() {
- _offset = -1L;
_segmentName = "UNKNOWN_SEGMENT";
_instanceId = "UNKNOWN_INSTANCE";
_numRows = NUM_ROWS_DEFAULT;
@@ -259,7 +255,6 @@ public class SegmentCompletionProtocol {
}
public Params(Params params) {
- _offset = params.getOffset();
_segmentName = params.getSegmentName();
_instanceId = params.getInstanceId();
_numRows = params.getNumRows();
@@ -273,12 +268,6 @@ public class SegmentCompletionProtocol {
_reason = params.getReason();
}
- @Deprecated
- public Params withOffset(long offset) {
- _offset = offset;
- return this;
- }
-
public Params withSegmentName(String segmentName) {
_segmentName = segmentName;
return this;
@@ -338,11 +327,6 @@ public class SegmentCompletionProtocol {
return _segmentName;
}
- @Deprecated
- private long getOffset() {
- return _offset;
- }
-
public String getReason() {
return _reason;
}
@@ -384,10 +368,16 @@ public class SegmentCompletionProtocol {
}
public String toString() {
- return "Offset: " + _offset + ",Segment name: " + _segmentName +
",Instance Id: " + _instanceId + ",Reason: "
- + _reason + ",NumRows: " + _numRows + ",BuildTimeMillis: " +
_buildTimeMillis + ",WaitTimeMillis: "
- + _waitTimeMillis + ",ExtraTimeSec: " + _extraTimeSec +
",SegmentLocation: " + _segmentLocation
- + ",MemoryUsedBytes: " + _memoryUsedBytes + ",SegmentSizeBytes: "
+ _segmentSizeBytes
+ return "Segment name: " + _segmentName
+ + ",Instance Id: " + _instanceId
+ + ",Reason: " + _reason
+ + ",NumRows: " + _numRows
+ + ",BuildTimeMillis: " + _buildTimeMillis
+ + ",WaitTimeMillis: " + _waitTimeMillis
+ + ",ExtraTimeSec: " + _extraTimeSec
+ + ",SegmentLocation: " + _segmentLocation
+ + ",MemoryUsedBytes: " + _memoryUsedBytes
+ + ",SegmentSizeBytes: " + _segmentSizeBytes
+ ",StreamPartitionMsgOffset: " + _streamPartitionMsgOffset;
}
}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/protocols/SegmentCompletionProtocolTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/protocols/SegmentCompletionProtocolTest.java
index 71d767f9fb..80fa3084d6 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/protocols/SegmentCompletionProtocolTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/protocols/SegmentCompletionProtocolTest.java
@@ -45,7 +45,6 @@ public class SegmentCompletionProtocolTest {
Map<String, String> paramsMap =
Arrays.stream(uri.getQuery().split("&")).collect(Collectors.toMap(e ->
e.split("=")[0], e -> e.split("=")[1]));
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_SEGMENT_NAME),
"UNKNOWN_SEGMENT");
- Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_OFFSET),
"-1");
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_INSTANCE_ID),
"UNKNOWN_INSTANCE");
Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_REASON));
Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS));
@@ -68,7 +67,6 @@ public class SegmentCompletionProtocolTest {
paramsMap =
Arrays.stream(uri.getQuery().split("&")).collect(Collectors.toMap(e ->
e.split("=")[0], e -> e.split("=")[1]));
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_SEGMENT_NAME),
"foo__0__0__12345Z");
- Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_OFFSET),
"-1");
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_INSTANCE_ID),
"Server_localhost_8099");
Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_REASON));
Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS));
@@ -90,7 +88,6 @@ public class SegmentCompletionProtocolTest {
paramsMap =
Arrays.stream(uri.getQuery().split("&")).collect(Collectors.toMap(e ->
e.split("=")[0], e -> e.split("=")[1]));
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_SEGMENT_NAME),
"foo__0__0__12345Z");
- Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_OFFSET),
"-1");
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_INSTANCE_ID),
"Server_localhost_8099");
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_REASON),
"ROW_LIMIT");
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS),
"1000");
@@ -114,14 +111,13 @@ public class SegmentCompletionProtocolTest {
String url = segmentCommitStartRequest.getUrl("localhost:8080", "http");
Assert.assertEquals(url,
// CHECKSTYLE:OFF
-
"http://localhost:8080/segmentCommitStart?extraTimeSec=3000&segmentSizeBytes=5000&reason=%7B%22type%22%3A%22ROW_LIMIT%22%2C%20%22value%22%3A1000%7D&buildTimeMillis=1000&streamPartitionMsgOffset=%7B%22shardId-000000000001%22%3A%2249615238429973311938200772279310862572716999467690098706%22%7D&instance=Server_localhost_8099&waitTimeMillis=2000&offset=-1&name=foo%25%25__0__0__12345Z&location=s3%3A%2F%2Fmy.bucket%2Fsegment&rowCount=6000&memoryUsedBytes=4000");
+
"http://localhost:8080/segmentCommitStart?extraTimeSec=3000&segmentSizeBytes=5000&reason=%7B%22type%22%3A%22ROW_LIMIT%22%2C%20%22value%22%3A1000%7D&buildTimeMillis=1000&streamPartitionMsgOffset=%7B%22shardId-000000000001%22%3A%2249615238429973311938200772279310862572716999467690098706%22%7D&instance=Server_localhost_8099&waitTimeMillis=2000&name=foo%25%25__0__0__12345Z&location=s3%3A%2F%2Fmy.bucket%2Fsegment&rowCount=6000&memoryUsedBytes=4000");
// CHECKSTYLE:ON
paramsMap = Arrays.stream(url.split("\\?")[1].split("&"))
.collect(Collectors.toMap(e -> e.split("=")[0], e -> e.split("=")[1]));
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_SEGMENT_NAME),
URIUtils.encode("foo%%__0__0__12345Z"));
- Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_OFFSET),
"-1");
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_INSTANCE_ID),
URIUtils.encode("Server_localhost_8099"));
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_REASON),
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index e1541460cc..5e5adb0d80 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -84,13 +84,11 @@ public class LLCSegmentCompletionHandlers {
@Produces(MediaType.APPLICATION_JSON)
public String
extendBuildTime(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String
instanceId,
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String
segmentName,
- @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
@QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET)
String streamPartitionMsgOffset,
@QueryParam(SegmentCompletionProtocol.PARAM_EXTRA_TIME_SEC) int
extraTimeSec) {
-
- if (instanceId == null || segmentName == null || (offset == -1 &&
streamPartitionMsgOffset == null)) {
- LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={},
streamPartitionMsgOffset={}", offset,
- segmentName, instanceId, streamPartitionMsgOffset);
+ if (instanceId == null || segmentName == null || streamPartitionMsgOffset
== null) {
+ LOGGER.error("Invalid call: segmentName={}, instanceId={},
streamPartitionMsgOffset={}", segmentName, instanceId,
+ streamPartitionMsgOffset);
return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
}
if (extraTimeSec <= 0) {
@@ -99,29 +97,16 @@ public class LLCSegmentCompletionHandlers {
extraTimeSec =
SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds();
}
- SegmentCompletionProtocol.Request.Params requestParams = new
SegmentCompletionProtocol.Request.Params();
-
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withExtraTimeSec(extraTimeSec);
- extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
-
- LOGGER.info("Processing extendBuildTime:{}", requestParams.toString());
-
- SegmentCompletionProtocol.Response response =
_segmentCompletionManager.extendBuildTime(requestParams);
+ SegmentCompletionProtocol.Request.Params requestParams = new
SegmentCompletionProtocol.Request.Params()
+ .withInstanceId(instanceId)
+ .withSegmentName(segmentName)
+ .withStreamPartitionMsgOffset(streamPartitionMsgOffset)
+ .withExtraTimeSec(extraTimeSec);
+ LOGGER.info("Processing extendBuildTime: {}", requestParams);
- final String responseStr = response.toJsonString();
- LOGGER.info("Response to extendBuildTime:{}", responseStr);
- return responseStr;
- }
-
- private void
extractOffsetFromParams(SegmentCompletionProtocol.Request.Params requestParams,
- String streamPartitionMsgOffset, long offset) {
- // If the sender sent us a stream partition message offset, use it. If
not, the sender is still old
- // version, so pick up the old offset from it.
- // TODO Issue 5359 Remove this backup use of offset when server and
controller are upgraded.
- if (streamPartitionMsgOffset != null) {
- requestParams.withStreamPartitionMsgOffset(streamPartitionMsgOffset);
- } else {
- requestParams.withStreamPartitionMsgOffset(Long.toString(offset));
- }
+ String response =
_segmentCompletionManager.extendBuildTime(requestParams).toJsonString();
+ LOGGER.info("Response to extendBuildTime: {}", response);
+ return response;
}
@GET
@@ -130,27 +115,28 @@ public class LLCSegmentCompletionHandlers {
@Produces(MediaType.APPLICATION_JSON)
public String
segmentConsumed(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String
instanceId,
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String
segmentName,
- @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
@QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET)
String streamPartitionMsgOffset,
@QueryParam(SegmentCompletionProtocol.PARAM_REASON) String stopReason,
@QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long
memoryUsedBytes,
@QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows) {
-
- if (instanceId == null || segmentName == null || (offset == -1 &&
streamPartitionMsgOffset == null)) {
- LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={},
streamPartitionMsgOffset={}", offset,
- segmentName, instanceId, streamPartitionMsgOffset);
+ if (instanceId == null || segmentName == null || streamPartitionMsgOffset
== null) {
+ LOGGER.error("Invalid call: segmentName={}, instanceId={},
streamPartitionMsgOffset={}", segmentName, instanceId,
+ streamPartitionMsgOffset);
return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
}
- SegmentCompletionProtocol.Request.Params requestParams = new
SegmentCompletionProtocol.Request.Params();
-
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withReason(stopReason)
- .withMemoryUsedBytes(memoryUsedBytes).withNumRows(numRows);
- extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
- LOGGER.info("Processing segmentConsumed:{}", requestParams.toString());
-
- SegmentCompletionProtocol.Response response =
_segmentCompletionManager.segmentConsumed(requestParams);
- final String responseStr = response.toJsonString();
- LOGGER.info("Response to segmentConsumed for segment:{} is :{}",
segmentName, responseStr);
- return responseStr;
+
+ SegmentCompletionProtocol.Request.Params requestParams = new
SegmentCompletionProtocol.Request.Params()
+ .withInstanceId(instanceId)
+ .withSegmentName(segmentName)
+ .withStreamPartitionMsgOffset(streamPartitionMsgOffset)
+ .withReason(stopReason)
+ .withMemoryUsedBytes(memoryUsedBytes)
+ .withNumRows(numRows);
+ LOGGER.info("Processing segmentConsumed: {}", requestParams);
+
+ String response =
_segmentCompletionManager.segmentConsumed(requestParams).toJsonString();
+ LOGGER.info("Response to segmentConsumed for segment: {} is: {}",
segmentName, response);
+ return response;
}
@GET
@@ -159,24 +145,24 @@ public class LLCSegmentCompletionHandlers {
@Produces(MediaType.APPLICATION_JSON)
public String
segmentStoppedConsuming(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID)
String instanceId,
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String
segmentName,
- @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
@QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET)
String streamPartitionMsgOffset,
@QueryParam(SegmentCompletionProtocol.PARAM_REASON) String stopReason) {
-
- if (instanceId == null || segmentName == null || (offset == -1 &&
streamPartitionMsgOffset == null)) {
- LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={},
streamPartitionMsgOffset={}", offset,
- segmentName, instanceId, streamPartitionMsgOffset);
+ if (instanceId == null || segmentName == null || streamPartitionMsgOffset
== null) {
+ LOGGER.error("Invalid call: segmentName={}, instanceId={},
streamPartitionMsgOffset={}", segmentName, instanceId,
+ streamPartitionMsgOffset);
return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
}
- SegmentCompletionProtocol.Request.Params requestParams = new
SegmentCompletionProtocol.Request.Params();
-
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withReason(stopReason);
- extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
- LOGGER.info("Processing segmentStoppedConsuming:{}",
requestParams.toString());
-
- SegmentCompletionProtocol.Response response =
_segmentCompletionManager.segmentStoppedConsuming(requestParams);
- final String responseStr = response.toJsonString();
- LOGGER.info("Response to segmentStoppedConsuming for segment:{} is:{}",
segmentName, responseStr);
- return responseStr;
+
+ SegmentCompletionProtocol.Request.Params requestParams = new
SegmentCompletionProtocol.Request.Params()
+ .withInstanceId(instanceId)
+ .withSegmentName(segmentName)
+ .withStreamPartitionMsgOffset(streamPartitionMsgOffset)
+ .withReason(stopReason);
+ LOGGER.info("Processing segmentStoppedConsuming: {}", requestParams);
+
+ String response =
_segmentCompletionManager.segmentStoppedConsuming(requestParams).toJsonString();
+ LOGGER.info("Response to segmentStoppedConsuming for segment: {} is: {}",
segmentName, response);
+ return response;
}
@GET
@@ -185,33 +171,32 @@ public class LLCSegmentCompletionHandlers {
@Produces(MediaType.APPLICATION_JSON)
public String
segmentCommitStart(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID)
String instanceId,
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String
segmentName,
- @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
@QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET)
String streamPartitionMsgOffset,
@QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long
memoryUsedBytes,
@QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long
buildTimeMillis,
@QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long
waitTimeMillis,
@QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long
segmentSizeBytes) {
-
- if (instanceId == null || segmentName == null || (offset == -1 &&
streamPartitionMsgOffset == null)) {
- LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={},
streamPartitionMsgOffset={}", offset,
- segmentName, instanceId, streamPartitionMsgOffset);
- LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}",
offset, segmentName, instanceId);
+ if (instanceId == null || segmentName == null || streamPartitionMsgOffset
== null) {
+ LOGGER.error("Invalid call: segmentName={}, instanceId={},
streamPartitionMsgOffset={}", segmentName, instanceId,
+ streamPartitionMsgOffset);
return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
}
- SegmentCompletionProtocol.Request.Params requestParams = new
SegmentCompletionProtocol.Request.Params();
-
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withMemoryUsedBytes(memoryUsedBytes)
-
.withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows)
+ SegmentCompletionProtocol.Request.Params requestParams = new
SegmentCompletionProtocol.Request.Params()
+ .withInstanceId(instanceId)
+ .withSegmentName(segmentName)
+ .withStreamPartitionMsgOffset(streamPartitionMsgOffset)
+ .withMemoryUsedBytes(memoryUsedBytes)
+ .withBuildTimeMillis(buildTimeMillis)
+ .withWaitTimeMillis(waitTimeMillis)
+ .withNumRows(numRows)
.withSegmentSizeBytes(segmentSizeBytes);
- extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
-
- LOGGER.info("Processing segmentCommitStart:{}", requestParams.toString());
+ LOGGER.info("Processing segmentCommitStart: {}", requestParams);
- SegmentCompletionProtocol.Response response =
_segmentCompletionManager.segmentCommitStart(requestParams);
- final String responseStr = response.toJsonString();
- LOGGER.info("Response to segmentCommitStart for segment:{} is:{}",
segmentName, responseStr);
- return responseStr;
+ String response =
_segmentCompletionManager.segmentCommitStart(requestParams).toJsonString();
+ LOGGER.info("Response to segmentCommitStart for segment: {} is: {}",
segmentName, response);
+ return response;
}
// Remove after releasing 1.1 (server always use split commit)
@@ -224,23 +209,24 @@ public class LLCSegmentCompletionHandlers {
@Produces(MediaType.APPLICATION_JSON)
public String
segmentCommit(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String
instanceId,
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String
segmentName,
- @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
@QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET)
String streamPartitionMsgOffset,
@QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long
memoryUsedBytes,
@QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long
buildTimeMillis,
@QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long
waitTimeMillis,
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long
segmentSizeBytes,
@QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
FormDataMultiPart multiPart) {
-
- SegmentCompletionProtocol.Request.Params requestParams = new
SegmentCompletionProtocol.Request.Params();
-
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withSegmentSizeBytes(segmentSizeBytes)
-
.withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows)
+ SegmentCompletionProtocol.Request.Params requestParams = new
SegmentCompletionProtocol.Request.Params()
+ .withInstanceId(instanceId)
+ .withSegmentName(segmentName)
+ .withStreamPartitionMsgOffset(streamPartitionMsgOffset)
+ .withSegmentSizeBytes(segmentSizeBytes)
+ .withBuildTimeMillis(buildTimeMillis)
+ .withWaitTimeMillis(waitTimeMillis)
+ .withNumRows(numRows)
.withMemoryUsedBytes(memoryUsedBytes);
- extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
- LOGGER.info("Processing segmentCommit:{}", requestParams.toString());
+ LOGGER.info("Processing segmentCommit: {}", requestParams);
- final SegmentCompletionManager segmentCompletionManager =
_segmentCompletionManager;
- SegmentCompletionProtocol.Response response =
segmentCompletionManager.segmentCommitStart(requestParams);
+ SegmentCompletionProtocol.Response response =
_segmentCompletionManager.segmentCommitStart(requestParams);
CommittingSegmentDescriptor committingSegmentDescriptor =
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(requestParams);
@@ -253,8 +239,8 @@ public class LLCSegmentCompletionHandlers {
SegmentMetadataImpl segmentMetadata =
extractMetadataFromLocalSegmentFile(localTempFile);
// Store the segment file to Pinot FS.
String rawTableName = new LLCSegmentName(segmentName).getTableName();
- URI segmentFileURI = URIUtils
-
.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(),
rawTableName,
+ URI segmentFileURI =
+
URIUtils.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(),
rawTableName,
URIUtils.encode(segmentName));
PinotFS pinotFS = PinotFSFactory.create(segmentFileURI.getScheme());
// Multiple threads can reach this point at the same time, if the
following scenario happens
@@ -293,7 +279,7 @@ public class LLCSegmentCompletionHandlers {
}
}
- response = segmentCompletionManager.segmentCommitEnd(requestParams,
success, false, committingSegmentDescriptor);
+ response = _segmentCompletionManager.segmentCommitEnd(requestParams,
success, false, committingSegmentDescriptor);
LOGGER.info("Response to segmentCommit: instance={}, segment={},
status={}, streamMsgOffset={}",
requestParams.getInstanceId(), requestParams.getSegmentName(),
response.getStatus(),
response.getStreamPartitionMsgOffset());
@@ -314,21 +300,21 @@ public class LLCSegmentCompletionHandlers {
@TrackedByGauge(gauge = ControllerGauge.SEGMENT_UPLOADS_IN_PROGRESS)
public String
segmentUpload(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String
instanceId,
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String
segmentName,
- @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
@QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET)
String streamPartitionMsgOffset,
FormDataMultiPart multiPart) {
- SegmentCompletionProtocol.Request.Params requestParams = new
SegmentCompletionProtocol.Request.Params();
- requestParams.withInstanceId(instanceId).withSegmentName(segmentName);
- extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
- LOGGER.info("Processing segmentUpload:{}", requestParams.toString());
+ SegmentCompletionProtocol.Request.Params requestParams = new
SegmentCompletionProtocol.Request.Params()
+ .withInstanceId(instanceId)
+ .withSegmentName(segmentName)
+ .withStreamPartitionMsgOffset(streamPartitionMsgOffset);
+ LOGGER.info("Processing segmentUpload: {}", requestParams);
// Get the segment from the form input and put it into the data directory
(could be remote)
File localTempFile = null;
try {
localTempFile = extractSegmentFromFormToLocalTempFile(multiPart,
segmentName);
String rawTableName = new LLCSegmentName(segmentName).getTableName();
- URI segmentFileURI = URIUtils
-
.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(),
rawTableName,
+ URI segmentFileURI =
+
URIUtils.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(),
rawTableName,
URIUtils.encode(SegmentCompletionUtils.generateTmpSegmentFileName(segmentName)));
PinotFSFactory.create(segmentFileURI.getScheme()).copyFromLocalFile(localTempFile,
segmentFileURI);
SegmentCompletionProtocol.Response.Params responseParams = new
SegmentCompletionProtocol.Response.Params()
@@ -337,7 +323,7 @@ public class LLCSegmentCompletionHandlers {
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS);
String response = new
SegmentCompletionProtocol.Response(responseParams).toJsonString();
- LOGGER.info("Response to segmentUpload for segment:{} is:{}",
segmentName, response);
+ LOGGER.info("Response to segmentUpload for segment: {} is: {}",
segmentName, response);
return response;
} catch (Exception e) {
LOGGER.error("Caught exception while uploading segment: {} from
instance: {}", segmentName, instanceId, e);
@@ -356,30 +342,33 @@ public class LLCSegmentCompletionHandlers {
public String
segmentCommitEndWithMetadata(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID)
String instanceId,
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String
segmentName,
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String
segmentLocation,
- @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
@QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET)
String streamPartitionMsgOffset,
@QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long
memoryUsedBytes,
@QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long
buildTimeMillis,
@QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long
waitTimeMillis,
@QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long
segmentSizeBytes,
- @QueryParam(SegmentCompletionProtocol.PARAM_REASON) String stopReason,
- FormDataMultiPart metadataFiles) {
- if (instanceId == null || segmentName == null || segmentLocation == null
|| metadataFiles == null || (offset == -1
- && streamPartitionMsgOffset == null)) {
- LOGGER.error(
- "Invalid call: offset={}, segmentName={}, instanceId={},
segmentLocation={}, streamPartitionMsgOffset={}",
- offset, segmentName, instanceId, segmentLocation,
streamPartitionMsgOffset);
+ @QueryParam(SegmentCompletionProtocol.PARAM_REASON) String stopReason,
FormDataMultiPart metadataFiles) {
+ if (instanceId == null || segmentName == null || segmentLocation == null
|| metadataFiles == null
+ || streamPartitionMsgOffset == null) {
+ LOGGER.error("Invalid call: segmentName={}, instanceId={},
segmentLocation={}, streamPartitionMsgOffset={}",
+ segmentName, instanceId, segmentLocation, streamPartitionMsgOffset);
// TODO: memoryUsedInBytes = 0 if not present in params. Add validation
when we start using it
return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
}
- SegmentCompletionProtocol.Request.Params requestParams = new
SegmentCompletionProtocol.Request.Params();
-
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withSegmentLocation(segmentLocation)
-
.withSegmentSizeBytes(segmentSizeBytes).withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis)
-
.withNumRows(numRows).withMemoryUsedBytes(memoryUsedBytes).withReason(stopReason);
- extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
- LOGGER.info("Processing segmentCommitEndWithMetadata:{}",
requestParams.toString());
+ SegmentCompletionProtocol.Request.Params requestParams = new
SegmentCompletionProtocol.Request.Params()
+ .withInstanceId(instanceId)
+ .withSegmentName(segmentName)
+ .withSegmentLocation(segmentLocation)
+ .withStreamPartitionMsgOffset(streamPartitionMsgOffset)
+ .withSegmentSizeBytes(segmentSizeBytes)
+ .withBuildTimeMillis(buildTimeMillis)
+ .withWaitTimeMillis(waitTimeMillis)
+ .withNumRows(numRows)
+ .withMemoryUsedBytes(memoryUsedBytes)
+ .withReason(stopReason);
+ LOGGER.info("Processing segmentCommitEndWithMetadata: {}", requestParams);
SegmentMetadataImpl segmentMetadata;
try {
@@ -390,14 +379,11 @@ public class LLCSegmentCompletionHandlers {
return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
}
- final boolean isSuccess = true;
- final boolean isSplitCommit = true;
- SegmentCompletionProtocol.Response response = _segmentCompletionManager
- .segmentCommitEnd(requestParams, isSuccess, isSplitCommit,
-
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams,
segmentMetadata));
- final String responseStr = response.toJsonString();
- LOGGER.info("Response to segmentCommitEndWithMetadata for segment:{}
is:{}", segmentName, responseStr);
- return responseStr;
+ String response =
_segmentCompletionManager.segmentCommitEnd(requestParams, true, true,
+
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams,
segmentMetadata))
+ .toJsonString();
+ LOGGER.info("Response to segmentCommitEndWithMetadata for segment: {} is:
{}", segmentName, response);
+ return response;
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]