This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b9171ddb25 Cleanup long offset in completion protocol (#14557)
b9171ddb25 is described below

commit b9171ddb25f8a964e9da7452867a3bfb922fb106
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Nov 29 20:01:07 2024 -0800

    Cleanup long offset in completion protocol (#14557)
---
 .../protocols/SegmentCompletionProtocol.java       |  30 +--
 .../protocols/SegmentCompletionProtocolTest.java   |   6 +-
 .../resources/LLCSegmentCompletionHandlers.java    | 214 ++++++++++-----------
 3 files changed, 111 insertions(+), 139 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index 1cad2d09f5..86d880a935 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -126,7 +126,6 @@ public class SegmentCompletionProtocol {
 
   public static final String PARAM_SEGMENT_LOCATION = "location";
   public static final String PARAM_SEGMENT_NAME = "name";
-  public static final String PARAM_OFFSET = "offset";
   public static final String PARAM_STREAM_PARTITION_MSG_OFFSET = 
"streamPartitionMsgOffset";
   public static final String PARAM_INSTANCE_ID = "instance";
   public static final String PARAM_MEMORY_USED_BYTES = "memoryUsedBytes";
@@ -197,7 +196,6 @@ public class SegmentCompletionProtocol {
 
       Map<String, String> params = new HashMap<>();
       params.put(PARAM_SEGMENT_NAME, _params.getSegmentName());
-      params.put(PARAM_OFFSET, String.valueOf(_params.getOffset()));
       params.put(PARAM_INSTANCE_ID, _params.getInstanceId());
       if (_params.getReason() != null) {
         params.put(PARAM_REASON, _params.getReason());
@@ -230,7 +228,6 @@ public class SegmentCompletionProtocol {
     }
 
     public static class Params {
-      private long _offset;
       private String _segmentName;
       private String _instanceId;
       private String _reason;
@@ -244,7 +241,6 @@ public class SegmentCompletionProtocol {
       private String _streamPartitionMsgOffset;
 
       public Params() {
-        _offset = -1L;
         _segmentName = "UNKNOWN_SEGMENT";
         _instanceId = "UNKNOWN_INSTANCE";
         _numRows = NUM_ROWS_DEFAULT;
@@ -259,7 +255,6 @@ public class SegmentCompletionProtocol {
       }
 
       public Params(Params params) {
-        _offset = params.getOffset();
         _segmentName = params.getSegmentName();
         _instanceId = params.getInstanceId();
         _numRows = params.getNumRows();
@@ -273,12 +268,6 @@ public class SegmentCompletionProtocol {
         _reason = params.getReason();
       }
 
-      @Deprecated
-      public Params withOffset(long offset) {
-        _offset = offset;
-        return this;
-      }
-
       public Params withSegmentName(String segmentName) {
         _segmentName = segmentName;
         return this;
@@ -338,11 +327,6 @@ public class SegmentCompletionProtocol {
         return _segmentName;
       }
 
-      @Deprecated
-      private long getOffset() {
-        return _offset;
-      }
-
       public String getReason() {
         return _reason;
       }
@@ -384,10 +368,16 @@ public class SegmentCompletionProtocol {
       }
 
       public String toString() {
-        return "Offset: " + _offset + ",Segment name: " + _segmentName + 
",Instance Id: " + _instanceId + ",Reason: "
-            + _reason + ",NumRows: " + _numRows + ",BuildTimeMillis: " + 
_buildTimeMillis + ",WaitTimeMillis: "
-            + _waitTimeMillis + ",ExtraTimeSec: " + _extraTimeSec + 
",SegmentLocation: " + _segmentLocation
-            + ",MemoryUsedBytes: " + _memoryUsedBytes + ",SegmentSizeBytes: " 
+ _segmentSizeBytes
+        return "Segment name: " + _segmentName
+            + ",Instance Id: " + _instanceId
+            + ",Reason: " + _reason
+            + ",NumRows: " + _numRows
+            + ",BuildTimeMillis: " + _buildTimeMillis
+            + ",WaitTimeMillis: " + _waitTimeMillis
+            + ",ExtraTimeSec: " + _extraTimeSec
+            + ",SegmentLocation: " + _segmentLocation
+            + ",MemoryUsedBytes: " + _memoryUsedBytes
+            + ",SegmentSizeBytes: " + _segmentSizeBytes
             + ",StreamPartitionMsgOffset: " + _streamPartitionMsgOffset;
       }
     }
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/protocols/SegmentCompletionProtocolTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/protocols/SegmentCompletionProtocolTest.java
index 71d767f9fb..80fa3084d6 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/protocols/SegmentCompletionProtocolTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/protocols/SegmentCompletionProtocolTest.java
@@ -45,7 +45,6 @@ public class SegmentCompletionProtocolTest {
     Map<String, String> paramsMap =
         Arrays.stream(uri.getQuery().split("&")).collect(Collectors.toMap(e -> 
e.split("=")[0], e -> e.split("=")[1]));
     
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_SEGMENT_NAME),
 "UNKNOWN_SEGMENT");
-    Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_OFFSET), 
"-1");
     
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_INSTANCE_ID), 
"UNKNOWN_INSTANCE");
     Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_REASON));
     
Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS));
@@ -68,7 +67,6 @@ public class SegmentCompletionProtocolTest {
     paramsMap =
         Arrays.stream(uri.getQuery().split("&")).collect(Collectors.toMap(e -> 
e.split("=")[0], e -> e.split("=")[1]));
     
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_SEGMENT_NAME),
 "foo__0__0__12345Z");
