mcvsubbu commented on a change in pull request #4713: Refactoring realtime
segment committer
URL: https://github.com/apache/incubator-pinot/pull/4713#discussion_r341702522
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -837,22 +781,28 @@ protected boolean commitSegment(SegmentCompletionProtocol.Response response) {
     return true;
   }
-  protected SegmentCompletionProtocol.Response postSegmentCommitMsg() {
-    final File segmentTarFile = new File(_segmentBuildDescriptor.getSegmentTarFilePath());
+  protected SegmentCompletionProtocol.Response commit(SegmentCompletionProtocol.Response response) {
     SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
-    params.withInstanceId(_instanceId).withOffset(_currentOffset).withSegmentName(_segmentNameStr)
-        .withNumRows(_numRowsConsumed).withInstanceId(_instanceId)
-        .withBuildTimeMillis(_segmentBuildDescriptor.getBuildTimeMillis())
+
+    params.withSegmentName(_segmentNameStr).withOffset(_currentOffset).withNumRows(_numRowsConsumed)
+        .withInstanceId(_instanceId).withBuildTimeMillis(_segmentBuildDescriptor.getBuildTimeMillis())
         .withSegmentSizeBytes(_segmentBuildDescriptor.getSegmentSizeBytes())
         .withWaitTimeMillis(_segmentBuildDescriptor.getWaitTimeMillis());
     if (_isOffHeap) {
       params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
     }
-    SegmentCompletionProtocol.Response response = _protocolHandler.segmentCommit(params, segmentTarFile);
-    if (!response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) {
-      segmentLogger.warn("Commit failed with response {}", response.toJsonString());
+
+    Configuration config = new PropertiesConfiguration();
+
+    if (response.isSplitCommit() && _indexLoadingConfig.isEnableSplitCommit()) {
+      config.setProperty(CommonConstants.Segment.Realtime.COMMITTER_CLASS, SplitSegmentCommitter.class.getName());
Review comment:
As we discussed, I think a Configuration object is too much overhead here.
We cannot avoid the if statement before creating a committer, since the
choice still depends on the controller's response to the server's
segmentConsumed() call. Instead, we can call different methods on the factory
(such as createDefaultCommitter() vs. createSplitCommitter()). The factory
itself can be constructed in the constructor of LLRealtimeSegmentDataManager
with the parameters needed by both committers (indexLoadingConfig,
segmentLogger, and perhaps even protocolHandler). Thanks.
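
A minimal sketch of the factory shape suggested in this comment, under stated assumptions. Only the method names createDefaultCommitter() and createSplitCommitter() come from the comment. The wrapper class CommitterFactorySketch, the SegmentCommitter interface and its commit(String) signature, the chooseCommitter() helper, and the StringBuilder standing in for segmentLogger are all hypothetical and are not the Pinot API. The point is that the factory is built once with the shared dependencies, and the unavoidable if statement simply picks which factory method to call.

```java
// Hypothetical sketch: every type here is a simplified stand-in, not a Pinot class.
final class CommitterFactorySketch {

  /** Stand-in for a segment committer; the real interface in the PR may differ. */
  interface SegmentCommitter {
    String commit(String segmentName);
  }

  /** Stand-in for the non-split commit path. */
  static final class DefaultCommitter implements SegmentCommitter {
    private final StringBuilder log;

    DefaultCommitter(StringBuilder log) {
      this.log = log;
    }

    @Override
    public String commit(String segmentName) {
      log.append("default commit of ").append(segmentName).append('\n');
      return "default:" + segmentName;
    }
  }

  /** Stand-in for the split commit path. */
  static final class SplitCommitter implements SegmentCommitter {
    private final StringBuilder log;

    SplitCommitter(StringBuilder log) {
      this.log = log;
    }

    @Override
    public String commit(String segmentName) {
      log.append("split commit of ").append(segmentName).append('\n');
      return "split:" + segmentName;
    }
  }

  /** Built once (e.g. in a constructor) with the dependencies both committers share. */
  static final class SegmentCommitterFactory {
    private final boolean splitCommitEnabled; // assumed stand-in for indexLoadingConfig
    private final StringBuilder log;          // assumed stand-in for segmentLogger

    SegmentCommitterFactory(boolean splitCommitEnabled, StringBuilder log) {
      this.splitCommitEnabled = splitCommitEnabled;
      this.log = log;
    }

    boolean isSplitCommitEnabled() {
      return splitCommitEnabled;
    }

    SegmentCommitter createDefaultCommitter() {
      return new DefaultCommitter(log);
    }

    SegmentCommitter createSplitCommitter() {
      return new SplitCommitter(log);
    }
  }

  /** The if statement stays, but it selects a factory method instead of filling a config. */
  static SegmentCommitter chooseCommitter(SegmentCommitterFactory factory, boolean responseIsSplitCommit) {
    if (responseIsSplitCommit && factory.isSplitCommitEnabled()) {
      return factory.createSplitCommitter();
    }
    return factory.createDefaultCommitter();
  }

  private CommitterFactorySketch() {
  }
}
```

With this shape, no committer class name has to be passed through a Configuration: the caller passes the controller's split-commit flag to chooseCommitter() and invokes commit() on whatever comes back.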