Jackie-Jiang commented on a change in pull request #4156: Refactor 
HelixExternalViewBasedTimeBoundaryService to support all time units
URL: https://github.com/apache/incubator-pinot/pull/4156#discussion_r279969005
 
 

 ##########
 File path: 
pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryService.java
 ##########
 @@ -48,77 +50,106 @@ public 
HelixExternalViewBasedTimeBoundaryService(ZkHelixPropertyStore<ZNRecord>
   }
 
   public void updateTimeBoundaryService(ExternalView externalView) {
-    if (_propertyStore == null) {
-      return;
-    }
-    String tableName = externalView.getResourceName();
-    // Do nothing for realtime table.
-    if (TableNameBuilder.getTableTypeFromTableName(tableName) == 
TableType.REALTIME) {
+    String tableNameWithType = externalView.getResourceName();
+
+    // Skip real-time table, only use offline table to update the time boundary
+    if (TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == 
TableType.REALTIME) {
       return;
     }
 
     Set<String> offlineSegmentsServing = externalView.getPartitionSet();
     if (offlineSegmentsServing.isEmpty()) {
-      LOGGER.info("Skipping updating time boundary service for table '{}' with 
no offline segments.", tableName);
+      LOGGER.warn("Skipping updating time boundary for table: '{}' with no 
offline segment", tableNameWithType);
       return;
     }
 
-    TableConfig offlineTableConfig = 
ZKMetadataProvider.getOfflineTableConfig(_propertyStore, tableName);
-    assert offlineTableConfig != null;
-    TimeUnit tableTimeUnit = 
offlineTableConfig.getValidationConfig().getTimeType();
-    if (tableTimeUnit == null) {
-      LOGGER.info("Skipping updating time boundary service for table '{}' 
because time unit is not set", tableName);
+    // TODO: when we start using dateTime, pick the time column from the 
retention config, and use the DateTimeFieldSpec
+    //       from the schema to determine the time unit
+    // TODO: support SDF
+    TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+    assert tableConfig != null;
+    SegmentsValidationAndRetentionConfig retentionConfig = 
tableConfig.getValidationConfig();
+    String timeColumn = retentionConfig.getTimeColumnName();
+    TimeUnit tableTimeUnit = retentionConfig.getTimeType();
+    if (timeColumn == null || tableTimeUnit == null) {
+      LOGGER.error("Skipping updating time boundary for table: '{}' because 
time column/unit is not set",
+          tableNameWithType);
       return;
     }
 
-    // Bulk reading all segment zk-metadata at once is more efficient than 
reading one at a time.
+    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
tableNameWithType);
+    assert schema != null;
+    if (!timeColumn.equals(schema.getTimeColumnName())) {
+      LOGGER.error("Time column does not match in table config: '{}' and 
schema: '{}'", timeColumn,
+          schema.getTimeColumnName());
+    }
+    if (tableTimeUnit != schema.getOutgoingTimeUnit()) {
+      LOGGER.error("Time unit does not match in table config: '{}' and schema: 
'{}'", tableTimeUnit,
+          schema.getOutgoingTimeUnit());
+    }
+
+    // Bulk reading all segment ZK metadata is more efficient than reading one 
at a time
     List<OfflineSegmentZKMetadata> segmentZKMetadataList =
-        
ZKMetadataProvider.getOfflineSegmentZKMetadataListForTable(_propertyStore, 
tableName);
+        
ZKMetadataProvider.getOfflineSegmentZKMetadataListForTable(_propertyStore, 
tableNameWithType);
 
-    long maxTimeValue = 
computeMaxSegmentEndTimeForTable(segmentZKMetadataList, tableTimeUnit);
-    TimeBoundaryInfo timeBoundaryInfo = new TimeBoundaryInfo();
-    
timeBoundaryInfo.setTimeColumn(offlineTableConfig.getValidationConfig().getTimeColumnName());
+    long maxTimeValue = -1L;
+    for (OfflineSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
+      String segmentName = segmentZKMetadata.getSegmentName();
 
-    timeBoundaryInfo.setTimeValue(Long.toString(maxTimeValue));
-    _timeBoundaryInfoMap.put(tableName, timeBoundaryInfo);
+      // Only consider segments in the external view
+      if (!offlineSegmentsServing.contains(segmentName)) {
+        LOGGER.warn("Skipping processing segment: '{}' for table: '{}' because 
it does not exist in the external view",
+            segmentName, tableNameWithType);
+        continue;
+      }
 
-    LOGGER.info("Updated time boundary service for table '{}', maxTime: {}", 
tableName, maxTimeValue);
-  }
+      // Check if time unit in segment ZK metadata matches table time unit
 
 Review comment:
   I wanted to catch the inconsistency between the ZK metadata and the schema, but 
that seems unnecessary, so I removed it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to