-    Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_OFFSET), 
"-1");
     
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_INSTANCE_ID), 
"Server_localhost_8099");
     Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_REASON));
     
Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS));
@@ -90,7 +88,6 @@ public class SegmentCompletionProtocolTest {
     paramsMap =
         Arrays.stream(uri.getQuery().split("&")).collect(Collectors.toMap(e -> 
e.split("=")[0], e -> e.split("=")[1]));
     
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_SEGMENT_NAME),
 "foo__0__0__12345Z");
-    Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_OFFSET), 
"-1");
     
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_INSTANCE_ID), 
"Server_localhost_8099");
     Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_REASON), 
"ROW_LIMIT");
     
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS),
 "1000");
@@ -114,14 +111,13 @@ public class SegmentCompletionProtocolTest {
     String url = segmentCommitStartRequest.getUrl("localhost:8080", "http");
     Assert.assertEquals(url,
     // CHECKSTYLE:OFF
-        
"http://localhost:8080/segmentCommitStart?extraTimeSec=3000&segmentSizeBytes=5000&reason=%7B%22type%22%3A%22ROW_LIMIT%22%2C%20%22value%22%3A1000%7D&buildTimeMillis=1000&streamPartitionMsgOffset=%7B%22shardId-000000000001%22%3A%2249615238429973311938200772279310862572716999467690098706%22%7D&instance=Server_localhost_8099&waitTimeMillis=2000&offset=-1&name=foo%25%25__0__0__12345Z&location=s3%3A%2F%2Fmy.bucket%2Fsegment&rowCount=6000&memoryUsedBytes=4000");
+        
"http://localhost:8080/segmentCommitStart?extraTimeSec=3000&segmentSizeBytes=5000&reason=%7B%22type%22%3A%22ROW_LIMIT%22%2C%20%22value%22%3A1000%7D&buildTimeMillis=1000&streamPartitionMsgOffset=%7B%22shardId-000000000001%22%3A%2249615238429973311938200772279310862572716999467690098706%22%7D&instance=Server_localhost_8099&waitTimeMillis=2000&name=foo%25%25__0__0__12345Z&location=s3%3A%2F%2Fmy.bucket%2Fsegment&rowCount=6000&memoryUsedBytes=4000");
     // CHECKSTYLE:ON
 
     paramsMap = Arrays.stream(url.split("\\?")[1].split("&"))
         .collect(Collectors.toMap(e -> e.split("=")[0], e -> e.split("=")[1]));
     
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_SEGMENT_NAME),
         URIUtils.encode("foo%%__0__0__12345Z"));
