saurabhd336 commented on code in PR #9356:
URL: https://github.com/apache/pinot/pull/9356#discussion_r985368922


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -822,4 +821,8 @@ public static class Range {
       public static final String UPPER_UNBOUNDED = DELIMITER + UNBOUNDED + 
UPPER_EXCLUSIVE;
     }
   }
+
+  public static class IdealState {
+    public static final String QUERY_TIME_BOUNDARY = 
"HYBRID_TABLE_TIME_BOUNDARY";

Review Comment:
   Ack



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -20,7 +20,6 @@
 
 import java.io.File;
 
-

Review Comment:
   Ack



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -514,4 +526,59 @@ public List<SegmentConsumerInfo> getConsumingSegmentsInfo(
     }
     return segmentConsumerInfoList;
   }
+
+  @GET
+  @Path("tables/{tableNameWithType}/allSegmentsLoaded")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Validates if the ideal state matches with the 
segmentstate on this server", notes =
+      "Validates if the ideal state matches with the segmentstate on this 
server")

Review Comment:
   Ack



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -514,4 +526,59 @@ public List<SegmentConsumerInfo> getConsumingSegmentsInfo(
     }
     return segmentConsumerInfoList;
   }
+
+  @GET
+  @Path("tables/{tableNameWithType}/allSegmentsLoaded")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Validates if the ideal state matches with the 
segmentstate on this server", notes =
+      "Validates if the ideal state matches with the segmentstate on this 
server")
+  public TableSegmentValidationInfo validateTableSegmentState(
+      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableNameWithType")
+          String tableNameWithType) {
+    // Get table current ideal state
+    IdealState tableIdealState = 
HelixHelper.getTableIdealState(_serverInstance.getHelixManager(), 
tableNameWithType);
+    TableDataManager tableDataManager =
+        ServerResourceUtils.checkGetTableDataManager(_serverInstance, 
tableNameWithType);
+
+    // Validate segments in idealstate which belong to this server
+    long maxEndTime = -1;
+    Map<String, Map<String, String>> instanceStatesMap = 
tableIdealState.getRecord().getMapFields();
+    for (Map.Entry<String, Map<String, String>> kv : 
instanceStatesMap.entrySet()) {
+      String segmentName = kv.getKey();
+      if (kv.getValue().containsKey(_instanceId)) {
+        // Segment hosted by this server. Validate segment state
+        SegmentDataManager segmentDataManager = 
tableDataManager.acquireSegment(segmentName);
+        try {
+          String segmentState = kv.getValue().get(_instanceId);
+
+          switch (segmentState) {
+            case CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING:
+              // Only validate presence of segment
+              if (segmentDataManager == null) {
+                return new TableSegmentValidationInfo(false, -1);
+              }
+              break;
+            case CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE:
+              // Validate segment CRC
+              SegmentZKMetadata zkMetadata =
+                  
ZKMetadataProvider.getSegmentZKMetadata(_serverInstance.getHelixManager().getHelixPropertyStore(),
+                      tableNameWithType, segmentName);

Review Comment:
   Ack



##########
pinot-core/src/main/java/org/apache/pinot/core/routing/TimeBoundaryInfo.java:
##########
@@ -21,10 +21,12 @@
 public class TimeBoundaryInfo {
   private final String _timeColumn;
   private final String _timeValue;
+  private final boolean _enforced;

Review Comment:
   Ack



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Set hybrid table query time boundary based on offline 
segments' metadata",
+      notes = "Set hybrid table query time boundary based on offline segments' 
metadata")
+  public SuccessResponse setEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", 
required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || 
!_pinotHelixResourceManager.hasOfflineTable(
+        tableName)) {
+      throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid 
table", Response.Status.BAD_REQUEST);
+    }
+
+    String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+    // Call all servers to validate offline table state
+    Map<String, List<String>> serverToSegments = 
_pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);
+    BiMap<String, String> serverEndPoints =
+        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, 
serverEndPoints);
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint = endpoint + "/tables/" + 
offlineTableName + "/allSegmentsLoaded";
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 
10000);
+
+    if (serviceResponse._failedResponseCount > 0) {
+      throw new ControllerApplicationException(LOGGER, "Could not validate 
table segment status",
+          Response.Status.SERVICE_UNAVAILABLE);
+    }
+
+    Long timeBoundary = null;
+    // Validate all responses
+    for (String response : serviceResponse._httpResponses.values()) {
+      TableSegmentValidationInfo tableSegmentValidationInfo =
+          JsonUtils.stringToObject(response, TableSegmentValidationInfo.class);
+      if (!tableSegmentValidationInfo.isValid()) {
+        throw new ControllerApplicationException(LOGGER, "Table segment 
validation failed",
+            Response.Status.INTERNAL_SERVER_ERROR);
+      }
+      timeBoundary = Math.max((timeBoundary == null) ? -1 : timeBoundary, 
tableSegmentValidationInfo.getMaxTimestamp());
+    }
+
+    if (timeBoundary == null) {
+      throw new ControllerApplicationException(LOGGER, "Could not validate 
table segment status",
+          Response.Status.SERVICE_UNAVAILABLE);
+    }
+
+    final String timeBoundaryFinal = String.valueOf(timeBoundary);
+
+    // Set the timeBoundary in tableIdealState
+    IdealState idealState =
+        
HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), 
offlineTableName, is -> {
+          assert is != null;
+          
is.getRecord().setSimpleField(CommonConstants.IdealState.QUERY_TIME_BOUNDARY, 
timeBoundaryFinal);
+          return is;
+        }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f));
+
+    if (idealState == null) {
+      throw new ControllerApplicationException(LOGGER, "Could not update time 
boundary",
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+
+    return new SuccessResponse("Time boundary updated successfully to " + 
timeBoundaryFinal);
+  }
+
+  @DELETE
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Delete hybrid table query time boundary", notes = 
"Delete hybrid table query time boundary")
+  public SuccessResponse deleteEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", 
required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || 
!_pinotHelixResourceManager.hasOfflineTable(

Review Comment:
   Ack. Added a check that the offline table's ideal state is not null.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Set hybrid table query time boundary based on offline 
segments' metadata",
+      notes = "Set hybrid table query time boundary based on offline segments' 
metadata")
+  public SuccessResponse setEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", 
required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || 
!_pinotHelixResourceManager.hasOfflineTable(
+        tableName)) {
+      throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid 
table", Response.Status.BAD_REQUEST);
+    }
+
+    String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+    // Call all servers to validate offline table state
+    Map<String, List<String>> serverToSegments = 
_pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);
+    BiMap<String, String> serverEndPoints =
+        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, 
serverEndPoints);
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint = endpoint + "/tables/" + 
offlineTableName + "/allSegmentsLoaded";
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 
10000);
+
+    if (serviceResponse._failedResponseCount > 0) {
+      throw new ControllerApplicationException(LOGGER, "Could not validate 
table segment status",
+          Response.Status.SERVICE_UNAVAILABLE);
+    }
+
+    Long timeBoundary = null;
+    // Validate all responses
+    for (String response : serviceResponse._httpResponses.values()) {
+      TableSegmentValidationInfo tableSegmentValidationInfo =
+          JsonUtils.stringToObject(response, TableSegmentValidationInfo.class);
+      if (!tableSegmentValidationInfo.isValid()) {
+        throw new ControllerApplicationException(LOGGER, "Table segment 
validation failed",
+            Response.Status.INTERNAL_SERVER_ERROR);
+      }
+      timeBoundary = Math.max((timeBoundary == null) ? -1 : timeBoundary, 
tableSegmentValidationInfo.getMaxTimestamp());
+    }
+
+    if (timeBoundary == null) {
+      throw new ControllerApplicationException(LOGGER, "Could not validate 
table segment status",
+          Response.Status.SERVICE_UNAVAILABLE);
+    }
+
+    final String timeBoundaryFinal = String.valueOf(timeBoundary);
+
+    // Set the timeBoundary in tableIdealState
+    IdealState idealState =
+        
HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), 
offlineTableName, is -> {
+          assert is != null;
+          
is.getRecord().setSimpleField(CommonConstants.IdealState.QUERY_TIME_BOUNDARY, 
timeBoundaryFinal);
+          return is;
+        }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f));
+
+    if (idealState == null) {
+      throw new ControllerApplicationException(LOGGER, "Could not update time 
boundary",
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+
+    return new SuccessResponse("Time boundary updated successfully to " + 
timeBoundaryFinal);
+  }
+
+  @DELETE
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Delete hybrid table query time boundary", notes = 
"Delete hybrid table query time boundary")
+  public SuccessResponse deleteEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", 
required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || 
!_pinotHelixResourceManager.hasOfflineTable(
+        tableName)) {
+      throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid 
table", Response.Status.BAD_REQUEST);
+    }
+
+    String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+    // Delete the timeBoundary in tableIdealState
+    IdealState idealState =
+        
HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), 
offlineTableName, is -> {
+          assert is != null;

Review Comment:
   Ack. Added a check that the ideal state is not null.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Set hybrid table query time boundary based on offline 
segments' metadata",
+      notes = "Set hybrid table query time boundary based on offline segments' 
metadata")
+  public SuccessResponse setEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", 
required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || 
!_pinotHelixResourceManager.hasOfflineTable(
+        tableName)) {
+      throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid 
table", Response.Status.BAD_REQUEST);
+    }
+
+    String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+    // Call all servers to validate offline table state
+    Map<String, List<String>> serverToSegments = 
_pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);
+    BiMap<String, String> serverEndPoints =
+        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, 
serverEndPoints);
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint = endpoint + "/tables/" + 
offlineTableName + "/allSegmentsLoaded";
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 
10000);
+
+    if (serviceResponse._failedResponseCount > 0) {
+      throw new ControllerApplicationException(LOGGER, "Could not validate 
table segment status",
+          Response.Status.SERVICE_UNAVAILABLE);
+    }
+
+    Long timeBoundary = null;
+    // Validate all responses
+    for (String response : serviceResponse._httpResponses.values()) {
+      TableSegmentValidationInfo tableSegmentValidationInfo =
+          JsonUtils.stringToObject(response, TableSegmentValidationInfo.class);
+      if (!tableSegmentValidationInfo.isValid()) {
+        throw new ControllerApplicationException(LOGGER, "Table segment 
validation failed",
+            Response.Status.INTERNAL_SERVER_ERROR);
+      }
+      timeBoundary = Math.max((timeBoundary == null) ? -1 : timeBoundary, 
tableSegmentValidationInfo.getMaxTimestamp());
+    }
+
+    if (timeBoundary == null) {
+      throw new ControllerApplicationException(LOGGER, "Could not validate 
table segment status",
+          Response.Status.SERVICE_UNAVAILABLE);
+    }
+
+    final String timeBoundaryFinal = String.valueOf(timeBoundary);
+
+    // Set the timeBoundary in tableIdealState
+    IdealState idealState =
+        
HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), 
offlineTableName, is -> {
+          assert is != null;

Review Comment:
   Ack. Added an ideal state != null precondition check.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Set hybrid table query time boundary based on offline 
segments' metadata",
+      notes = "Set hybrid table query time boundary based on offline segments' 
metadata")
+  public SuccessResponse setEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", 
required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || 
!_pinotHelixResourceManager.hasOfflineTable(
+        tableName)) {
+      throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid 
table", Response.Status.BAD_REQUEST);
+    }
+
+    String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+    // Call all servers to validate offline table state
+    Map<String, List<String>> serverToSegments = 
_pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);
+    BiMap<String, String> serverEndPoints =
+        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, 
serverEndPoints);
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint = endpoint + "/tables/" + 
offlineTableName + "/allSegmentsLoaded";
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 
10000);
+
+    if (serviceResponse._failedResponseCount > 0) {
+      throw new ControllerApplicationException(LOGGER, "Could not validate 
table segment status",
+          Response.Status.SERVICE_UNAVAILABLE);
+    }
+
+    Long timeBoundary = null;
+    // Validate all responses
+    for (String response : serviceResponse._httpResponses.values()) {
+      TableSegmentValidationInfo tableSegmentValidationInfo =
+          JsonUtils.stringToObject(response, TableSegmentValidationInfo.class);
+      if (!tableSegmentValidationInfo.isValid()) {
+        throw new ControllerApplicationException(LOGGER, "Table segment 
validation failed",
+            Response.Status.INTERNAL_SERVER_ERROR);

Review Comment:
   Changed the response status to PRECONDITION_FAILED (412).



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Set hybrid table query time boundary based on offline 
segments' metadata",
+      notes = "Set hybrid table query time boundary based on offline segments' 
metadata")
+  public SuccessResponse setEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", 
required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || 
!_pinotHelixResourceManager.hasOfflineTable(
+        tableName)) {
+      throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid 
table", Response.Status.BAD_REQUEST);
+    }
+
+    String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+    // Call all servers to validate offline table state
+    Map<String, List<String>> serverToSegments = 
_pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);
+    BiMap<String, String> serverEndPoints =
+        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, 
serverEndPoints);
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint = endpoint + "/tables/" + 
offlineTableName + "/allSegmentsLoaded";
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 
10000);
+
+    if (serviceResponse._failedResponseCount > 0) {
+      throw new ControllerApplicationException(LOGGER, "Could not validate 
table segment status",
+          Response.Status.SERVICE_UNAVAILABLE);
+    }
+
+    Long timeBoundary = null;

