paul-rogers commented on code in PR #13793:
URL: https://github.com/apache/druid/pull/13793#discussion_r1114922369


##########
docs/querying/sql-aggregations.md:
##########
@@ -86,7 +86,7 @@ In the aggregation functions supported by Druid, only 
`COUNT`, `ARRAY_AGG`, and
 |`EARLIEST(expr, maxBytesPerString)`|Like `EARLIEST(expr)`, but for strings. 
The `maxBytesPerString` parameter determines how much aggregation space to 
allocate per string. Strings longer than this limit are truncated. This 
parameter should be set as low as possible, since high values will lead to 
wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, 
otherwise `''`|
 |`EARLIEST_BY(expr, timestampExpr)`|Returns the earliest value of `expr`, 
which must be numeric. The earliest value of `expr` is taken from the row with 
the overall earliest non-null value of `timestampExpr`. If the earliest 
non-null value of `timestampExpr` appears in multiple rows, the `expr` may be 
taken from any of those rows.|`null` if 
`druid.generic.useDefaultValueForNull=false`, otherwise `0`|
 |`EARLIEST_BY(expr, timestampExpr, maxBytesPerString)`| Like 
`EARLIEST_BY(expr, timestampExpr)`, but for strings. The `maxBytesPerString` 
parameter determines how much aggregation space to allocate per string. Strings 
longer than this limit are truncated. This parameter should be set as low as 
possible, since high values will lead to wasted memory.|`null` if 
`druid.generic.useDefaultValueForNull=false`, otherwise `''`|
-|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. If 
`expr` comes from a relation with a timestamp column (like `__time` in a Druid 
datasource), the "latest" is taken from the row with the overall latest 
non-null value of the timestamp column. If the latest non-null value of the 
timestamp column appears in multiple rows, the `expr` may be taken from any of 
those rows. If `expr` does not come from a relation with a timestamp, then it 
is simply the last value encountered.|`null` if 
`druid.generic.useDefaultValueForNull=false`, otherwise `0`|
+|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. The 
`expr` must come from a relation with a timestamp column (like `__time` in a 
Druid datasource) and the "latest" is taken from the row with the overall 
latest non-null value of the timestamp column. If the latest non-null value of 
the timestamp column appears in multiple rows, the `expr` may be taken from any 
of those rows. |`null` if `druid.generic.useDefaultValueForNull=false`, 
otherwise `0`|

Review Comment:
   This description is a bit off. To correct it, we just understand how this 
aggregation works (or is supposed to work). This aggregation applies _only_ to 
a Druid datasource. Why? Because we need the `__time` column, and that column 
only exists in Druid datasources (where it is guaranteed to exist.)
   
   When using this function in _queries_, the function implicitly references 
that datasource `__time` column. If the aggregation is used on a datasource 
without a `__time` column, it should fail. Although all Druid datasources have 
`__time` a Druid data source (with a space) need not have such a column. 
Example: the output of a subquery. In this case, the function cannot be used: 
the explicit form is required.
   
   For _ingestion_ (via MSQ) the story is more complex. We have to consider 
whether we're creating a detail table or a rollup table. If we're creating a 
detail table (that is, no aggregation), then the same rules for queries apply.
   
   But, if we are building a table _with_ rollup, we must apply slightly 
different rules. In the case of rollup, the `__time` column is the one in the 
_target_ datasource, not the source input source. (Sorry for Druid's awkward 
naming.)
   
   This is true because aggregation for rollup tables is best thought of as a 
property of the table (datasource), not the query. Why? I can do two ingestions 
into two segments for the same time chunk. I can then use compaction to combine 
them. Compaction references only data within the datasource: it has no ability 
to reference the (now long gone) original input sources.
   
   The unifying rule is that `LATEST(x)` implicitly refers to the `__time` 