-    Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_OFFSET), 
"-1");
     
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_INSTANCE_ID),
         URIUtils.encode("Server_localhost_8099"));
     Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_REASON),
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index e1541460cc..5e5adb0d80 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -84,13 +84,11 @@ public class LLCSegmentCompletionHandlers {
   @Produces(MediaType.APPLICATION_JSON)
   public String 
extendBuildTime(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String 
instanceId,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
-      @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
       @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) 
String streamPartitionMsgOffset,
       @QueryParam(SegmentCompletionProtocol.PARAM_EXTRA_TIME_SEC) int 
extraTimeSec) {
-
-    if (instanceId == null || segmentName == null || (offset == -1 && 
streamPartitionMsgOffset == null)) {
-      LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
streamPartitionMsgOffset={}", offset,
-          segmentName, instanceId, streamPartitionMsgOffset);
+    if (instanceId == null || segmentName == null || streamPartitionMsgOffset 
== null) {
+      LOGGER.error("Invalid call: segmentName={}, instanceId={}, 
streamPartitionMsgOffset={}", segmentName, instanceId,
+          streamPartitionMsgOffset);
       return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
     }
     if (extraTimeSec <= 0) {
@@ -99,29 +97,16 @@ public class LLCSegmentCompletionHandlers {
       extraTimeSec = 
SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds();
     }
 
-    SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
-    
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withExtraTimeSec(extraTimeSec);
-    extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
-
-    LOGGER.info("Processing extendBuildTime:{}", requestParams.toString());
-
-    SegmentCompletionProtocol.Response response = 
_segmentCompletionManager.extendBuildTime(requestParams);
+    SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params()
+        .withInstanceId(instanceId)
+        .withSegmentName(segmentName)
+        .withStreamPartitionMsgOffset(streamPartitionMsgOffset)
+        .withExtraTimeSec(extraTimeSec);
+    LOGGER.info("Processing extendBuildTime: {}", requestParams);
 
-    final String responseStr = response.toJsonString();
-    LOGGER.info("Response to extendBuildTime:{}", responseStr);
-    return responseStr;
-  }
-
-  private void 
extractOffsetFromParams(SegmentCompletionProtocol.Request.Params requestParams,
-      String streamPartitionMsgOffset, long offset) {
-    // If the sender sent us a stream partition message offset, use it. If 
not, the sender is still old
-    // version, so pick up the old offset from it.
-    // TODO Issue 5359 Remove this backup use of offset when server and 
controller are upgraded.
-    if (streamPartitionMsgOffset != null) {
-      requestParams.withStreamPartitionMsgOffset(streamPartitionMsgOffset);
-    } else {
-      requestParams.withStreamPartitionMsgOffset(Long.toString(offset));
-    }
+    String response = 
_segmentCompletionManager.extendBuildTime(requestParams).toJsonString();
+    LOGGER.info("Response to extendBuildTime: {}", response);
+    return response;
   }
 
   @GET
@@ -130,27 +115,28 @@ public class LLCSegmentCompletionHandlers {
   @Produces(MediaType.APPLICATION_JSON)
   public String 
segmentConsumed(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String 
instanceId,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
-      @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
       @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) 
String streamPartitionMsgOffset,
       @QueryParam(SegmentCompletionProtocol.PARAM_REASON) String stopReason,
       @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
       @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows) {
-
-    if (instanceId == null || segmentName == null || (offset == -1 && 
streamPartitionMsgOffset == null)) {
-      LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
streamPartitionMsgOffset={}", offset,
-          segmentName, instanceId, streamPartitionMsgOffset);
+    if (instanceId == null || segmentName == null || streamPartitionMsgOffset 
== null) {
+      LOGGER.error("Invalid call: segmentName={}, instanceId={}, 
streamPartitionMsgOffset={}", segmentName, instanceId,
+          streamPartitionMsgOffset);
       return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
     }
-    SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
-    
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withReason(stopReason)
-        .withMemoryUsedBytes(memoryUsedBytes).withNumRows(numRows);
-    extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
-    LOGGER.info("Processing segmentConsumed:{}", requestParams.toString());
-
-    SegmentCompletionProtocol.Response response = 
_segmentCompletionManager.segmentConsumed(requestParams);
-    final String responseStr = response.toJsonString();
-    LOGGER.info("Response to segmentConsumed for segment:{} is :{}", 
segmentName, responseStr);
-    return responseStr;
+
+    SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params()
+        .withInstanceId(instanceId)
+        .withSegmentName(segmentName)
+        .withStreamPartitionMsgOffset(streamPartitionMsgOffset)
+        .withReason(stopReason)
+        .withMemoryUsedBytes(memoryUsedBytes)
+        .withNumRows(numRows);
+    LOGGER.info("Processing segmentConsumed: {}", requestParams);
+
+    String response = 
_segmentCompletionManager.segmentConsumed(requestParams).toJsonString();
+    LOGGER.info("Response to segmentConsumed for segment: {} is: {}", 
segmentName, response);
+    return response;
   }
 
   @GET
@@ -159,24 +145,24 @@ public class LLCSegmentCompletionHandlers {
   @Produces(MediaType.APPLICATION_JSON)
   public String 
segmentStoppedConsuming(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID)
 String instanceId,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
-      @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
       @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) 
String streamPartitionMsgOffset,
       @QueryParam(SegmentCompletionProtocol.PARAM_REASON) String stopReason) {
-
-    if (instanceId == null || segmentName == null || (offset == -1 && 
streamPartitionMsgOffset == null)) {
-      LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
streamPartitionMsgOffset={}", offset,
-          segmentName, instanceId, streamPartitionMsgOffset);
+    if (instanceId == null || segmentName == null || streamPartitionMsgOffset 
== null) {
+      LOGGER.error("Invalid call: segmentName={}, instanceId={}, 
streamPartitionMsgOffset={}", segmentName, instanceId,
+          streamPartitionMsgOffset);
       return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
     }
-    SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
-    
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withReason(stopReason);
-    extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
-    LOGGER.info("Processing segmentStoppedConsuming:{}", 
requestParams.toString());
-
-    SegmentCompletionProtocol.Response response = 
_segmentCompletionManager.segmentStoppedConsuming(requestParams);
-    final String responseStr = response.toJsonString();
-    LOGGER.info("Response to segmentStoppedConsuming for segment:{} is:{}", 
segmentName, responseStr);
-    return responseStr;
+
+    SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params()
+        .withInstanceId(instanceId)
+        .withSegmentName(segmentName)
+        .withStreamPartitionMsgOffset(streamPartitionMsgOffset)
+        .withReason(stopReason);
+    LOGGER.info("Processing segmentStoppedConsuming: {}", requestParams);
+
+    String response = 
_segmentCompletionManager.segmentStoppedConsuming(requestParams).toJsonString();
+    LOGGER.info("Response to segmentStoppedConsuming for segment: {} is: {}", 
segmentName, response);
+    return response;
   }
 
   @GET
