Jackie-Jiang commented on code in PR #9413:
URL: https://github.com/apache/pinot/pull/9413#discussion_r980391515


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -1035,4 +1037,66 @@ public 
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsI
           Response.Status.INTERNAL_SERVER_ERROR, e);
     }
   }
+
+  @POST
+  @Path("/tables/{tableNameWithType}/updateTimeIntervalZK")

Review Comment:
   ```suggestion
     @Path("/segments/{tableNameWithType}/updateZKTimeInterval")
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -1035,4 +1037,66 @@ public 
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsI
           Response.Status.INTERNAL_SERVER_ERROR, e);
     }
   }
+
+  @POST
+  @Path("/tables/{tableNameWithType}/updateTimeIntervalZK")
+  @Authenticate(AccessType.UPDATE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Update the start and end time of the segments based 
on latest schema",
+      notes = "Update the start and end time of the segments based on latest 
schema")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 404, message = "Table not found"),
+      @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public SuccessResponse updateTimeIntervalZK(
+      @ApiParam(value = "Table name with type", required = true,
+          example = "myTable_REALTIME") @PathParam("tableNameWithType") String 
tableNameWithType) {
+    try {
+
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        throw new ControllerApplicationException(LOGGER,
+            String.format("Table type not provided with table name %s", 
tableNameWithType),
+            Response.Status.INTERNAL_SERVER_ERROR);
+      }
+      return updateTimeIntervalZKInternal(tableNameWithType);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,

Review Comment:
   What exception do you expect here? We shouldn't have this catch



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -1035,4 +1037,66 @@ public 
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsI
           Response.Status.INTERNAL_SERVER_ERROR, e);
     }
   }
+
+  @POST
+  @Path("/tables/{tableNameWithType}/updateTimeIntervalZK")
+  @Authenticate(AccessType.UPDATE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Update the start and end time of the segments based 
on latest schema",
+      notes = "Update the start and end time of the segments based on latest 
schema")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 404, message = "Table not found"),
+      @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public SuccessResponse updateTimeIntervalZK(
+      @ApiParam(value = "Table name with type", required = true,
+          example = "myTable_REALTIME") @PathParam("tableNameWithType") String 
tableNameWithType) {
+    try {
+
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        throw new ControllerApplicationException(LOGGER,
+            String.format("Table type not provided with table name %s", 
tableNameWithType),
+            Response.Status.INTERNAL_SERVER_ERROR);
+      }
+      return updateTimeIntervalZKInternal(tableNameWithType);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to get consuming segments info for table %s. 
%s", tableNameWithType, e.getMessage()),
+          Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
+  /**
+   * Internal method to update schema
+   * @param tableName  name of the table
+   * @return
+   */
+  private SuccessResponse updateTimeIntervalZKInternal(String tableName) {

Review Comment:
   (minor) `tableNameWithType` for clarity



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2265,6 +2284,18 @@ private void deleteTableOnServer(String 
tableNameWithType) {
     }
   }
 
+  public void updateSegmentInterval(TableConfig tableConfig, SegmentZKMetadata 
segmentZKMetadata,
+      Schema newSchema) {
+    String segmentName = segmentZKMetadata.getSegmentName();
+    if (tableConfig != null) {

Review Comment:
   We can remove this check. The check should be performed on the caller side



##########
pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java:
##########
@@ -688,13 +688,18 @@ public void updateBooleanFieldsIfNeeded(Schema oldSchema) 
{
     }
   }
 
+
+  public boolean isBackwardCompatibleWith(Schema oldSchema) {
+    return isBackwardCompatibleWith(oldSchema, false);
+  }
+
   /**
    * Check whether the current schema is backward compatible with oldSchema.
    * Backward compatibility requires all columns and fieldSpec in oldSchema 
should be retained.
    *
    * @param oldSchema old schema
    */
-  public boolean isBackwardCompatibleWith(Schema oldSchema) {
+  public boolean isBackwardCompatibleWith(Schema oldSchema, boolean 
skipTimeColumn) {

Review Comment:
   Revert this part which no longer apply



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java:
##########
@@ -69,6 +100,13 @@ private static void updateSegmentZKMetadata(String 
tableNameWithType, SegmentZKM
       segmentZKMetadata.setStartTime(segmentMetadata.getStartTime());
       segmentZKMetadata.setEndTime(segmentMetadata.getEndTime());
       segmentZKMetadata.setTimeUnit(segmentMetadata.getTimeUnit());

Review Comment:
   We can change this to always put `MILLISECONDS ` as the time unit (convert 
start/end time using the time unit), then we no longer need to worry about time 
unit when updating the time interval



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -746,6 +746,7 @@ private void writeMetadata()
           } else {
             // No records in segment. Use current time as start/end
             long now = System.currentTimeMillis();
+

Review Comment:
   (minor) Revert changes in this file which is not related



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java:
##########
@@ -77,6 +77,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
   private Duration _timeGranularity;
   private long _segmentStartTime = Long.MAX_VALUE;
   private long _segmentEndTime = Long.MIN_VALUE;
+

Review Comment:
   (minor) Revert changes in this file which is not related



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1284,8 +1284,27 @@ public void addSchema(Schema schema, boolean override, 
boolean force)
     }
   }
 
+  public void updateSchemaDateTime(TableConfig tableConfig, Schema schema)

Review Comment:
   ```suggestion
     public void updateSegmentsZKTimeInterval(TableConfig tableConfig, Schema 
schema)
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1284,8 +1284,27 @@ public void addSchema(Schema schema, boolean override, 
boolean force)
     }
   }
 