column in the same record as the value of `x` appears, _after_ the record has 
been projected into the target datasource format on ingestion, and _before_ 
projecting out of the datasource format on query.
   
   On the other hand, `LATEST(x, y)` is a bit simpler: it doesn't really matter 
where the `y` (timestamp) value comes from: it will be written into the 
intermediate value anyway. To make this concrete:
   
   ```sql
   SELECT
     TIME_PARSE(a) AS __time,
     TIME_PARSE(b) AS anotherTime,
     LATEST(c, anotherTime)
   ```
   
   Would be nice, but Druid doesn't one SELECT expression to reference another, 
so we have to write:
   
   ```sql
   SELECT
     TIME_PARSE(a) AS __time,
     TIME_PARSE(b) AS anotherTime,
     LATEST(c, TIME_PARSE(b))
   ```
   



##########
sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java:
##########
@@ -196,6 +197,16 @@ public Aggregation toDruidAggregation(
 
     final String fieldName = getColumnName(plannerContext, 
virtualColumnRegistry, args.get(0), rexNodes.get(0));
 
+    if (!rowSignature.contains(ColumnHolder.TIME_COLUMN_NAME) && 
(aggregatorType == AggregatorType.LATEST || aggregatorType == 
AggregatorType.EARLIEST)) {
+      plannerContext.setPlanningError("%s() aggregator depends on __time 
column, the underlying datasource "
+                                      + "or extern function you are querying 
doesn't contain __time column, "
+                                      + "Please use %s_BY() and specify the 
time column you want to use",
+                                      aggregatorType.name(),
+                                      aggregatorType.name()
+      );
+      return null;
+    }

Review Comment:
   I'm afraid this is the wrong check: we're asking a CSV file to provide a 
`LONG` `__time` column. Probably not going to happen: we can't ask the user to 
rewrite their input files to make MSQ happy.
   
   What we want to know is: is there a `__time` column in the projected output 
row? That is tricky, but it's what we need.



##########
sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java:
##########
@@ -1447,12 +1447,24 @@ public void testInnerJoinQueryOfLookup(Map<String, 
Object> queryContext)
                 .build()
         ),
         ImmutableList.of(
-            new Object[]{"", "a", "xabc", "xabc"},
-            new Object[]{"1", "a", "xabc", "xabc"}
+            new Object[]{"", "a", "xa", "xa"},
+            new Object[]{"1", "a", "xa", "xa"}
         )
     );
   }
 
+  @Test(expected = UnsupportedSQLQueryException.class)

Review Comment:
   We want this to work.



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java:
##########
@@ -1363,6 +1363,39 @@ public void testGroupByArrayWithMultiValueMvToArray()
         .verifyResults();
   }
 
+  @Test
+  public void testTimeColumnAggregationFromExtern() throws IOException
+  {
+    final File toRead = 
MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, 
"/wikipedia-sampled.json");
+    final String toReadAsJson = 
queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
+
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("cnt", ColumnType.LONG)
+                                            .build();
+
+    testSelectQuery()
+        .setSql("WITH\n"
+                + "kttm_data AS (\n"
+                + "SELECT * FROM TABLE(\n"
+                + "  EXTERN(\n"
+                + "    '{ \"files\": [" + toReadAsJson + 
"],\"type\":\"local\"}',\n"
+                + "    '{\"type\":\"json\"}',\n"
+                + "    '[{\"name\": \"timestamp\", \"type\": \"string\"}, 
{\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": 
\"string\"}]'\n"
+                + "  )\n"
+                + "))\n"
+                + "\n"
+                + "SELECT\n"
+                + "  FLOOR(TIME_PARSE(\"timestamp\") TO MINUTE) AS __time,\n"
+                + "  LATEST(\"page\") AS \"page\"\n"
+                + "FROM kttm_data "
+                + "GROUP BY 1")
+        .setExpectedValidationErrorMatcher(
+            
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("LATEST() 
aggregator depends on __time column"))

Review Comment:
   We need this to succeed, not fail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to