@@ -185,33 +171,32 @@ public class LLCSegmentCompletionHandlers {
   @Produces(MediaType.APPLICATION_JSON)
   public String 
segmentCommitStart(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) 
String instanceId,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
-      @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
       @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) 
String streamPartitionMsgOffset,
       @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
       @QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long 
buildTimeMillis,
       @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long 
waitTimeMillis,
       @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long 
segmentSizeBytes) {
-
-    if (instanceId == null || segmentName == null || (offset == -1 && 
streamPartitionMsgOffset == null)) {
-      LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
streamPartitionMsgOffset={}", offset,
-          segmentName, instanceId, streamPartitionMsgOffset);
-      LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}", 
offset, segmentName, instanceId);
+    if (instanceId == null || segmentName == null || streamPartitionMsgOffset 
== null) {
+      LOGGER.error("Invalid call: segmentName={}, instanceId={}, 
streamPartitionMsgOffset={}", segmentName, instanceId,
+          streamPartitionMsgOffset);
       return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
     }
 
-    SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
-    
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withMemoryUsedBytes(memoryUsedBytes)
-        
.withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows)
+    SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params()
+        .withInstanceId(instanceId)
+        .withSegmentName(segmentName)
+        .withStreamPartitionMsgOffset(streamPartitionMsgOffset)
+        .withMemoryUsedBytes(memoryUsedBytes)
+        .withBuildTimeMillis(buildTimeMillis)
+        .withWaitTimeMillis(waitTimeMillis)
+        .withNumRows(numRows)
         .withSegmentSizeBytes(segmentSizeBytes);
-    extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
-
-    LOGGER.info("Processing segmentCommitStart:{}", requestParams.toString());
+    LOGGER.info("Processing segmentCommitStart: {}", requestParams);
 
-    SegmentCompletionProtocol.Response response = 
_segmentCompletionManager.segmentCommitStart(requestParams);
-    final String responseStr = response.toJsonString();
-    LOGGER.info("Response to segmentCommitStart for segment:{} is:{}", 
segmentName, responseStr);
-    return responseStr;
+    String response = 
_segmentCompletionManager.segmentCommitStart(requestParams).toJsonString();
+    LOGGER.info("Response to segmentCommitStart for segment: {} is: {}", 
segmentName, response);
+    return response;
   }
 
   // Remove after releasing 1.1 (server always use split commit)