+  public void updateSchemaDateTime(TableConfig tableConfig, Schema schema)
+      throws SchemaNotFoundException, SchemaBackwardIncompatibleException, 
TableNotFoundException {

Review Comment:
   It shouldn't throw these exceptions, or simply pass `tableNameWithType` and 
throws these exceptions. Caller should check the exception type to create the 
`ControllerApplicationException` accordingly



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -1035,4 +1037,66 @@ public 
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsI
           Response.Status.INTERNAL_SERVER_ERROR, e);
     }
   }
+
+  @POST
+  @Path("/tables/{tableNameWithType}/updateTimeIntervalZK")
+  @Authenticate(AccessType.UPDATE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Update the start and end time of the segments based 
on latest schema",
+      notes = "Update the start and end time of the segments based on latest 
schema")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 404, message = "Table not found"),
+      @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public SuccessResponse updateTimeIntervalZK(
+      @ApiParam(value = "Table name with type", required = true,
+          example = "myTable_REALTIME") @PathParam("tableNameWithType") String 
tableNameWithType) {
+    try {
+
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        throw new ControllerApplicationException(LOGGER,
+            String.format("Table type not provided with table name %s", 
tableNameWithType),
+            Response.Status.INTERNAL_SERVER_ERROR);
+      }
+      return updateTimeIntervalZKInternal(tableNameWithType);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to get consuming segments info for table %s. 
%s", tableNameWithType, e.getMessage()),
+          Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
+  /**
+   * Internal method to update schema
+   * @param tableName  name of the table
+   * @return
+   */
+  private SuccessResponse updateTimeIntervalZKInternal(String tableName) {
+
+    try {
+      TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableName);
+      if (tableConfig == null) {
+        throw new ControllerApplicationException(LOGGER,

Review Comment:
   This is user error, where the table does not exist



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -1035,4 +1037,66 @@ public 
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsI
           Response.Status.INTERNAL_SERVER_ERROR, e);
     }
   }
+
+  @POST
+  @Path("/tables/{tableNameWithType}/updateTimeIntervalZK")
+  @Authenticate(AccessType.UPDATE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Update the start and end time of the segments based 
on latest schema",
+      notes = "Update the start and end time of the segments based on latest 
schema")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 404, message = "Table not found"),
+      @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public SuccessResponse updateTimeIntervalZK(
+      @ApiParam(value = "Table name with type", required = true,
+          example = "myTable_REALTIME") @PathParam("tableNameWithType") String 
tableNameWithType) {
+    try {
+
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        throw new ControllerApplicationException(LOGGER,
+            String.format("Table type not provided with table name %s", 
tableNameWithType),
+            Response.Status.INTERNAL_SERVER_ERROR);
+      }
+      return updateTimeIntervalZKInternal(tableNameWithType);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to get consuming segments info for table %s. 
%s", tableNameWithType, e.getMessage()),
+          Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
+  /**
+   * Internal method to update schema
+   * @param tableName  name of the table
+   * @return
+   */
+  private SuccessResponse updateTimeIntervalZKInternal(String tableName) {
+
+    try {
+      TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableName);
+      if (tableConfig == null) {
+        throw new ControllerApplicationException(LOGGER,
+            String.format("No table config found for table %s", tableName), 
Response.Status.INTERNAL_SERVER_ERROR);
+      }
+
+      Schema tableSchema = 
_pinotHelixResourceManager.getTableSchema(tableName);
+      if (tableSchema == null) {
+        throw new ControllerApplicationException(LOGGER,
+            String.format("No schema found for table %s", tableName), 
Response.Status.INTERNAL_SERVER_ERROR);
+      }
+
+      String schemaName = tableSchema.getSchemaName();
+      _pinotHelixResourceManager.updateSchemaDateTime(tableConfig, 
tableSchema);
+      // Best effort notification. If controller fails at this point, no 
notification is given.
+      LOGGER.info("Notifying metadata event for updating schema: {}", 
schemaName);
+      return new SuccessResponse("Successfully updated time interval zk 
metadata for table: " + tableName);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,

Review Comment:
   We don't want t global try-catch, which will also catch the user errors



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -1035,4 +1037,66 @@ public 
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsI
           Response.Status.INTERNAL_SERVER_ERROR, e);
     }
   }
+
+  @POST
+  @Path("/tables/{tableNameWithType}/updateTimeIntervalZK")
+  @Authenticate(AccessType.UPDATE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Update the start and end time of the segments based 
on latest schema",
+      notes = "Update the start and end time of the segments based on latest 
schema")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 404, message = "Table not found"),
+      @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public SuccessResponse updateTimeIntervalZK(
+      @ApiParam(value = "Table name with type", required = true,
+          example = "myTable_REALTIME") @PathParam("tableNameWithType") String 
tableNameWithType) {
+    try {
+
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == null) {
+        throw new ControllerApplicationException(LOGGER,
+            String.format("Table type not provided with table name %s", 
tableNameWithType),
+            Response.Status.INTERNAL_SERVER_ERROR);
+      }
+      return updateTimeIntervalZKInternal(tableNameWithType);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to get consuming segments info for table %s. 
%s", tableNameWithType, e.getMessage()),
+          Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
+  /**
+   * Internal method to update schema
+   * @param tableName  name of the table
+   * @return
+   */
+  private SuccessResponse updateTimeIntervalZKInternal(String tableName) {
+
+    try {
+      TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableName);
+      if (tableConfig == null) {
+        throw new ControllerApplicationException(LOGGER,
+            String.format("No table config found for table %s", tableName), 
Response.Status.INTERNAL_SERVER_ERROR);
+      }
+
+      Schema tableSchema = 
_pinotHelixResourceManager.getTableSchema(tableName);
+      if (tableSchema == null) {
+        throw new ControllerApplicationException(LOGGER,

Review Comment:
   Same here, user error instead of internal error



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1284,8 +1284,27 @@ public void addSchema(Schema schema, boolean override, 
boolean force)
     }
   }
 
+  public void updateSchemaDateTime(TableConfig tableConfig, Schema schema)
+      throws SchemaNotFoundException, SchemaBackwardIncompatibleException, 
TableNotFoundException {
+    String schemaName = schema.getSchemaName();
+    LOGGER.info("Updating segment zookeeper time interval metadata: {}", 
schemaName);
+
+    String tableNameWithType = tableConfig.getTableName();
+    List<SegmentZKMetadata> segmentZKMetadataList = 
getSegmentsZKMetadata(tableNameWithType);
+    for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
+      int version = segmentZKMetadata.toZNRecord().getVersion();
+      updateSegmentInterval(tableConfig, segmentZKMetadata, schema);
+      updateZkMetadata(tableNameWithType, segmentZKMetadata, version);
+    }
+  }
+
   public void updateSchema(Schema schema, boolean reload)
       throws SchemaNotFoundException, SchemaBackwardIncompatibleException, 
TableNotFoundException {
+    updateSchema(schema, reload, false);
+  }
+
+  public void updateSchema(Schema schema, boolean reload, boolean force)

Review Comment:
   Suggest reverting this unrelated change. We might not want force and reload 
at same time because we don't want to reload backward incompatible schema change



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java:
##########
@@ -69,6 +100,13 @@ private static void updateSegmentZKMetadata(String 
tableNameWithType, SegmentZKM
       segmentZKMetadata.setStartTime(segmentMetadata.getStartTime());
       segmentZKMetadata.setEndTime(segmentMetadata.getEndTime());
       segmentZKMetadata.setTimeUnit(segmentMetadata.getTimeUnit());
+      if (segmentMetadata.getTimeColumn() != null) {

Review Comment:
   (minor) This check is redundant



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java:
##########
@@ -56,6 +62,31 @@ public static void refreshSegmentZKMetadata(String 
tableNameWithType, SegmentZKM
         segmentSizeInBytes, false);
   }
 
+  public static void updateSegmentZKMetadataInterval(TableConfig tableConfig, 
SegmentZKMetadata segmentZKMetadata,

Review Comment:
   We can get the `DateTimeFieldSpec` on the caller side and pass it in here
   ```suggestion
     public static void updateSegmentZKTimeInterval(SegmentZKMetadata 
segmentZKMetadata, DateTimeFieldSpec dateTimeFieldSpec)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to