xiangfu0 commented on code in PR #18724:
URL: https://github.com/apache/pinot/pull/18724#discussion_r3385526209
##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessorUtils.java:
##########
@@ -45,6 +69,23 @@ private SegmentProcessorUtils() {
*/
public static Pair<List<FieldSpec>, Integer> getFieldSpecs(Schema schema, MergeType mergeType,
@Nullable List<String> sortOrder) {
+ return getFieldSpecs(schema, mergeType, sortOrder, false);
+ }
+
+ /**
+ * Returns the field specs (physical only) and number of sort fields based on the merge type and sort order.
+ * <p>When {@code includeOriginalTimeField} is {@code true} (only allowed for ROLLUP), a hidden LONG column
+ * ({@link TimeHandler#ORIGINAL_TIME_MS_COLUMN}) is appended as the last sort field. It carries the original
+ * (pre-rounding) time value in millis so that rows within the same rollup group are sorted by the original time,
+ * which is required by order sensitive aggregations (FIRSTWITHTIME/LASTWITHTIME). The hidden column is not part of
+ * the rollup group key, and is stripped by the reducer before the output segments are created.
+ */
+ public static Pair<List<FieldSpec>, Integer> getFieldSpecs(Schema schema, MergeType mergeType,
+ @Nullable List<String> sortOrder, boolean includeOriginalTimeField) {
+ Preconditions.checkArgument(!includeOriginalTimeField || mergeType == MergeType.ROLLUP,
+ "Original time field can only be included for ROLLUP merge type");
+ Preconditions.checkArgument(!schema.hasColumn(TimeHandler.ORIGINAL_TIME_MS_COLUMN),
+ "Schema must not contain the reserved column: %s", TimeHandler.ORIGINAL_TIME_MS_COLUMN);
Review Comment:
Fixed in 4e53e5f — the reserved-column check is now inside the
`includeOriginalTimeField` block, so CONCAT/DEDUP/plain-ROLLUP flows are
unaffected.
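
For readers following the thread, here is a minimal stand-alone sketch of what scoping the reserved-column check to the `includeOriginalTimeField` path could look like. It is not the code from 4e53e5f: the class name, the `validate` helper, the local `MergeType` enum and the placeholder value of the reserved column name are all assumptions made for illustration, and plain `if`/`throw` stands in for Guava's `Preconditions`.

```java
import java.util.Set;

// Stand-alone sketch; none of these names are Pinot's actual classes.
final class ReservedColumnCheckSketch {
  enum MergeType { CONCAT, ROLLUP, DEDUP }

  // Placeholder: the real value of TimeHandler.ORIGINAL_TIME_MS_COLUMN is not shown in this thread.
  static final String RESERVED_COLUMN = "$hypotheticalOriginalTimeMs";

  static void validate(Set<String> schemaColumns, MergeType mergeType, boolean includeOriginalTimeField) {
    if (!includeOriginalTimeField) {
      // Flows that never add the hidden column skip both checks.
      return;
    }
    if (mergeType != MergeType.ROLLUP) {
      throw new IllegalArgumentException("Original time field can only be included for ROLLUP merge type");
    }
    if (schemaColumns.contains(RESERVED_COLUMN)) {
      throw new IllegalArgumentException("Schema must not contain the reserved column: " + RESERVED_COLUMN);
    }
  }

  public static void main(String[] args) {
    // A CONCAT flow passes even if the schema happens to use the reserved name.
    validate(Set.of(RESERVED_COLUMN), MergeType.CONCAT, false);
    System.out.println("CONCAT flow accepted");
  }
}
```

With this shape, a schema that happens to contain the reserved name is rejected only when the caller actually asks for the hidden column.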
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -337,12 +339,14 @@ public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<Stri
Set<String> columnNames = schema.getColumnNames();
for (Map.Entry<String, String> entry : taskConfigs.entrySet()) {
if (entry.getKey().endsWith(".aggregationType")) {
- Preconditions.checkState(columnNames.contains(
- StringUtils.removeEnd(entry.getKey(), RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)),
+ String column =
+ StringUtils.removeEnd(entry.getKey(), RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX);
+ Preconditions.checkState(columnNames.contains(column),
String.format("Column \"%s\" not found in schema!", entry.getKey()));
Review Comment:
Fixed in 4e53e5f — the error message now prints the derived column name
instead of the task config key.
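
A minimal sketch of the described behavior, assuming the suffix is the `.aggregationType` string seen in the hunk above. The class and helper names are hypothetical, and plain Java replaces `StringUtils.removeEnd` and `Preconditions.checkState` so the snippet runs without dependencies; it is not the code from 4e53e5f.

```java
import java.util.Map;
import java.util.Set;

// Stand-alone sketch; class and method names are illustrative only.
final class AggregationColumnCheckSketch {
  // Assumed to match the ".aggregationType" literal used in the hunk above.
  static final String AGGREGATION_TYPE_KEY_SUFFIX = ".aggregationType";

  // Strips the suffix from a task config key to recover the column name.
  static String columnFor(String key) {
    return key.substring(0, key.length() - AGGREGATION_TYPE_KEY_SUFFIX.length());
  }

  static void validate(Set<String> columnNames, Map<String, String> taskConfigs) {
    for (Map.Entry<String, String> entry : taskConfigs.entrySet()) {
      if (entry.getKey().endsWith(AGGREGATION_TYPE_KEY_SUFFIX)) {
        String column = columnFor(entry.getKey());
        if (!columnNames.contains(column)) {
          // Report the derived column name, not the raw config key.
          throw new IllegalStateException(String.format("Column \"%s\" not found in schema!", column));
        }
      }
    }
  }

  public static void main(String[] args) {
    try {
      validate(Set.of("clicks"), Map.of("views.aggregationType", "sum"));
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Reporting the column rather than the key keeps the message consistent with what the check actually looks up in the schema.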
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -352,6 +356,12 @@ public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<Stri
String.format("Column \"%s\" has invalid aggregate type: %s", entry.getKey(), entry.getValue());
throw new IllegalStateException(err);
Review Comment:
Fixed in 4e53e5f — the IllegalStateException now keeps the original cause
and formats the message with the column name.
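
As a rough illustration of cause-preserving rethrow, the sketch below wraps a parsing failure in an `IllegalStateException` built with the two-argument `(String, Throwable)` constructor. The `AggType` enum and `parse` helper are hypothetical stand-ins for Pinot's aggregation type lookup, which is not shown in this thread, so treat this as an assumption about the shape of the fix rather than the code from 4e53e5f.

```java
import java.util.Locale;

// Stand-alone sketch; AggType is a hypothetical stand-in for the real aggregation type lookup.
final class AggregationTypeCheckSketch {
  enum AggType { SUM, MIN, MAX }

  static AggType parse(String column, String value) {
    try {
      // Enum.valueOf throws IllegalArgumentException for unknown names.
      return AggType.valueOf(value.toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      String err = String.format("Column \"%s\" has invalid aggregate type: %s", column, value);
      // Passing e as the cause keeps the original stack trace available to callers.
      throw new IllegalStateException(err, e);
    }
  }

  public static void main(String[] args) {
    try {
      parse("views", "median");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage() + " (cause: " + e.getCause().getClass().getSimpleName() + ")");
    }
  }
}
```

Keeping the cause means a log of the outer exception still shows where the underlying lookup failed.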
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]