@@ -224,23 +209,24 @@ public class LLCSegmentCompletionHandlers {
   @Produces(MediaType.APPLICATION_JSON)
   public String 
segmentCommit(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String 
instanceId,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
-      @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
       @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) 
String streamPartitionMsgOffset,
       @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
       @QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long 
buildTimeMillis,
       @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long 
waitTimeMillis,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long 
segmentSizeBytes,
       @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows, 
FormDataMultiPart multiPart) {
-
-    SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
-    
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withSegmentSizeBytes(segmentSizeBytes)
-        
.withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows)
+    SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params()
+        .withInstanceId(instanceId)
+        .withSegmentName(segmentName)
+        .withStreamPartitionMsgOffset(streamPartitionMsgOffset)
+        .withSegmentSizeBytes(segmentSizeBytes)
+        .withBuildTimeMillis(buildTimeMillis)
+        .withWaitTimeMillis(waitTimeMillis)
+        .withNumRows(numRows)
         .withMemoryUsedBytes(memoryUsedBytes);
-    extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
-    LOGGER.info("Processing segmentCommit:{}", requestParams.toString());
+    LOGGER.info("Processing segmentCommit: {}", requestParams);
 
-    final SegmentCompletionManager segmentCompletionManager = 
_segmentCompletionManager;
-    SegmentCompletionProtocol.Response response = 
segmentCompletionManager.segmentCommitStart(requestParams);
+    SegmentCompletionProtocol.Response response = 
_segmentCompletionManager.segmentCommitStart(requestParams);
 
     CommittingSegmentDescriptor committingSegmentDescriptor =
         
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(requestParams);
@@ -253,8 +239,8 @@ public class LLCSegmentCompletionHandlers {
         SegmentMetadataImpl segmentMetadata = 
extractMetadataFromLocalSegmentFile(localTempFile);
         // Store the segment file to Pinot FS.
         String rawTableName = new LLCSegmentName(segmentName).getTableName();
-        URI segmentFileURI = URIUtils
-            
.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(), 
rawTableName,
+        URI segmentFileURI =
+            
URIUtils.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(),
 rawTableName,
                 URIUtils.encode(segmentName));
         PinotFS pinotFS = PinotFSFactory.create(segmentFileURI.getScheme());
         // Multiple threads can reach this point at the same time, if the 
following scenario happens
@@ -293,7 +279,7 @@ public class LLCSegmentCompletionHandlers {
       }
     }
 
-    response = segmentCompletionManager.segmentCommitEnd(requestParams, 
success, false, committingSegmentDescriptor);
+    response = _segmentCompletionManager.segmentCommitEnd(requestParams, 
success, false, committingSegmentDescriptor);
     LOGGER.info("Response to segmentCommit: instance={}, segment={}, 
status={}, streamMsgOffset={}",
         requestParams.getInstanceId(), requestParams.getSegmentName(), 
response.getStatus(),
         response.getStreamPartitionMsgOffset());