Review Comment:
   Ack



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Set hybrid table query time boundary based on offline 
segments' metadata",
+      notes = "Set hybrid table query time boundary based on offline segments' 
metadata")
+  public SuccessResponse setEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", 
required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || 
!_pinotHelixResourceManager.hasOfflineTable(
+        tableName)) {
+      throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid 
table", Response.Status.BAD_REQUEST);
+    }
+
+    String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+    // Call all servers to validate offline table state
+    Map<String, List<String>> serverToSegments = 
_pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);

Review Comment:
   Ack



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/timeBoundary")

Review Comment:
   Ack



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -142,17 +153,42 @@ private long 
extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Null
     return endTimeMs;
   }
 
-  private void updateTimeBoundaryInfo(long maxEndTimeMs) {
-    if (maxEndTimeMs > 0) {
-      String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs - 
_timeOffsetMs);
-      TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
-      if (currentTimeBoundaryInfo == null || 
!currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
-        _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
-        LOGGER.info("Updated time boundary to: {} for table: {}", 
timeBoundary, _offlineTableName);
+  private synchronized void updateTimeBoundaryInfo(Long enforcedTimeBoundary, 
long maxEndTimeMs,
+      boolean idealStateReffered) {
+    TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
+    Long finalTimeBoundaryMs = null;
+    boolean isEnforced = false;
+    boolean validTimeBoundaryFound = false;
+
+    if (enforcedTimeBoundary != null) {
+      finalTimeBoundaryMs = enforcedTimeBoundary;
+      isEnforced = true;
+      validTimeBoundaryFound = true;
+      LOGGER.info("Enforced table time boundary in use: {} for table: {}", 
enforcedTimeBoundary, _offlineTableName);
+    } else if (idealStateReffered || !currentTimeBoundaryInfo.isEnforced()) {
+      if (maxEndTimeMs > 0) {
+        finalTimeBoundaryMs = maxEndTimeMs - _timeOffsetMs;
+        validTimeBoundaryFound = true;
+      } else {
+        LOGGER.warn("Failed to find segment with valid end time for table: {}, 
no time boundary generated",
+            _offlineTableName);
+      }
+    } else {
+      validTimeBoundaryFound = true;
+      LOGGER.info("Skipping time boundary update since enforced time boundary 
exists");
+    }
+
+    if (validTimeBoundaryFound) {
+      if (finalTimeBoundaryMs != null) {
+        String timeBoundary = 
_timeFormatSpec.fromMillisToFormat(finalTimeBoundaryMs);
+        if (currentTimeBoundaryInfo == null || 
!currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {

Review Comment:
   Ack



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -142,17 +153,42 @@ private long 
extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Null
     return endTimeMs;
   }
 
-  private void updateTimeBoundaryInfo(long maxEndTimeMs) {
-    if (maxEndTimeMs > 0) {
-      String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs - 
_timeOffsetMs);
-      TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
-      if (currentTimeBoundaryInfo == null || 
!currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
-        _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
-        LOGGER.info("Updated time boundary to: {} for table: {}", 
timeBoundary, _offlineTableName);
+  private synchronized void updateTimeBoundaryInfo(Long enforcedTimeBoundary, 
long maxEndTimeMs,
+      boolean idealStateReffered) {
+    TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
+    Long finalTimeBoundaryMs = null;
+    boolean isEnforced = false;
+    boolean validTimeBoundaryFound = false;
+
+    if (enforcedTimeBoundary != null) {
+      finalTimeBoundaryMs = enforcedTimeBoundary;
+      isEnforced = true;
+      validTimeBoundaryFound = true;
+      LOGGER.info("Enforced table time boundary in use: {} for table: {}", 
enforcedTimeBoundary, _offlineTableName);
+    } else if (idealStateReffered || !currentTimeBoundaryInfo.isEnforced()) {
+      if (maxEndTimeMs > 0) {
+        finalTimeBoundaryMs = maxEndTimeMs - _timeOffsetMs;
+        validTimeBoundaryFound = true;
+      } else {
+        LOGGER.warn("Failed to find segment with valid end time for table: {}, 
no time boundary generated",
+            _offlineTableName);
+      }
+    } else {
+      validTimeBoundaryFound = true;
+      LOGGER.info("Skipping time boundary update since enforced time boundary 
exists");
+    }
+
+    if (validTimeBoundaryFound) {
+      if (finalTimeBoundaryMs != null) {

Review Comment:
   Good catch! Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to