This is an automated email from the ASF dual-hosted git repository.
abhishekrb19 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3ba9977c60a feat: Add `now()` to troubleshoot pipeline latencies (#19386)
3ba9977c60a is described below
commit 3ba9977c60afadc6a963b9126d099012b0f86dc3
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Wed May 13 00:31:22 2026 -0700
feat: Add `now()` to troubleshoot pipeline latencies (#19386)
Add a now() expression function that returns the current system timestamp
in milliseconds since the epoch. It is useful at ingestion time for
troubleshooting pipeline delays (e.g., now() - __time). Note: now() is
non-deterministic because it is evaluated for every row, so it can break
idempotency. It can be used only for columns other than __time.
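
For illustration only, the lag computation and the replica caveat above can be
sketched outside Druid. This is a minimal standalone example, not code from
this commit: the class `IngestionLagSketch`, the method `lagMillis`, and the
injected `LongSupplier` clock are hypothetical names introduced here so the
behavior can be shown without a Druid runtime.

```java
import java.util.function.LongSupplier;

public class IngestionLagSketch
{
  // Hypothetical helper mirroring the expression "now() - __time" for one row.
  // The clock is injected so the sketch is testable; the commit's now() reads
  // System.currentTimeMillis() directly.
  public static long lagMillis(LongSupplier clock, long eventTimeMillis)
  {
    return clock.getAsLong() - eventTimeMillis;
  }

  public static void main(String[] args)
  {
    final long eventTime = 1_700_000_000_000L; // stands in for __time (epoch millis)

    // Two replicas reading the same row at different wall-clock times
    // compute different values for the same input.
    LongSupplier replicaA = () -> eventTime + 250;
    LongSupplier replicaB = () -> eventTime + 900;

    System.out.println("replica A lag_ms = " + lagMillis(replicaA, eventTime)); // 250
    System.out.println("replica B lag_ms = " + lagMillis(replicaB, eventTime)); // 900

    // With the real system clock the result depends on when the row is processed.
    System.out.println("live lag_ms = " + lagMillis(System::currentTimeMillis, eventTime));
  }
}
```

The two fixed clocks stand in for replicated tasks: identical input, different
stored values, which is why the result is unsuitable as a __time source.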
---
docs/querying/math-expr.md | 25 ++++-
.../apache/druid/math/expr/BuiltInExprMacros.java | 103 +++++++++++++++++++++
.../main/java/org/apache/druid/math/expr/Expr.java | 53 +++++++++--
.../org/apache/druid/math/expr/ExprMacroTable.java | 3 +-
.../druid/segment/filter/ExpressionFilter.java | 3 +
.../segment/transform/ExpressionTransform.java | 11 +++
.../druid/segment/virtual/ExpressionPlan.java | 6 +-
.../druid/segment/virtual/ExpressionPlanner.java | 2 +-
.../druid/segment/virtual/ExpressionSelectors.java | 4 +-
.../java/org/apache/druid/math/expr/ExprTest.java | 17 ++++
.../org/apache/druid/math/expr/FunctionTest.java | 66 +++++++++++++
.../druid/segment/filter/ExpressionFilterTest.java | 31 +++++++
.../druid/segment/transform/TransformerTest.java | 72 ++++++++++++++
.../segment/virtual/ExpressionPlannerTest.java | 20 ++++
.../segment/virtual/ExpressionSelectorsTest.java | 35 +++++++
.../virtual/ExpressionVirtualColumnTest.java | 22 +++++
16 files changed, 457 insertions(+), 16 deletions(-)
diff --git a/docs/querying/math-expr.md b/docs/querying/math-expr.md
index 06ac395c7ad..028af3f01f2 100644
--- a/docs/querying/math-expr.md
+++ b/docs/querying/math-expr.md
@@ -110,8 +110,9 @@ The following built-in functions are available.
|name|description|
|----|-----------|
+|now|now() returns the current system timestamp in milliseconds since epoch (1970-01-01 00:00:00 UTC). This function is evaluated for each row at processing time. It's recommended to use this only for troubleshooting issues - see [Using now() in ingestion](#using-now-in-ingestion).|
|timestamp|timestamp(expr[,format-string]) parses string expr into date then
returns milliseconds from java epoch. without 'format-string' it's regarded as
ISO datetime format |
-|unix_timestamp|same with 'timestamp' function but returns seconds instead |
+|unix_timestamp|unix_timestamp(expr[,format-string]) same as the 'timestamp' function but returns seconds instead |
|timestamp_ceil|timestamp_ceil(expr, period, \[origin, \[timezone\]\]) rounds
up a timestamp, returning it as a new timestamp. Period can be any ISO8601
period, like P3M (quarters) or PT12H (half-days). The time zone, if provided,
should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|
|timestamp_floor|timestamp_floor(expr, period, \[origin, [timezone\]\]) rounds
down a timestamp, returning it as a new timestamp. Period can be any ISO8601
period, like P3M (quarters) or PT12H (half-days). The time zone, if provided,
should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|
|timestamp_shift|timestamp_shift(expr, period, step, \[timezone\]) shifts a
timestamp by a period (step times), returning it as a new timestamp. Period can
be any ISO8601 period. Step may be negative. The time zone, if provided, should
be a time zone name like "America/Los_Angeles" or offset like "-08:00".|
@@ -119,6 +120,28 @@ The following built-in functions are available.
|timestamp_parse|timestamp_parse(string expr, \[pattern, [timezone\]\]) parses
a string into a timestamp using a given [Joda DateTimeFormat
pattern](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat).
If the pattern is not provided, this parses time strings in either ISO8601 or
SQL format. The time zone, if provided, should be a time zone name like
"America/Los_Angeles" or offset like "-08:00", and will be used as the time
zone for strings that do not include a ti [...]
|timestamp_format|timestamp_format(expr, \[pattern, \[timezone\]\]) formats a
timestamp as a string with a given [Joda DateTimeFormat
pattern](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat),
or ISO8601 if the pattern is not provided. The time zone, if provided, should
be a time zone name like "America/Los_Angeles" or offset like "-08:00". Pattern
and time zone must be literals.|
+### Using `now()` in ingestion
+
+:::warning
+`now()` is non-deterministic: replicated streaming tasks and task replays evaluate it at
+different wall-clock times, producing inconsistent results across replicas. Use `now()` to
+populate regular columns (e.g. ingestion time, lag metrics) only; Druid rejects any transform
+that sets `__time` from an expression containing `now()`. For Kafka, prefer
+[`kafka.timestamp`](../ingestion/data-formats.md#kafka) as the `__time` source.
+:::
+
+To troubleshoot end-to-end pipeline delays, store `now()` or `now() - __time` as a separate
+dimension via a [`transformSpec`](../ingestion/ingestion-spec.md#transformspec):
+
+```json
+"transformSpec": {
+ "transforms": [
+ { "type": "expression", "name": "druid_ingestion_time", "expression": "now()" },
+ { "type": "expression", "name": "ingestion_lag_ms", "expression": "now() - __time" }
+ ]
+}
+```
+
## Math functions
See javadoc of java.lang.Math for detailed explanation for each function.
diff --git
a/processing/src/main/java/org/apache/druid/math/expr/BuiltInExprMacros.java
b/processing/src/main/java/org/apache/druid/math/expr/BuiltInExprMacros.java
index 189ae918c3d..25489906415 100644
--- a/processing/src/main/java/org/apache/druid/math/expr/BuiltInExprMacros.java
+++ b/processing/src/main/java/org/apache/druid/math/expr/BuiltInExprMacros.java
@@ -21,10 +21,12 @@ package org.apache.druid.math.expr;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.column.TypeStrategy;
import javax.annotation.Nullable;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
public class BuiltInExprMacros
{
@@ -214,4 +216,105 @@ public class BuiltInExprMacros
}
}
+ /**
+ * Expression macro for now() function that returns current system timestamp.
+ * Implemented as a macro to prevent constant folding optimization.
+ */
+ public static class NowExprMacro implements ExprMacroTable.ExprMacro
+ {
+ public static final String NAME = "now";
+
+ // Strictly monotonic counter. Mixed into the cache key of any Expr containing now() so each computation
+ // produces a unique key, effectively disabling result caching for non-deterministic expressions.
+ private static final AtomicLong CACHE_KEY_NONCE = new AtomicLong();
+
+ @Override
+ public String name()
+ {
+ return NAME;
+ }
+
+ @Override
+ public Expr apply(List<Expr> args)
+ {
+ validationHelperCheckArgumentCount(args, 0);
+ return new NowExpression();
+ }
+
+ static final class NowExpression implements Expr
+ {
+ @Override
+ public ExprEval eval(ObjectBinding bindings)
+ {
+ return ExprEval.ofLong(System.currentTimeMillis());
+ }
+
+ @Override
+ public String stringify()
+ {
+ return "now()";
+ }
+
+ @Override
+ public Expr visit(Shuttle shuttle)
+ {
+ return shuttle.visit(this);
+ }
+
+ @Override
+ public BindingAnalysis analyzeInputs()
+ {
+ return new BindingAnalysis().withNonDeterministic();
+ }
+
+ @Override
+ public void decorateCacheKeyBuilder(CacheKeyBuilder builder)
+ {
+ // Append a strictly increasing nonce so any Expr containing now() produces a fresh cache key on every
+ // computation, effectively disabling result caching.
+ builder.appendLong(CACHE_KEY_NONCE.incrementAndGet());
+ }
+
+ @Nullable
+ @Override
+ public ExpressionType getOutputType(InputBindingInspector inspector)
+ {
+ return ExpressionType.LONG;
+ }
+
+ @Override
+ public boolean isLiteral()
+ {
+ // NOT a literal - prevents constant folding
+ return false;
+ }
+
+ @Override
+ public boolean isNullLiteral()
+ {
+ return false;
+ }
+
+ @Nullable
+ @Override
+ public Object getLiteralValue()
+ {
+ // Not a literal, so no constant value
+ return null;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return NowExpression.class.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ return obj instanceof NowExpression;
+ }
+ }
+ }
+
}
diff --git a/processing/src/main/java/org/apache/druid/math/expr/Expr.java
b/processing/src/main/java/org/apache/druid/math/expr/Expr.java
index faa122d0e70..24e992a25cd 100644
--- a/processing/src/main/java/org/apache/druid/math/expr/Expr.java
+++ b/processing/src/main/java/org/apache/druid/math/expr/Expr.java
@@ -321,6 +321,9 @@ public interface Expr extends Cacheable
{
final Expr.BindingAnalysis details = analyzeInputs();
if (details.getRequiredBindings().isEmpty()) {
+ if (details.isNonDeterministic()) {
+ return null;
+ }
// Constant expression.
final ExprEval<?> eval = eval(InputBindings.nilBindings());
if (eval.value() == null) {
@@ -637,15 +640,16 @@ public interface Expr extends Cacheable
private final ImmutableSet<IdentifierExpr> arrayVariables;
private final boolean hasInputArrays;
private final boolean isOutputArray;
+ private final boolean isNonDeterministic;
public BindingAnalysis()
{
- this(ImmutableSet.of(), ImmutableSet.of(), ImmutableSet.of(), false, false);
+ this(ImmutableSet.of(), ImmutableSet.of(), ImmutableSet.of(), false, false, false);
}
BindingAnalysis(IdentifierExpr expr)
{
- this(ImmutableSet.of(expr), ImmutableSet.of(), ImmutableSet.of(), false, false);
+ this(ImmutableSet.of(expr), ImmutableSet.of(), ImmutableSet.of(), false, false, false);
}
private BindingAnalysis(
@@ -653,7 +657,8 @@ public interface Expr extends Cacheable
ImmutableSet<IdentifierExpr> scalarVariables,
ImmutableSet<IdentifierExpr> arrayVariables,
boolean hasInputArrays,
- boolean isOutputArray
+ boolean isOutputArray,
+ boolean isNonDeterministic
)
{
this.freeVariables = freeVariables;
@@ -661,6 +666,7 @@ public interface Expr extends Cacheable
this.arrayVariables = arrayVariables;
this.hasInputArrays = hasInputArrays;
this.isOutputArray = isOutputArray;
+ this.isNonDeterministic = isNonDeterministic;
}
/**
@@ -679,10 +685,12 @@ public interface Expr extends Cacheable
boolean hasInputArrays = false;
boolean isOutputArray = false;
+ boolean isNonDeterministic = false;
for (final BindingAnalysis other : others) {
hasInputArrays = hasInputArrays || other.hasInputArrays;
isOutputArray = isOutputArray || other.isOutputArray;
+ isNonDeterministic = isNonDeterministic || other.isNonDeterministic;
freeVariables.addAll(other.freeVariables);
scalarVariables.addAll(other.scalarVariables);
@@ -694,7 +702,8 @@ public interface Expr extends Cacheable
scalarVariables.build(),
arrayVariables.build(),
hasInputArrays,
- isOutputArray
+ isOutputArray,
+ isNonDeterministic
);
}
}
@@ -789,6 +798,27 @@ public interface Expr extends Cacheable
return isOutputArray;
}
+ /**
+ * Returns true if the expression tree contains non-deterministic expressions (e.g. now()) whose value may change
+ * between evaluations and must not be folded into a constant.
+ */
+ public boolean isNonDeterministic()
+ {
+ return isNonDeterministic;
+ }
+
+ public BindingAnalysis withNonDeterministic()
+ {
+ return new BindingAnalysis(
+ freeVariables,
+ scalarVariables,
+ arrayVariables,
+ hasInputArrays,
+ isOutputArray,
+ true
+ );
+ }
+
/**
* Add set of arguments as {@link BindingAnalysis#scalarVariables} that
are *directly* {@link IdentifierExpr},
* else they are ignored.
@@ -807,7 +837,8 @@ public interface Expr extends Cacheable
ImmutableSet.copyOf(Sets.union(scalarVariables, moreScalars)),
arrayVariables,
hasInputArrays,
- isOutputArray
+ isOutputArray,
+ isNonDeterministic
);
}
@@ -829,7 +860,8 @@ public interface Expr extends Cacheable
scalarVariables,
ImmutableSet.copyOf(Sets.union(arrayVariables, arrayIdentifiers)),
hasInputArrays || !arrayArguments.isEmpty(),
- isOutputArray
+ isOutputArray,
+ isNonDeterministic
);
}
@@ -843,7 +875,8 @@ public interface Expr extends Cacheable
scalarVariables,
arrayVariables,
hasArrays || !arrayVariables.isEmpty(),
- isOutputArray
+ isOutputArray,
+ isNonDeterministic
);
}
@@ -857,7 +890,8 @@ public interface Expr extends Cacheable
scalarVariables,
arrayVariables,
hasInputArrays,
- isOutputArray
+ isOutputArray,
+ isNonDeterministic
);
}
@@ -872,7 +906,8 @@ public interface Expr extends Cacheable
ImmutableSet.copyOf(scalarVariables.stream().filter(x ->
!lambda.contains(x.getIdentifier())).iterator()),
ImmutableSet.copyOf(arrayVariables.stream().filter(x ->
!lambda.contains(x.getIdentifier())).iterator()),
hasInputArrays,
- isOutputArray
+ isOutputArray,
+ isNonDeterministic
);
}
diff --git
a/processing/src/main/java/org/apache/druid/math/expr/ExprMacroTable.java
b/processing/src/main/java/org/apache/druid/math/expr/ExprMacroTable.java
index 0f8b9e1f07f..f2a7556e802 100644
--- a/processing/src/main/java/org/apache/druid/math/expr/ExprMacroTable.java
+++ b/processing/src/main/java/org/apache/druid/math/expr/ExprMacroTable.java
@@ -51,7 +51,8 @@ public class ExprMacroTable
COMPLEX_DECODE_BASE_64_EXPR_MACRO,
BuiltInExprMacros.ComplexDecodeBase64ExprMacro.ALIAS
),
- new BuiltInExprMacros.StringDecodeBase64UTFExprMacro()
+ new BuiltInExprMacros.StringDecodeBase64UTFExprMacro(),
+ new BuiltInExprMacros.NowExprMacro()
);
private static final ExprMacroTable NIL = new
ExprMacroTable(Collections.emptyList());
diff --git
a/processing/src/main/java/org/apache/druid/segment/filter/ExpressionFilter.java
b/processing/src/main/java/org/apache/druid/segment/filter/ExpressionFilter.java
index 090acd3d1bf..1864e255170 100644
---
a/processing/src/main/java/org/apache/druid/segment/filter/ExpressionFilter.java
+++
b/processing/src/main/java/org/apache/druid/segment/filter/ExpressionFilter.java
@@ -192,6 +192,9 @@ public class ExpressionFilter implements Filter
@Override
public BitmapColumnIndex getBitmapColumnIndex(ColumnIndexSelector selector)
{
+ if (bindingDetails.get().isNonDeterministic()) {
+ return null;
+ }
return expr.get().asBitmapColumnIndex(selector);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/transform/ExpressionTransform.java
b/processing/src/main/java/org/apache/druid/segment/transform/ExpressionTransform.java
index de3cb7c8697..b99768d6e8d 100644
---
a/processing/src/main/java/org/apache/druid/segment/transform/ExpressionTransform.java
+++
b/processing/src/main/java/org/apache/druid/segment/transform/ExpressionTransform.java
@@ -26,11 +26,13 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Suppliers;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.Rows;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.math.expr.InputBindings;
import org.apache.druid.math.expr.Parser;
+import org.apache.druid.segment.column.ColumnHolder;
import java.util.List;
import java.util.Objects;
@@ -57,6 +59,15 @@ public class ExpressionTransform implements Transform
this.parsedExpression = Suppliers.memoize(
() -> Parser.parse(expression,
Preconditions.checkNotNull(this.macroTable, "macroTable"))
)::get;
+
+ if (ColumnHolder.TIME_COLUMN_NAME.equals(name) && parsedExpression.get().analyzeInputs().isNonDeterministic()) {
+ throw InvalidInput.exception(
+ "Cannot use non-deterministic expression[%s] to set column name[%s]."
+ + " Non-deterministic expressions such as now() are not supported as __time transforms.",
+ expression,
+ ColumnHolder.TIME_COLUMN_NAME
+ );
+ }
}
@JsonProperty
diff --git
a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlan.java
b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlan.java
index 71af403fa76..7039ed5a7e1 100644
---
a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlan.java
+++
b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlan.java
@@ -121,11 +121,13 @@ public class ExpressionPlan
}
/**
- * An expression with no inputs is a constant
+ * Returns true if this expression is a compile-time constant: it has no input bindings and contains no
+ * non-deterministic sub-expressions (e.g. {@code now()}). Non-deterministic expressions are excluded because they
+ * must be re-evaluated per query/row rather than folded to a single value at selector/index construction time.
*/
public boolean isConstant()
{
- return analysis.getRequiredBindings().isEmpty();
+ return analysis.getRequiredBindings().isEmpty() && !analysis.isNonDeterministic();
}
/**
diff --git
a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlanner.java
b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlanner.java
index af4c1ce7759..59ba4efcaf7 100644
---
a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlanner.java
+++
b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlanner.java
@@ -65,7 +65,7 @@ public class ExpressionPlanner
final Set<String> columns = analysis.getRequiredBindings();
// check and set traits which allow optimized selectors to be created
- if (columns.isEmpty()) {
+ if (columns.isEmpty() && !analysis.isNonDeterministic()) {
traits.add(ExpressionPlan.Trait.CONSTANT);
} else if (expression.isIdentifier()) {
traits.add(ExpressionPlan.Trait.IDENTIFIER);
diff --git
a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java
b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java
index d037a24bbd0..dfb97cf8b5f 100644
---
a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java
+++
b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java
@@ -163,8 +163,8 @@ public class ExpressionSelectors
}
final Expr.ObjectBinding bindings = createBindings(columnSelectorFactory,
plan);
- // Optimization for constant expressions
- if (bindings.equals(InputBindings.nilBindings())) {
+ // Optimization for constant expressions (but not non-deterministic ones like now())
+ if (bindings.equals(InputBindings.nilBindings()) && !plan.getAnalysis().isNonDeterministic()) {
return new ConstantExprEvalSelector(plan.getExpression().eval(bindings));
}
diff --git a/processing/src/test/java/org/apache/druid/math/expr/ExprTest.java
b/processing/src/test/java/org/apache/druid/math/expr/ExprTest.java
index d5648ae428d..0315bd47929 100644
--- a/processing/src/test/java/org/apache/druid/math/expr/ExprTest.java
+++ b/processing/src/test/java/org/apache/druid/math/expr/ExprTest.java
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -221,6 +222,22 @@ public class ExprTest
.verify();
}
+ @Test
+ public void testNowExprCacheKeyIsNotStable()
+ {
+ Expr now = Parser.parse("now()", ExprMacroTable.nil());
+ byte[] k1 = now.getCacheKey();
+ byte[] k2 = now.getCacheKey();
+ Assertions.assertFalse(Arrays.equals(k1, k2), "bare now() cache key must change across calls");
+
+ // Verify the same instability propagates when now() is nested in a parent expression.
+ Expr nested = Parser.parse("now() > 0", ExprMacroTable.nil());
+ Assertions.assertFalse(
+ Arrays.equals(nested.getCacheKey(), nested.getCacheKey()),
+ "Expr containing now() must produce an unstable cache key"
+ );
+ }
+
@Test
public void testShuttleVisitAll()
{
diff --git
a/processing/src/test/java/org/apache/druid/math/expr/FunctionTest.java
b/processing/src/test/java/org/apache/druid/math/expr/FunctionTest.java
index 51068feefd4..66f0bc0907e 100644
--- a/processing/src/test/java/org/apache/druid/math/expr/FunctionTest.java
+++ b/processing/src/test/java/org/apache/druid/math/expr/FunctionTest.java
@@ -1558,4 +1558,70 @@ public class FunctionTest extends
InitializedNullHandlingTest
Assert.assertArrayEquals(expr.getCacheKey(), roundTrip.getCacheKey());
Assert.assertArrayEquals(expr.getCacheKey(),
roundTripFlatten.getCacheKey());
}
+
+ @Test
+ public void testNow()
+ {
+ long beforeCall = System.currentTimeMillis();
+
+ Expr expr = Parser.parse("now()", ExprMacroTable.nil());
+ ExprEval result = expr.eval(InputBindings.nilBindings());
+
+ long afterCall = System.currentTimeMillis();
+
+ Assert.assertNotNull(result.value());
+ Assert.assertEquals(ExpressionType.LONG, result.type());
+
+ long timestamp = result.asLong();
+ Assert.assertTrue(
+ "Timestamp should be between before and after: " + beforeCall + " <= "
+ timestamp + " <= " + afterCall,
+ timestamp >= beforeCall && timestamp <= afterCall
+ );
+ }
+
+ @Test
+ public void testNowEvaluatedPerRow()
+ {
+ Expr expr = Parser.parse("now()", ExprMacroTable.nil());
+
+ // now() must not be treated as a literal, otherwise it would be constant-folded
+ // and not re-evaluated per row.
+ Assert.assertFalse(expr.isLiteral());
+ Assert.assertNull(expr.getLiteralValue());
+
+ long time1 = expr.eval(InputBindings.nilBindings()).asLong();
+ long time2 = expr.eval(InputBindings.nilBindings()).asLong();
+ Assert.assertTrue(
+ "Second call should return same or later timestamp: " + time1 + " <= "
+ time2,
+ time2 >= time1
+ );
+ }
+
+ @Test
+ public void testNowRejectsArguments()
+ {
+ Throwable t = Assert.assertThrows(
+ ExpressionValidationException.class,
+ () -> Parser.parse("now(123)", ExprMacroTable.nil()).eval(InputBindings.nilBindings())
+ );
+ Assert.assertEquals("Function[now] does not accept arguments", t.getMessage());
+ }
+
+ @Test
+ public void testNowInExpression()
+ {
+ // Test using now() in a more complex expression
+ Expr expr = Parser.parse("now() - 1000", ExprMacroTable.nil());
+
+ long beforeCall = System.currentTimeMillis();
+ ExprEval result = expr.eval(InputBindings.nilBindings());
+ long afterCall = System.currentTimeMillis();
+
+ long computed = result.asLong();
+ Assert.assertTrue(
+ "Result should be approximately 1 second before now: computed=" +
computed
+ + ", expected range=[" + (beforeCall - 1000) + ", " + (afterCall -
1000) + "]",
+ computed >= (beforeCall - 1100) && computed <= (afterCall - 900)
+ );
+ }
}
diff --git
a/processing/src/test/java/org/apache/druid/segment/filter/ExpressionFilterTest.java
b/processing/src/test/java/org/apache/druid/segment/filter/ExpressionFilterTest.java
index 65c0f6f44d2..e16495d4b4a 100644
---
a/processing/src/test/java/org/apache/druid/segment/filter/ExpressionFilterTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/filter/ExpressionFilterTest.java
@@ -364,6 +364,37 @@ public class ExpressionFilterTest extends BaseFilterTest
Assert.assertEquals("Required column rewrite is not supported by this
filter.", t.getMessage());
}
+ /**
+ * now() has no required column bindings, so without the non-determinism check Expr.asBitmapColumnIndex would fall
+ * into the "constant expression" branch and evaluate it once to produce an AllTrue/AllFalse bitmap index. The
+ * correct behavior is to return null, which forces per-row evaluation.
+ */
+ @Test
+ public void testNowFilterDoesNotProduceConstantBitmapIndex()
+ {
+ // "now() > 0" has no required bindings, so it would otherwise trigger constant folding in asBitmapColumnIndex.
+ Filter filter = edf("now() > 0").toFilter();
+ Assert.assertNull(
+ "ExpressionFilter.getBitmapColumnIndex should return null for non-deterministic now()",
+ filter.getBitmapColumnIndex(null)
+ );
+ }
+
+ /**
+ * Two ExpressionDimFilters built at different times that both wrap now() would otherwise produce identical cache
+ * keys (now() stringifies the same way every time), allowing stale cached results to be reused.
+ * NowExpression mixes a strictly increasing nonce into the cache key via decorateCacheKeyBuilder so two distinct
+ * filter instances over the same expression produce different bytes.
+ */
+ @Test
+ public void testNowDimFilterCacheKeyIsNotStableAcrossInstances()
+ {
+ Assert.assertFalse(
+ "ExpressionDimFilter cache keys for now() must differ across instances to defeat result caching",
+ java.util.Arrays.equals(edf("now()").getCacheKey(), edf("now()").getCacheKey())
+ );
+ }
+
protected static ExpressionDimFilter edf(final String expression)
{
return new ExpressionDimFilter(expression, null,
TestExprMacroTable.INSTANCE);
diff --git
a/processing/src/test/java/org/apache/druid/segment/transform/TransformerTest.java
b/processing/src/test/java/org/apache/druid/segment/transform/TransformerTest.java
index 2583b81deab..099c9184514 100644
---
a/processing/src/test/java/org/apache/druid/segment/transform/TransformerTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/transform/TransformerTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.data.input.Row;
+import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.query.expression.TestExprMacroTable;
@@ -507,4 +508,75 @@ public class TransformerTest extends
InitializedNullHandlingTest
Assert.assertArrayEquals(dimList.subList(0, 5).toArray(), (Object[])
actualTranformedRow.getRaw("dim"));
Assert.assertEquals(ImmutableList.of("a"),
actualTranformedRow.getDimension("dim1"));
}
+
+ @Test
+ public void testNowTransform()
+ {
+ TransformSpec transformSpec = new TransformSpec(
+ null,
+ ImmutableList.of(
+ new ExpressionTransform("ingestion_time", "now()", TestExprMacroTable.INSTANCE),
+ new ExpressionTransform("lag_ms", "now() - __time", TestExprMacroTable.INSTANCE)
+ )
+ );
+
+ long beforeTransform = System.currentTimeMillis();
+
+ InputRow row = new MapBasedInputRow(
+ DateTimes.of("2024-01-01T00:00:00Z"),
+ ImmutableList.of("dim"),
+ ImmutableMap.of("dim", "value")
+ );
+
+ Transformer transformer = transformSpec.toTransformer();
+ InputRow transformed = transformer.transform(row);
+
+ Assert.assertNotNull(transformed);
+ Assert.assertNotNull(transformed.getRaw("ingestion_time"));
+
+ long ingestionTime = ((Number) transformed.getRaw("ingestion_time")).longValue();
+ long afterTransform = System.currentTimeMillis();
+ Assert.assertTrue(
+ "Ingestion time should be between transform start and end: "
+ + beforeTransform + " <= " + ingestionTime + " <= " + afterTransform,
+ ingestionTime >= beforeTransform && ingestionTime <= afterTransform
+ );
+
+ // Verify lag calculation (may be slightly different from ingestionTime - __time due to timing)
+ long lag = ((Number) transformed.getRaw("lag_ms")).longValue();
+ long eventTime = DateTimes.of("2024-01-01T00:00:00Z").getMillis();
+ long expectedLag = ingestionTime - eventTime;
+
+ // Allow small difference since now() is called twice (once for ingestion_time, once for lag_ms)
+ long lagDiff = Math.abs(lag - expectedLag);
+ Assert.assertTrue(
+ "Lag should be approximately correct (diff=" + lagDiff + "ms): expected=" + expectedLag + ", actual=" + lag,
+ lagDiff < 100 // Allow up to 100ms difference
+ );
+
+ // Verify lag is positive (ingestion happened after event)
+ Assert.assertTrue("Lag should be positive", lag > 0);
+ }
+
+ @Test
+ public void testNowCannotBeUsedForTimeColumn()
+ {
+ DruidExceptionMatcher.invalidInput().expectMessageIs(
+ "Cannot use non-deterministic expression[now()] to set column name[__time]."
+ + " Non-deterministic expressions such as now() are not supported as __time transforms."
+ ).assertThrowsAndMatches(
+ () -> new ExpressionTransform("__time", "now()", TestExprMacroTable.INSTANCE)
+ );
+ }
+
+ @Test
+ public void testNowRejectedWhenWrappedInArithmeticForTimeColumn()
+ {
+ DruidExceptionMatcher.invalidInput().expectMessageIs(
+ "Cannot use non-deterministic expression[now() + 1000] to set column name[__time]."
+ + " Non-deterministic expressions such as now() are not supported as __time transforms."
+ ).assertThrowsAndMatches(
+ () -> new ExpressionTransform("__time", "now() + 1000", TestExprMacroTable.INSTANCE)
+ );
+ }
}
diff --git
a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionPlannerTest.java
b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionPlannerTest.java
index ec1db1ae6f8..06154fc95bd 100644
---
a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionPlannerTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionPlannerTest.java
@@ -1371,6 +1371,26 @@ public class ExpressionPlannerTest extends
InitializedNullHandlingTest
);
}
+ @Test
+ public void testNowIsNotConstant()
+ {
+ ExpressionPlan thePlan = plan("now()");
+ Assert.assertFalse(
+ thePlan.is(ExpressionPlan.Trait.CONSTANT)
+ );
+ Assert.assertFalse(thePlan.isConstant());
+ }
+
+ @Test
+ public void testNowWrappedInExpressionIsNotConstant()
+ {
+ ExpressionPlan thePlan = plan("now() + 1000");
+ Assert.assertFalse(
+ thePlan.is(ExpressionPlan.Trait.CONSTANT)
+ );
+ Assert.assertFalse(thePlan.isConstant());
+ }
+
private static ExpressionPlan plan(String expression)
{
return ExpressionPlanner.plan(SYNTHETIC_INSPECTOR,
Parser.parse(expression, MACRO_TABLE));
diff --git
a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
index 99b6095f00f..b150cd8baa2 100644
---
a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
@@ -51,6 +51,7 @@ import
org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseSingleValueDimensionSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.ConstantExprEvalSelector;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.CursorBuildSpec;
import org.apache.druid.segment.CursorFactory;
@@ -1134,6 +1135,40 @@ public class ExpressionSelectorsTest extends
InitializedNullHandlingTest
};
}
+ @Test
+ public void test_now_selector_is_not_constant() throws InterruptedException
+ {
+ final Expr nowExpr = Parser.parse("now()", TestExprMacroTable.INSTANCE);
+
+ for (CursorFactory cursorFactory : CURSOR_FACTORIES) {
+ try (final CursorHolder cursorHolder = cursorFactory.makeCursorHolder(CursorBuildSpec.FULL_SCAN)) {
+ Cursor cursor = cursorHolder.asCursor();
+ ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+
+ ColumnValueSelector<ExprEval> selector = ExpressionSelectors.makeExprEvalSelector(factory, nowExpr);
+ Assert.assertFalse(
+ "now() must not be folded into a ConstantExprEvalSelector because its value changes over time",
+ selector instanceof ConstantExprEvalSelector
+ );
+
+ final long first = selector.getLong();
+ Thread.sleep(1);
+ cursor.advance();
+ final long second = selector.getLong();
+
+ Assert.assertTrue(
+ "now() must be monotonic across rows; got first=" + first + " second=" + second,
+ second >= first
+ );
+ Assert.assertNotEquals(
+ "now() must re-evaluate after the cursor advances; a ConstantExprEvalSelector would return the same value",
+ first,
+ second
+ );
+ }
+ }
+ }
+
private static <T> ColumnValueSelector<T> objectSelectorFromSupplier(
final Supplier<T> supplier,
final Class<T> clazz
diff --git
a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVirtualColumnTest.java
b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVirtualColumnTest.java
index 676afd3531f..1b2d8ac34b7 100644
---
a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVirtualColumnTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVirtualColumnTest.java
@@ -792,4 +792,26 @@ public class ExpressionVirtualColumnTest extends
InitializedNullHandlingTest
Assert.assertTrue(multiConstantSelector instanceof
ConstantMultiValueDimensionSelector);
Assert.assertEquals(ImmutableList.of("a", "b", "c"),
multiConstantSelector.getObject());
}
+
+ @Test
+ public void testNowCacheKeyIsNotStableAcrossInstances()
+ {
+ ExpressionVirtualColumn vc1 = new ExpressionVirtualColumn(
+ "v0",
+ "now()",
+ ColumnType.LONG,
+ TestExprMacroTable.INSTANCE
+ );
+ ExpressionVirtualColumn vc2 = new ExpressionVirtualColumn(
+ "v0",
+ "now()",
+ ColumnType.LONG,
+ TestExprMacroTable.INSTANCE
+ );
+
+ Assert.assertFalse(
+ "ExpressionVirtualColumn cache keys for now() must differ across instances to defeat result caching",
+ Arrays.equals(vc1.getCacheKey(), vc2.getCacheKey())
+ );
+ }
}