mcvsubbu commented on a change in pull request #4592: Refactor Realtime Segment 
Data Manager to remove repeat logic 
URL: https://github.com/apache/incubator-pinot/pull/4592#discussion_r322506101
 
 

 ##########
 File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 ##########
 @@ -760,73 +760,28 @@ protected SegmentBuildDescriptor 
buildSegmentInternal(boolean forCommit) {
     }
   }
 
-  private SegmentCompletionProtocol.Response 
doSplitCommit(SegmentCompletionProtocol.Response prevResponse) {
-    final File segmentTarFile = new 
File(_segmentBuildDescriptor.getSegmentTarFilePath());
+  protected boolean commitSegment(SegmentCompletionProtocol.Response response) 
{
+    final String segTarFileName = 
_segmentBuildDescriptor.getSegmentTarFilePath();
+    File segTarFile = new File(segTarFileName);
+    if (!segTarFile.exists()) {
+      throw new RuntimeException("Segment file does not exist:" + 
segTarFileName);
+    }
+    SegmentCompletionProtocol.Response returnedResponse;
+    boolean isSplitCommit = response.isSplitCommit() && 
_indexLoadingConfig.isEnableSplitCommit();
+
     SegmentCompletionProtocol.Request.Params params = new 
SegmentCompletionProtocol.Request.Params();
 
     
params.withSegmentName(_segmentNameStr).withOffset(_currentOffset).withNumRows(_numRowsConsumed)
         
.withInstanceId(_instanceId).withBuildTimeMillis(_segmentBuildDescriptor.getBuildTimeMillis())
         .withSegmentSizeBytes(_segmentBuildDescriptor.getSegmentSizeBytes())
         .withWaitTimeMillis(_segmentBuildDescriptor.getWaitTimeMillis());
-    if (_isOffHeap) {
-      params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
-    }
-    SegmentCompletionProtocol.Response segmentCommitStartResponse = 
_protocolHandler.segmentCommitStart(params);
-    if (!segmentCommitStartResponse.getStatus()
-        
.equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE)) {
-      segmentLogger.warn("CommitStart failed  with response {}", 
segmentCommitStartResponse.toJsonString());
-      return SegmentCompletionProtocol.RESP_FAILED;
-    }
 
-    params = new SegmentCompletionProtocol.Request.Params();
-    
params.withOffset(_currentOffset).withSegmentName(_segmentNameStr).withInstanceId(_instanceId);
-    SegmentCompletionProtocol.Response segmentCommitUploadResponse =
-        _protocolHandler.segmentCommitUpload(params, segmentTarFile, 
prevResponse.getControllerVipUrl());
-    if (!segmentCommitUploadResponse.getStatus()
-        
.equals(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS)) {
-      segmentLogger.warn("Segment upload failed  with response {}", 
segmentCommitUploadResponse.toJsonString());
-      return SegmentCompletionProtocol.RESP_FAILED;
-    }
-
-    params = new SegmentCompletionProtocol.Request.Params();
-    
params.withInstanceId(_instanceId).withOffset(_currentOffset).withSegmentName(_segmentNameStr)
-        
.withSegmentLocation(segmentCommitUploadResponse.getSegmentLocation()).withNumRows(_numRowsConsumed)
-        .withBuildTimeMillis(_segmentBuildDescriptor.getBuildTimeMillis())
-        .withSegmentSizeBytes(_segmentBuildDescriptor.getSegmentSizeBytes())
-        .withWaitTimeMillis(_segmentBuildDescriptor.getWaitTimeMillis());
     if (_isOffHeap) {
       params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
     }
-    SegmentCompletionProtocol.Response commitEndResponse;
-    if (_indexLoadingConfig.isEnableSplitCommitEndWithMetadata()) {
-      commitEndResponse = 
_protocolHandler.segmentCommitEndWithMetadata(params, 
_segmentBuildDescriptor.getMetadataFiles());
-    } else {
-      commitEndResponse = _protocolHandler.segmentCommitEnd(params);
-    }
 
-    if 
(!commitEndResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS))
 {
-      segmentLogger.warn("CommitEnd failed  with response {}", 
commitEndResponse.toJsonString());
-      return SegmentCompletionProtocol.RESP_FAILED;
-    }
-    return commitEndResponse;
-  }
-
-  protected boolean commitSegment(SegmentCompletionProtocol.Response response) 
{
-    final String segTarFileName = 
_segmentBuildDescriptor.getSegmentTarFilePath();
-    File segTarFile = new File(segTarFileName);
-    if (!segTarFile.exists()) {
-      throw new RuntimeException("Segment file does not exist:" + 
segTarFileName);
-    }
-    SegmentCompletionProtocol.Response returnedResponse;
-    if (response.isSplitCommit() && _indexLoadingConfig.isEnableSplitCommit()) 
{
-      // Send segmentStart, segmentUpload, & segmentCommitEnd to the controller
-      // if that succeeds, swap in-memory segment with the one built.
-      returnedResponse = doSplitCommit(response);
-    } else {
-      // Send segmentCommit() to the controller
-      // if that succeeds, swap in-memory segment with the one built.
-      returnedResponse = postSegmentCommitMsg();
-    }
+    SegmentCommitter segmentCommitter = new SegmentCommitter(isSplitCommit, 
_segmentNameStr, _protocolHandler, _indexLoadingConfig, response, params);
 
 Review comment:
   `_segmentNameStr` should not be needed; it is already available in `params`.
   Also remove `_indexLoadingConfig`. Instead, extract the value of 
`isSplitCommitEndWithMetadata` and pass it to the constructor.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to