Jackie-Jiang commented on code in PR #9413:
URL: https://github.com/apache/pinot/pull/9413#discussion_r980391515
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -1035,4 +1037,66 @@ public
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsI
Response.Status.INTERNAL_SERVER_ERROR, e);
}
}
+
+ @POST
+ @Path("/tables/{tableNameWithType}/updateTimeIntervalZK")
Review Comment:
```suggestion
@Path("/segments/{tableNameWithType}/updateZKTimeInterval")
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -1035,4 +1037,66 @@ public
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsI
Response.Status.INTERNAL_SERVER_ERROR, e);
}
}
+
+ @POST
+ @Path("/tables/{tableNameWithType}/updateTimeIntervalZK")
+ @Authenticate(AccessType.UPDATE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Update the start and end time of the segments based
on latest schema",
+ notes = "Update the start and end time of the segments based on latest
schema")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"),
+ @ApiResponse(code = 404, message = "Table not found"),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ public SuccessResponse updateTimeIntervalZK(
+ @ApiParam(value = "Table name with type", required = true,
+ example = "myTable_REALTIME") @PathParam("tableNameWithType") String
tableNameWithType) {
+ try {
+
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == null) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Table type not provided with table name %s",
tableNameWithType),
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ return updateTimeIntervalZKInternal(tableNameWithType);
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER,
Review Comment:
What exception do you expect here? We shouldn't have this catch
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -1035,4 +1037,66 @@ public
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsI
Response.Status.INTERNAL_SERVER_ERROR, e);
}
}
+
+ @POST
+ @Path("/tables/{tableNameWithType}/updateTimeIntervalZK")
+ @Authenticate(AccessType.UPDATE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Update the start and end time of the segments based
on latest schema",
+ notes = "Update the start and end time of the segments based on latest
schema")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"),
+ @ApiResponse(code = 404, message = "Table not found"),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ public SuccessResponse updateTimeIntervalZK(
+ @ApiParam(value = "Table name with type", required = true,
+ example = "myTable_REALTIME") @PathParam("tableNameWithType") String
tableNameWithType) {
+ try {
+
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == null) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Table type not provided with table name %s",
tableNameWithType),
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ return updateTimeIntervalZKInternal(tableNameWithType);
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Failed to get consuming segments info for table %s.
%s", tableNameWithType, e.getMessage()),
+ Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
+ /**
+ * Internal method to update schema
+ * @param tableName name of the table
+ * @return
+ */
+ private SuccessResponse updateTimeIntervalZKInternal(String tableName) {
Review Comment:
(minor) `tableNameWithType` for clarity
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2265,6 +2284,18 @@ private void deleteTableOnServer(String
tableNameWithType) {
}
}
+ public void updateSegmentInterval(TableConfig tableConfig, SegmentZKMetadata
segmentZKMetadata,
+ Schema newSchema) {
+ String segmentName = segmentZKMetadata.getSegmentName();
+ if (tableConfig != null) {
Review Comment:
We can remove this check. The check should be performed on the caller side
##########
pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java:
##########
@@ -688,13 +688,18 @@ public void updateBooleanFieldsIfNeeded(Schema oldSchema)
{
}
}
+
+ public boolean isBackwardCompatibleWith(Schema oldSchema) {
+ return isBackwardCompatibleWith(oldSchema, false);
+ }
+
/**
* Check whether the current schema is backward compatible with oldSchema.
* Backward compatibility requires all columns and fieldSpec in oldSchema
should be retained.
*
* @param oldSchema old schema
*/
- public boolean isBackwardCompatibleWith(Schema oldSchema) {
+ public boolean isBackwardCompatibleWith(Schema oldSchema, boolean
skipTimeColumn) {
Review Comment:
Revert this part which no longer apply
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java:
##########
@@ -69,6 +100,13 @@ private static void updateSegmentZKMetadata(String
tableNameWithType, SegmentZKM
segmentZKMetadata.setStartTime(segmentMetadata.getStartTime());
segmentZKMetadata.setEndTime(segmentMetadata.getEndTime());
segmentZKMetadata.setTimeUnit(segmentMetadata.getTimeUnit());
Review Comment:
We can change this to always put `MILLISECONDS ` as the time unit (convert
start/end time using the time unit), then we no longer need to worry about time
unit when updating the time interval
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -746,6 +746,7 @@ private void writeMetadata()
} else {
// No records in segment. Use current time as start/end
long now = System.currentTimeMillis();
+
Review Comment:
(minor) Revert changes in this file which is not related
##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java:
##########
@@ -77,6 +77,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
private Duration _timeGranularity;
private long _segmentStartTime = Long.MAX_VALUE;
private long _segmentEndTime = Long.MIN_VALUE;
+
Review Comment:
(minor) Revert changes in this file which is not related
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1284,8 +1284,27 @@ public void addSchema(Schema schema, boolean override,
boolean force)
}
}
+ public void updateSchemaDateTime(TableConfig tableConfig, Schema schema)
Review Comment:
```suggestion
public void updateSegmentsZKTimeInterval(TableConfig tableConfig, Schema
schema)
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1284,8 +1284,27 @@ public void addSchema(Schema schema, boolean override,
boolean force)
}
}
+ public void updateSchemaDateTime(TableConfig tableConfig, Schema schema)
+ throws SchemaNotFoundException, SchemaBackwardIncompatibleException,
TableNotFoundException {
Review Comment:
It shouldn't throw these exceptions, or simply pass `tableNameWithType` and
throws these exceptions. Caller should check the exception type to create the
`ControllerApplicationException` accordingly
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -1035,4 +1037,66 @@ public
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsI
Response.Status.INTERNAL_SERVER_ERROR, e);
}
}
+
+ @POST
+ @Path("/tables/{tableNameWithType}/updateTimeIntervalZK")
+ @Authenticate(AccessType.UPDATE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Update the start and end time of the segments based
on latest schema",
+ notes = "Update the start and end time of the segments based on latest
schema")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"),
+ @ApiResponse(code = 404, message = "Table not found"),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ public SuccessResponse updateTimeIntervalZK(
+ @ApiParam(value = "Table name with type", required = true,
+ example = "myTable_REALTIME") @PathParam("tableNameWithType") String
tableNameWithType) {
+ try {
+
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == null) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Table type not provided with table name %s",
tableNameWithType),
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ return updateTimeIntervalZKInternal(tableNameWithType);
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Failed to get consuming segments info for table %s.
%s", tableNameWithType, e.getMessage()),
+ Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
+ /**
+ * Internal method to update schema
+ * @param tableName name of the table
+ * @return
+ */
+ private SuccessResponse updateTimeIntervalZKInternal(String tableName) {
+
+ try {
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableName);
+ if (tableConfig == null) {
+ throw new ControllerApplicationException(LOGGER,
Review Comment:
This is user error, where the table does not exist
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -1035,4 +1037,66 @@ public
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsI
Response.Status.INTERNAL_SERVER_ERROR, e);
}
}
+
+ @POST
+ @Path("/tables/{tableNameWithType}/updateTimeIntervalZK")
+ @Authenticate(AccessType.UPDATE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Update the start and end time of the segments based
on latest schema",
+ notes = "Update the start and end time of the segments based on latest
schema")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"),
+ @ApiResponse(code = 404, message = "Table not found"),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ public SuccessResponse updateTimeIntervalZK(
+ @ApiParam(value = "Table name with type", required = true,
+ example = "myTable_REALTIME") @PathParam("tableNameWithType") String
tableNameWithType) {
+ try {
+
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == null) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Table type not provided with table name %s",
tableNameWithType),
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ return updateTimeIntervalZKInternal(tableNameWithType);
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Failed to get consuming segments info for table %s.
%s", tableNameWithType, e.getMessage()),
+ Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
+ /**
+ * Internal method to update schema
+ * @param tableName name of the table
+ * @return
+ */
+ private SuccessResponse updateTimeIntervalZKInternal(String tableName) {
+
+ try {
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableName);
+ if (tableConfig == null) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("No table config found for table %s", tableName),
Response.Status.INTERNAL_SERVER_ERROR);
+ }
+
+ Schema tableSchema =
_pinotHelixResourceManager.getTableSchema(tableName);
+ if (tableSchema == null) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("No schema found for table %s", tableName),
Response.Status.INTERNAL_SERVER_ERROR);
+ }
+
+ String schemaName = tableSchema.getSchemaName();
+ _pinotHelixResourceManager.updateSchemaDateTime(tableConfig,
tableSchema);
+ // Best effort notification. If controller fails at this point, no
notification is given.
+ LOGGER.info("Notifying metadata event for updating schema: {}",
schemaName);
+ return new SuccessResponse("Successfully updated time interval zk
metadata for table: " + tableName);
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER,
Review Comment:
We don't want t global try-catch, which will also catch the user errors
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -1035,4 +1037,66 @@ public
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsI
Response.Status.INTERNAL_SERVER_ERROR, e);
}
}
+
+ @POST
+ @Path("/tables/{tableNameWithType}/updateTimeIntervalZK")
+ @Authenticate(AccessType.UPDATE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Update the start and end time of the segments based
on latest schema",
+ notes = "Update the start and end time of the segments based on latest
schema")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"),
+ @ApiResponse(code = 404, message = "Table not found"),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ public SuccessResponse updateTimeIntervalZK(
+ @ApiParam(value = "Table name with type", required = true,
+ example = "myTable_REALTIME") @PathParam("tableNameWithType") String
tableNameWithType) {
+ try {
+
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == null) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Table type not provided with table name %s",
tableNameWithType),
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ return updateTimeIntervalZKInternal(tableNameWithType);
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Failed to get consuming segments info for table %s.
%s", tableNameWithType, e.getMessage()),
+ Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
+ /**
+ * Internal method to update schema
+ * @param tableName name of the table
+ * @return
+ */
+ private SuccessResponse updateTimeIntervalZKInternal(String tableName) {
+
+ try {
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableName);
+ if (tableConfig == null) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("No table config found for table %s", tableName),
Response.Status.INTERNAL_SERVER_ERROR);
+ }
+
+ Schema tableSchema =
_pinotHelixResourceManager.getTableSchema(tableName);
+ if (tableSchema == null) {
+ throw new ControllerApplicationException(LOGGER,
Review Comment:
Same here, user error instead of internal error
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1284,8 +1284,27 @@ public void addSchema(Schema schema, boolean override,
boolean force)
}
}
+ public void updateSchemaDateTime(TableConfig tableConfig, Schema schema)
+ throws SchemaNotFoundException, SchemaBackwardIncompatibleException,
TableNotFoundException {
+ String schemaName = schema.getSchemaName();
+ LOGGER.info("Updating segment zookeeper time interval metadata: {}",
schemaName);
+
+ String tableNameWithType = tableConfig.getTableName();
+ List<SegmentZKMetadata> segmentZKMetadataList =
getSegmentsZKMetadata(tableNameWithType);
+ for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
+ int version = segmentZKMetadata.toZNRecord().getVersion();
+ updateSegmentInterval(tableConfig, segmentZKMetadata, schema);
+ updateZkMetadata(tableNameWithType, segmentZKMetadata, version);
+ }
+ }
+
public void updateSchema(Schema schema, boolean reload)
throws SchemaNotFoundException, SchemaBackwardIncompatibleException,
TableNotFoundException {
+ updateSchema(schema, reload, false);
+ }
+
+ public void updateSchema(Schema schema, boolean reload, boolean force)
Review Comment:
Suggest reverting this unrelated change. We might not want force and reload
at same time because we don't want to reload backward incompatible schema change
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java:
##########
@@ -69,6 +100,13 @@ private static void updateSegmentZKMetadata(String
tableNameWithType, SegmentZKM
segmentZKMetadata.setStartTime(segmentMetadata.getStartTime());
segmentZKMetadata.setEndTime(segmentMetadata.getEndTime());
segmentZKMetadata.setTimeUnit(segmentMetadata.getTimeUnit());
+ if (segmentMetadata.getTimeColumn() != null) {
Review Comment:
(minor) This check is redundant
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java:
##########
@@ -56,6 +62,31 @@ public static void refreshSegmentZKMetadata(String
tableNameWithType, SegmentZKM
segmentSizeInBytes, false);
}
+ public static void updateSegmentZKMetadataInterval(TableConfig tableConfig,
SegmentZKMetadata segmentZKMetadata,
Review Comment:
We can get the `DateTimeFieldSpec` on the caller side and pass it in here
```suggestion
public static void updateSegmentZKTimeInterval(SegmentZKMetadata
segmentZKMetadata, DateTimeFieldSpec dateTimeFieldSpec)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]