mcvsubbu commented on a change in pull request #4592: Refactor Realtime Segment Data Manager to remove repeat logic URL: https://github.com/apache/incubator-pinot/pull/4592#discussion_r322506101
########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java ########## @@ -760,73 +760,28 @@ protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) { } } - private SegmentCompletionProtocol.Response doSplitCommit(SegmentCompletionProtocol.Response prevResponse) { - final File segmentTarFile = new File(_segmentBuildDescriptor.getSegmentTarFilePath()); + protected boolean commitSegment(SegmentCompletionProtocol.Response response) { + final String segTarFileName = _segmentBuildDescriptor.getSegmentTarFilePath(); + File segTarFile = new File(segTarFileName); + if (!segTarFile.exists()) { + throw new RuntimeException("Segment file does not exist:" + segTarFileName); + } + SegmentCompletionProtocol.Response returnedResponse; + boolean isSplitCommit = response.isSplitCommit() && _indexLoadingConfig.isEnableSplitCommit(); + SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params(); params.withSegmentName(_segmentNameStr).withOffset(_currentOffset).withNumRows(_numRowsConsumed) .withInstanceId(_instanceId).withBuildTimeMillis(_segmentBuildDescriptor.getBuildTimeMillis()) .withSegmentSizeBytes(_segmentBuildDescriptor.getSegmentSizeBytes()) .withWaitTimeMillis(_segmentBuildDescriptor.getWaitTimeMillis()); - if (_isOffHeap) { - params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes()); - } - SegmentCompletionProtocol.Response segmentCommitStartResponse = _protocolHandler.segmentCommitStart(params); - if (!segmentCommitStartResponse.getStatus() - .equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE)) { - segmentLogger.warn("CommitStart failed with response {}", segmentCommitStartResponse.toJsonString()); - return SegmentCompletionProtocol.RESP_FAILED; - } - params = new SegmentCompletionProtocol.Request.Params(); - params.withOffset(_currentOffset).withSegmentName(_segmentNameStr).withInstanceId(_instanceId); - 
SegmentCompletionProtocol.Response segmentCommitUploadResponse = - _protocolHandler.segmentCommitUpload(params, segmentTarFile, prevResponse.getControllerVipUrl()); - if (!segmentCommitUploadResponse.getStatus() - .equals(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS)) { - segmentLogger.warn("Segment upload failed with response {}", segmentCommitUploadResponse.toJsonString()); - return SegmentCompletionProtocol.RESP_FAILED; - } - - params = new SegmentCompletionProtocol.Request.Params(); - params.withInstanceId(_instanceId).withOffset(_currentOffset).withSegmentName(_segmentNameStr) - .withSegmentLocation(segmentCommitUploadResponse.getSegmentLocation()).withNumRows(_numRowsConsumed) - .withBuildTimeMillis(_segmentBuildDescriptor.getBuildTimeMillis()) - .withSegmentSizeBytes(_segmentBuildDescriptor.getSegmentSizeBytes()) - .withWaitTimeMillis(_segmentBuildDescriptor.getWaitTimeMillis()); if (_isOffHeap) { params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes()); } - SegmentCompletionProtocol.Response commitEndResponse; - if (_indexLoadingConfig.isEnableSplitCommitEndWithMetadata()) { - commitEndResponse = _protocolHandler.segmentCommitEndWithMetadata(params, _segmentBuildDescriptor.getMetadataFiles()); - } else { - commitEndResponse = _protocolHandler.segmentCommitEnd(params); - } - if (!commitEndResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) { - segmentLogger.warn("CommitEnd failed with response {}", commitEndResponse.toJsonString()); - return SegmentCompletionProtocol.RESP_FAILED; - } - return commitEndResponse; - } - - protected boolean commitSegment(SegmentCompletionProtocol.Response response) { - final String segTarFileName = _segmentBuildDescriptor.getSegmentTarFilePath(); - File segTarFile = new File(segTarFileName); - if (!segTarFile.exists()) { - throw new RuntimeException("Segment file does not exist:" + segTarFileName); - } - SegmentCompletionProtocol.Response 
returnedResponse; - if (response.isSplitCommit() && _indexLoadingConfig.isEnableSplitCommit()) { - // Send segmentStart, segmentUpload, & segmentCommitEnd to the controller - // if that succeeds, swap in-memory segment with the one built. - returnedResponse = doSplitCommit(response); - } else { - // Send segmentCommit() to the controller - // if that succeeds, swap in-memory segment with the one built. - returnedResponse = postSegmentCommitMsg(); - } + SegmentCommitter segmentCommitter = new SegmentCommitter(isSplitCommit, _segmentNameStr, _protocolHandler, _indexLoadingConfig, response, params); Review comment: `_segmentNameStr` should not be needed; it is already there in `params`. Remove `_indexLoadingConfig`. Instead, extract the value of `isEnableSplitCommitEndWithMetadata()` and pass it to the constructor. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org