@@ -314,21 +300,21 @@ public class LLCSegmentCompletionHandlers {
   @TrackedByGauge(gauge = ControllerGauge.SEGMENT_UPLOADS_IN_PROGRESS)
   public String 
segmentUpload(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String 
instanceId,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
-      @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
       @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) 
String streamPartitionMsgOffset,
       FormDataMultiPart multiPart) {
-    SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
-    requestParams.withInstanceId(instanceId).withSegmentName(segmentName);
-    extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
-    LOGGER.info("Processing segmentUpload:{}", requestParams.toString());
+    SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params()
+        .withInstanceId(instanceId)
+        .withSegmentName(segmentName)
+        .withStreamPartitionMsgOffset(streamPartitionMsgOffset);
+    LOGGER.info("Processing segmentUpload: {}", requestParams);
 
     // Get the segment from the form input and put it into the data directory 
(could be remote)
     File localTempFile = null;
     try {
       localTempFile = extractSegmentFromFormToLocalTempFile(multiPart, 
segmentName);
       String rawTableName = new LLCSegmentName(segmentName).getTableName();
-      URI segmentFileURI = URIUtils
-          
.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(), 
rawTableName,
+      URI segmentFileURI =
+          
URIUtils.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(),
 rawTableName,
               
URIUtils.encode(SegmentCompletionUtils.generateTmpSegmentFileName(segmentName)));
       
PinotFSFactory.create(segmentFileURI.getScheme()).copyFromLocalFile(localTempFile,
 segmentFileURI);
       SegmentCompletionProtocol.Response.Params responseParams = new 
SegmentCompletionProtocol.Response.Params()
@@ -337,7 +323,7 @@ public class LLCSegmentCompletionHandlers {
           
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS);
 
       String response = new 
SegmentCompletionProtocol.Response(responseParams).toJsonString();
-      LOGGER.info("Response to segmentUpload for segment:{} is:{}", 
segmentName, response);
+      LOGGER.info("Response to segmentUpload for segment: {} is: {}", 
segmentName, response);
       return response;
     } catch (Exception e) {
       LOGGER.error("Caught exception while uploading segment: {} from 
instance: {}", segmentName, instanceId, e);
@@ -356,30 +342,33 @@ public class LLCSegmentCompletionHandlers {
   public String 
segmentCommitEndWithMetadata(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID)
 String instanceId,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String 
segmentLocation,
-      @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
       @QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) 
String streamPartitionMsgOffset,
       @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
       @QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long 
buildTimeMillis,
       @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long 
waitTimeMillis,
       @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long 
segmentSizeBytes,
-      @QueryParam(SegmentCompletionProtocol.PARAM_REASON) String stopReason,
-      FormDataMultiPart metadataFiles) {
-    if (instanceId == null || segmentName == null || segmentLocation == null 
|| metadataFiles == null || (offset == -1
-        && streamPartitionMsgOffset == null)) {
-      LOGGER.error(
-          "Invalid call: offset={}, segmentName={}, instanceId={}, 
segmentLocation={}, streamPartitionMsgOffset={}",
-          offset, segmentName, instanceId, segmentLocation, 
streamPartitionMsgOffset);
+      @QueryParam(SegmentCompletionProtocol.PARAM_REASON) String stopReason, 
FormDataMultiPart metadataFiles) {
+    if (instanceId == null || segmentName == null || segmentLocation == null 
|| metadataFiles == null
+        || streamPartitionMsgOffset == null) {
+      LOGGER.error("Invalid call: segmentName={}, instanceId={}, 
segmentLocation={}, streamPartitionMsgOffset={}",
+          segmentName, instanceId, segmentLocation, streamPartitionMsgOffset);
       // TODO: memoryUsedInBytes = 0 if not present in params. Add validation 
when we start using it
       return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
     }
 
-    SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
-    
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withSegmentLocation(segmentLocation)
-        
.withSegmentSizeBytes(segmentSizeBytes).withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis)
-        
.withNumRows(numRows).withMemoryUsedBytes(memoryUsedBytes).withReason(stopReason);
-    extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
-    LOGGER.info("Processing segmentCommitEndWithMetadata:{}", 
requestParams.toString());
+    SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params()
+        .withInstanceId(instanceId)
+        .withSegmentName(segmentName)
+        .withSegmentLocation(segmentLocation)
+        .withStreamPartitionMsgOffset(streamPartitionMsgOffset)
+        .withSegmentSizeBytes(segmentSizeBytes)
+        .withBuildTimeMillis(buildTimeMillis)
+        .withWaitTimeMillis(waitTimeMillis)
+        .withNumRows(numRows)
+        .withMemoryUsedBytes(memoryUsedBytes)
+        .withReason(stopReason);
+    LOGGER.info("Processing segmentCommitEndWithMetadata: {}", requestParams);
 
     SegmentMetadataImpl segmentMetadata;
     try {
@@ -390,14 +379,11 @@ public class LLCSegmentCompletionHandlers {
       return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
     }
 
-    final boolean isSuccess = true;
-    final boolean isSplitCommit = true;
-    SegmentCompletionProtocol.Response response = _segmentCompletionManager
-        .segmentCommitEnd(requestParams, isSuccess, isSplitCommit,
-            
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams,
 segmentMetadata));
-    final String responseStr = response.toJsonString();
-    LOGGER.info("Response to segmentCommitEndWithMetadata for segment:{} 
is:{}", segmentName, responseStr);
-    return responseStr;
+    String response = 
_segmentCompletionManager.segmentCommitEnd(requestParams, true, true,
+            
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams,
 segmentMetadata))
+        .toJsonString();
+    LOGGER.info("Response to segmentCommitEndWithMetadata for segment: {} is: 
{}", segmentName, response);
+    return response;
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to