mcvsubbu commented on a change in pull request #4713: Refactoring realtime 
segment committer
URL: https://github.com/apache/incubator-pinot/pull/4713#discussion_r348759987
 
 

 ##########
 File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 ##########
 @@ -761,74 +763,15 @@ protected SegmentBuildDescriptor 
buildSegmentInternal(boolean forCommit) {
     }
   }
 
-  private SegmentCompletionProtocol.Response 
doSplitCommit(SegmentCompletionProtocol.Response prevResponse) {
-    final File segmentTarFile = new 
File(_segmentBuildDescriptor.getSegmentTarFilePath());
-    SegmentCompletionProtocol.Request.Params params = new 
SegmentCompletionProtocol.Request.Params();
-
-    
params.withSegmentName(_segmentNameStr).withOffset(_currentOffset).withNumRows(_numRowsConsumed)
-        
.withInstanceId(_instanceId).withBuildTimeMillis(_segmentBuildDescriptor.getBuildTimeMillis())
-        .withSegmentSizeBytes(_segmentBuildDescriptor.getSegmentSizeBytes())
-        .withWaitTimeMillis(_segmentBuildDescriptor.getWaitTimeMillis());
-    if (_isOffHeap) {
-      params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
-    }
-    SegmentCompletionProtocol.Response segmentCommitStartResponse = 
_protocolHandler.segmentCommitStart(params);
-    if (!segmentCommitStartResponse.getStatus()
-        
.equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE)) {
-      segmentLogger.warn("CommitStart failed  with response {}", 
segmentCommitStartResponse.toJsonString());
-      return SegmentCompletionProtocol.RESP_FAILED;
-    }
-
-    params = new SegmentCompletionProtocol.Request.Params();
-    
params.withOffset(_currentOffset).withSegmentName(_segmentNameStr).withInstanceId(_instanceId);
-    SegmentCompletionProtocol.Response segmentCommitUploadResponse =
-        _protocolHandler.segmentCommitUpload(params, segmentTarFile, 
prevResponse.getControllerVipUrl());
-    if (!segmentCommitUploadResponse.getStatus()
-        
.equals(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS)) {
-      segmentLogger.warn("Segment upload failed  with response {}", 
segmentCommitUploadResponse.toJsonString());
-      return SegmentCompletionProtocol.RESP_FAILED;
-    }
-
-    params = new SegmentCompletionProtocol.Request.Params();
-    
params.withInstanceId(_instanceId).withOffset(_currentOffset).withSegmentName(_segmentNameStr)
-        
.withSegmentLocation(segmentCommitUploadResponse.getSegmentLocation()).withNumRows(_numRowsConsumed)
-        .withBuildTimeMillis(_segmentBuildDescriptor.getBuildTimeMillis())
-        .withSegmentSizeBytes(_segmentBuildDescriptor.getSegmentSizeBytes())
-        .withWaitTimeMillis(_segmentBuildDescriptor.getWaitTimeMillis());
-    if (_isOffHeap) {
-      params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
-    }
-    SegmentCompletionProtocol.Response commitEndResponse;
-    if (_indexLoadingConfig.isEnableSplitCommitEndWithMetadata()) {
-      commitEndResponse =
-          _protocolHandler.segmentCommitEndWithMetadata(params, 
_segmentBuildDescriptor.getMetadataFiles());
-    } else {
-      commitEndResponse = _protocolHandler.segmentCommitEnd(params);
-    }
-
-    if 
(!commitEndResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS))
 {
-      segmentLogger.warn("CommitEnd failed  with response {}", 
commitEndResponse.toJsonString());
-      return SegmentCompletionProtocol.RESP_FAILED;
-    }
-    return commitEndResponse;
-  }
-
   protected boolean commitSegment(SegmentCompletionProtocol.Response response) 
{
     final String segTarFileName = 
_segmentBuildDescriptor.getSegmentTarFilePath();
     File segTarFile = new File(segTarFileName);
     if (!segTarFile.exists()) {
       throw new RuntimeException("Segment file does not exist:" + 
segTarFileName);
     }
     SegmentCompletionProtocol.Response returnedResponse;
-    if (response.isSplitCommit() && _indexLoadingConfig.isEnableSplitCommit()) 
{
-      // Send segmentStart, segmentUpload, & segmentCommitEnd to the controller
-      // if that succeeds, swap in-memory segment with the one built.
-      returnedResponse = doSplitCommit(response);
-    } else {
-      // Send segmentCommit() to the controller
-      // if that succeeds, swap in-memory segment with the one built.
-      returnedResponse = postSegmentCommitMsg();
-    }
+
+    returnedResponse = commit(response);
 
 Review comment:
   Please declare and assign `returnedResponse` in a single statement. Could
you also rename the variable to `commitResponse`?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to