This is an automated email from the ASF dual-hosted git repository.

abhishekrb19 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ba9977c60a feat: Add `now()` to troubleshoot pipeline latencies 
(#19386)
3ba9977c60a is described below

commit 3ba9977c60afadc6a963b9126d099012b0f86dc3
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Wed May 13 00:31:22 2026 -0700

    feat: Add `now()` to troubleshoot pipeline latencies (#19386)
    
    feat: Added now() expression function that returns the current system 
timestamp in milliseconds since epoch. Useful at ingestion time for 
troubleshooting pipeline delays (e.g., now() - __time). Note: now() is 
non-deterministic as it evaluates for every row, so it can break idempotency. 
This can only be added to columns that are not __time.
---
 docs/querying/math-expr.md                         |  25 ++++-
 .../apache/druid/math/expr/BuiltInExprMacros.java  | 103 +++++++++++++++++++++
 .../main/java/org/apache/druid/math/expr/Expr.java |  53 +++++++++--
 .../org/apache/druid/math/expr/ExprMacroTable.java |   3 +-
 .../druid/segment/filter/ExpressionFilter.java     |   3 +
 .../segment/transform/ExpressionTransform.java     |  11 +++
 .../druid/segment/virtual/ExpressionPlan.java      |   6 +-
 .../druid/segment/virtual/ExpressionPlanner.java   |   2 +-
 .../druid/segment/virtual/ExpressionSelectors.java |   4 +-
 .../java/org/apache/druid/math/expr/ExprTest.java  |  17 ++++
 .../org/apache/druid/math/expr/FunctionTest.java   |  66 +++++++++++++
 .../druid/segment/filter/ExpressionFilterTest.java |  31 +++++++
 .../druid/segment/transform/TransformerTest.java   |  72 ++++++++++++++
 .../segment/virtual/ExpressionPlannerTest.java     |  20 ++++
 .../segment/virtual/ExpressionSelectorsTest.java   |  35 +++++++
 .../virtual/ExpressionVirtualColumnTest.java       |  22 +++++
 16 files changed, 457 insertions(+), 16 deletions(-)

diff --git a/docs/querying/math-expr.md b/docs/querying/math-expr.md
index 06ac395c7ad..028af3f01f2 100644
--- a/docs/querying/math-expr.md
+++ b/docs/querying/math-expr.md
@@ -110,8 +110,9 @@ The following built-in functions are available.
 
 |name|description|
 |----|-----------|
+|now|now() returns the current system timestamp in milliseconds since epoch 
(1970-01-01 00:00:00 UTC). This function is evaluated for each row at 
processing time. It's recommended to use this only for troubleshooting issues - 
see [Using now() in ingestion](#using-now-in-ingestion).|
 |timestamp|timestamp(expr[,format-string]) parses string expr into date then 
returns milliseconds from java epoch. without 'format-string' it's regarded as 
ISO datetime format |
-|unix_timestamp|same with 'timestamp' function but returns seconds instead |
+|unix_timestamp|unix_timestamp(expr[,format-string]) same with 'timestamp' 
function but returns seconds instead |
 |timestamp_ceil|timestamp_ceil(expr, period, \[origin, \[timezone\]\]) rounds 
up a timestamp, returning it as a new timestamp. Period can be any ISO8601 
period, like P3M (quarters) or PT12H (half-days). The time zone, if provided, 
should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|
 |timestamp_floor|timestamp_floor(expr, period, \[origin, [timezone\]\]) rounds 
down a timestamp, returning it as a new timestamp. Period can be any ISO8601 
period, like P3M (quarters) or PT12H (half-days). The time zone, if provided, 
should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|
 |timestamp_shift|timestamp_shift(expr, period, step, \[timezone\]) shifts a 
timestamp by a period (step times), returning it as a new timestamp. Period can 
be any ISO8601 period. Step may be negative. The time zone, if provided, should 
be a time zone name like "America/Los_Angeles" or offset like "-08:00".|
@@ -119,6 +120,28 @@ The following built-in functions are available.
 |timestamp_parse|timestamp_parse(string expr, \[pattern, [timezone\]\]) parses 
a string into a timestamp using a given [Joda DateTimeFormat 
pattern](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat).
 If the pattern is not provided, this parses time strings in either ISO8601 or 
SQL format. The time zone, if provided, should be a time zone name like 
"America/Los_Angeles" or offset like "-08:00", and will be used as the time 
zone for strings that do not include a ti [...]
 |timestamp_format|timestamp_format(expr, \[pattern, \[timezone\]\]) formats a 
timestamp as a string with a given [Joda DateTimeFormat 
pattern](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat),
 or ISO8601 if the pattern is not provided. The time zone, if provided, should 
be a time zone name like "America/Los_Angeles" or offset like "-08:00". Pattern 
and time zone must be literals.|
 
+### Using `now()` in ingestion
+
+:::warning
+`now()` is non-deterministic — replicated streaming tasks and task replays 
evaluate it at
+different wall-clock times, producing inconsistent results across replicas. 
Use `now()` to
+populate regular columns (e.g. ingestion time, lag metrics), but Druid rejects 
mapping
+`__time` to `now()`. For Kafka, prefer
+[`kafka.timestamp`](../ingestion/data-formats.md#kafka) as the `__time` source.
+:::
+
+To troubleshoot end-to-end pipeline delays, store `now()` or `now() - __time` 
as a separate
+dimension via a 
[`transformSpec`](../ingestion/ingestion-spec.md#transformspec):
+
+```json
+"transformSpec": {
+  "transforms": [
+    { "type": "expression", "name": "druid_ingestion_time", "expression": 
"now()" },
+    { "type": "expression", "name": "ingestion_lag_ms", "expression": "now() - 
__time" }
+  ]
+}
+```
+
 ## Math functions
 
 See javadoc of java.lang.Math for detailed explanation for each function.
diff --git 
a/processing/src/main/java/org/apache/druid/math/expr/BuiltInExprMacros.java 
b/processing/src/main/java/org/apache/druid/math/expr/BuiltInExprMacros.java
index 189ae918c3d..25489906415 100644
--- a/processing/src/main/java/org/apache/druid/math/expr/BuiltInExprMacros.java
+++ b/processing/src/main/java/org/apache/druid/math/expr/BuiltInExprMacros.java
@@ -21,10 +21,12 @@ package org.apache.druid.math.expr;
 
 import com.google.common.collect.Iterables;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.cache.CacheKeyBuilder;
 import org.apache.druid.segment.column.TypeStrategy;
 
 import javax.annotation.Nullable;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class BuiltInExprMacros
 {
@@ -214,4 +216,105 @@ public class BuiltInExprMacros
     }
   }
 
+  /**
+   * Expression macro for now() function that returns current system timestamp.
+   * Implemented as a macro to prevent constant folding optimization.
+   */
+  public static class NowExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "now";
+
+    // Strictly monotonic counter. Mixed into the cache key of any Expr 
containing now() so each computation
+    // produces a unique key, effectively disabling result caching for 
non-deterministic expressions.
+    private static final AtomicLong CACHE_KEY_NONCE = new AtomicLong();
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      validationHelperCheckArgumentCount(args, 0);
+      return new NowExpression();
+    }
+
+    static final class NowExpression implements Expr
+    {
+      @Override
+      public ExprEval eval(ObjectBinding bindings)
+      {
+        return ExprEval.ofLong(System.currentTimeMillis());
+      }
+
+      @Override
+      public String stringify()
+      {
+        return "now()";
+      }
+
+      @Override
+      public Expr visit(Shuttle shuttle)
+      {
+        return shuttle.visit(this);
+      }
+
+      @Override
+      public BindingAnalysis analyzeInputs()
+      {
+        return new BindingAnalysis().withNonDeterministic();
+      }
+
+      @Override
+      public void decorateCacheKeyBuilder(CacheKeyBuilder builder)
+      {
+        // Append a strictly increasing nonce so any Expr containing now() 
produces a fresh cache key on every
+        // computation, effectively disabling result caching.
+        builder.appendLong(CACHE_KEY_NONCE.incrementAndGet());
+      }
+
+      @Nullable
+      @Override
+      public ExpressionType getOutputType(InputBindingInspector inspector)
+      {
+        return ExpressionType.LONG;
+      }
+
+      @Override
+      public boolean isLiteral()
+      {
+        // NOT a literal - prevents constant folding
+        return false;
+      }
+
+      @Override
+      public boolean isNullLiteral()
+      {
+        return false;
+      }
+
+      @Nullable
+      @Override
+      public Object getLiteralValue()
+      {
+        // Not a literal, so no constant value
+        return null;
+      }
+
+      @Override
+      public int hashCode()
+      {
+        return NowExpression.class.hashCode();
+      }
+
+      @Override
+      public boolean equals(Object obj)
+      {
+        return obj instanceof NowExpression;
+      }
+    }
+  }
+
 }
diff --git a/processing/src/main/java/org/apache/druid/math/expr/Expr.java 
b/processing/src/main/java/org/apache/druid/math/expr/Expr.java
index faa122d0e70..24e992a25cd 100644
--- a/processing/src/main/java/org/apache/druid/math/expr/Expr.java
+++ b/processing/src/main/java/org/apache/druid/math/expr/Expr.java
@@ -321,6 +321,9 @@ public interface Expr extends Cacheable
   {
     final Expr.BindingAnalysis details = analyzeInputs();
     if (details.getRequiredBindings().isEmpty()) {
+      if (details.isNonDeterministic()) {
+        return null;
+      }
       // Constant expression.
       final ExprEval<?> eval = eval(InputBindings.nilBindings());
       if (eval.value() == null) {
@@ -637,15 +640,16 @@ public interface Expr extends Cacheable
     private final ImmutableSet<IdentifierExpr> arrayVariables;
     private final boolean hasInputArrays;
     private final boolean isOutputArray;
+    private final boolean isNonDeterministic;
 
     public BindingAnalysis()
     {
-      this(ImmutableSet.of(), ImmutableSet.of(), ImmutableSet.of(), false, 
false);
+      this(ImmutableSet.of(), ImmutableSet.of(), ImmutableSet.of(), false, 
false, false);
     }
 
     BindingAnalysis(IdentifierExpr expr)
     {
-      this(ImmutableSet.of(expr), ImmutableSet.of(), ImmutableSet.of(), false, 
false);
+      this(ImmutableSet.of(expr), ImmutableSet.of(), ImmutableSet.of(), false, 
false, false);
     }
 
     private BindingAnalysis(
@@ -653,7 +657,8 @@ public interface Expr extends Cacheable
         ImmutableSet<IdentifierExpr> scalarVariables,
         ImmutableSet<IdentifierExpr> arrayVariables,
         boolean hasInputArrays,
-        boolean isOutputArray
+        boolean isOutputArray,
+        boolean isNonDeterministic
     )
     {
       this.freeVariables = freeVariables;
@@ -661,6 +666,7 @@ public interface Expr extends Cacheable
       this.arrayVariables = arrayVariables;
       this.hasInputArrays = hasInputArrays;
       this.isOutputArray = isOutputArray;
+      this.isNonDeterministic = isNonDeterministic;
     }
 
     /**
@@ -679,10 +685,12 @@ public interface Expr extends Cacheable
 
         boolean hasInputArrays = false;
         boolean isOutputArray = false;
+        boolean isNonDeterministic = false;
 
         for (final BindingAnalysis other : others) {
           hasInputArrays = hasInputArrays || other.hasInputArrays;
           isOutputArray = isOutputArray || other.isOutputArray;
+          isNonDeterministic = isNonDeterministic || other.isNonDeterministic;
 
           freeVariables.addAll(other.freeVariables);
           scalarVariables.addAll(other.scalarVariables);
@@ -694,7 +702,8 @@ public interface Expr extends Cacheable
             scalarVariables.build(),
             arrayVariables.build(),
             hasInputArrays,
-            isOutputArray
+            isOutputArray,
+            isNonDeterministic
         );
       }
     }
@@ -789,6 +798,27 @@ public interface Expr extends Cacheable
       return isOutputArray;
     }
 
+    /**
+     * Returns true if the expression tree contains non-deterministic 
expressions (e.g. now()) whose value may change
+     * between evaluations and must not be folded into a constant.
+     */
+    public boolean isNonDeterministic()
+    {
+      return isNonDeterministic;
+    }
+
+    public BindingAnalysis withNonDeterministic()
+    {
+      return new BindingAnalysis(
+          freeVariables,
+          scalarVariables,
+          arrayVariables,
+          hasInputArrays,
+          isOutputArray,
+          true
+      );
+    }
+
     /**
      * Add set of arguments as {@link BindingAnalysis#scalarVariables} that 
are *directly* {@link IdentifierExpr},
      * else they are ignored.
@@ -807,7 +837,8 @@ public interface Expr extends Cacheable
           ImmutableSet.copyOf(Sets.union(scalarVariables, moreScalars)),
           arrayVariables,
           hasInputArrays,
-          isOutputArray
+          isOutputArray,
+          isNonDeterministic
       );
     }
 
@@ -829,7 +860,8 @@ public interface Expr extends Cacheable
           scalarVariables,
           ImmutableSet.copyOf(Sets.union(arrayVariables, arrayIdentifiers)),
           hasInputArrays || !arrayArguments.isEmpty(),
-          isOutputArray
+          isOutputArray,
+          isNonDeterministic
       );
     }
 
@@ -843,7 +875,8 @@ public interface Expr extends Cacheable
           scalarVariables,
           arrayVariables,
           hasArrays || !arrayVariables.isEmpty(),
-          isOutputArray
+          isOutputArray,
+          isNonDeterministic
       );
     }
 
@@ -857,7 +890,8 @@ public interface Expr extends Cacheable
           scalarVariables,
           arrayVariables,
           hasInputArrays,
-          isOutputArray
+          isOutputArray,
+          isNonDeterministic
       );
     }
 
@@ -872,7 +906,8 @@ public interface Expr extends Cacheable
           ImmutableSet.copyOf(scalarVariables.stream().filter(x -> 
!lambda.contains(x.getIdentifier())).iterator()),
           ImmutableSet.copyOf(arrayVariables.stream().filter(x -> 
!lambda.contains(x.getIdentifier())).iterator()),
           hasInputArrays,
-          isOutputArray
+          isOutputArray,
+          isNonDeterministic
       );
     }
 
diff --git 
a/processing/src/main/java/org/apache/druid/math/expr/ExprMacroTable.java 
b/processing/src/main/java/org/apache/druid/math/expr/ExprMacroTable.java
index 0f8b9e1f07f..f2a7556e802 100644
--- a/processing/src/main/java/org/apache/druid/math/expr/ExprMacroTable.java
+++ b/processing/src/main/java/org/apache/druid/math/expr/ExprMacroTable.java
@@ -51,7 +51,8 @@ public class ExprMacroTable
           COMPLEX_DECODE_BASE_64_EXPR_MACRO,
           BuiltInExprMacros.ComplexDecodeBase64ExprMacro.ALIAS
       ),
-      new BuiltInExprMacros.StringDecodeBase64UTFExprMacro()
+      new BuiltInExprMacros.StringDecodeBase64UTFExprMacro(),
+      new BuiltInExprMacros.NowExprMacro()
   );
   private static final ExprMacroTable NIL = new 
ExprMacroTable(Collections.emptyList());
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/filter/ExpressionFilter.java
 
b/processing/src/main/java/org/apache/druid/segment/filter/ExpressionFilter.java
index 090acd3d1bf..1864e255170 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/filter/ExpressionFilter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/filter/ExpressionFilter.java
@@ -192,6 +192,9 @@ public class ExpressionFilter implements Filter
   @Override
   public BitmapColumnIndex getBitmapColumnIndex(ColumnIndexSelector selector)
   {
+    if (bindingDetails.get().isNonDeterministic()) {
+      return null;
+    }
     return expr.get().asBitmapColumnIndex(selector);
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/transform/ExpressionTransform.java
 
b/processing/src/main/java/org/apache/druid/segment/transform/ExpressionTransform.java
index de3cb7c8697..b99768d6e8d 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/transform/ExpressionTransform.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/transform/ExpressionTransform.java
@@ -26,11 +26,13 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Suppliers;
 import org.apache.druid.data.input.Row;
 import org.apache.druid.data.input.Rows;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.math.expr.Expr;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.math.expr.InputBindings;
 import org.apache.druid.math.expr.Parser;
+import org.apache.druid.segment.column.ColumnHolder;
 
 import java.util.List;
 import java.util.Objects;
@@ -57,6 +59,15 @@ public class ExpressionTransform implements Transform
     this.parsedExpression = Suppliers.memoize(
         () -> Parser.parse(expression, 
Preconditions.checkNotNull(this.macroTable, "macroTable"))
     )::get;
+
+    if (ColumnHolder.TIME_COLUMN_NAME.equals(name) && 
parsedExpression.get().analyzeInputs().isNonDeterministic()) {
+      throw InvalidInput.exception(
+          "Cannot use non-deterministic expression[%s] to set column name[%s]."
+          + " Non-deterministic expressions such as now() are not supported as 
__time transforms.",
+          expression,
+          ColumnHolder.TIME_COLUMN_NAME
+      );
+    }
   }
 
   @JsonProperty
diff --git 
a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlan.java 
b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlan.java
index 71af403fa76..7039ed5a7e1 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlan.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlan.java
@@ -121,11 +121,13 @@ public class ExpressionPlan
   }
 
   /**
-   * An expression with no inputs is a constant
+   * Returns true if this expression is a compile-time constant: it has no 
input bindings and contains no
+   * non-deterministic sub-expressions (e.g. {@code now()}). Non-deterministic 
expressions are excluded because they
+   * must be re-evaluated per query/row rather than folded to a single value 
at selector/index construction time.
    */
   public boolean isConstant()
   {
-    return analysis.getRequiredBindings().isEmpty();
+    return analysis.getRequiredBindings().isEmpty() && 
!analysis.isNonDeterministic();
   }
 
   /**
diff --git 
a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlanner.java
 
b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlanner.java
index af4c1ce7759..59ba4efcaf7 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlanner.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlanner.java
@@ -65,7 +65,7 @@ public class ExpressionPlanner
     final Set<String> columns = analysis.getRequiredBindings();
 
     // check and set traits which allow optimized selectors to be created
-    if (columns.isEmpty()) {
+    if (columns.isEmpty() && !analysis.isNonDeterministic()) {
       traits.add(ExpressionPlan.Trait.CONSTANT);
     } else if (expression.isIdentifier()) {
       traits.add(ExpressionPlan.Trait.IDENTIFIER);
diff --git 
a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java
 
b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java
index d037a24bbd0..dfb97cf8b5f 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java
@@ -163,8 +163,8 @@ public class ExpressionSelectors
     }
     final Expr.ObjectBinding bindings = createBindings(columnSelectorFactory, 
plan);
 
-    // Optimization for constant expressions
-    if (bindings.equals(InputBindings.nilBindings())) {
+    // Optimization for constant expressions (but not non-deterministic ones 
like now())
+    if (bindings.equals(InputBindings.nilBindings()) && 
!plan.getAnalysis().isNonDeterministic()) {
       return new ConstantExprEvalSelector(plan.getExpression().eval(bindings));
     }
 
diff --git a/processing/src/test/java/org/apache/druid/math/expr/ExprTest.java 
b/processing/src/test/java/org/apache/druid/math/expr/ExprTest.java
index d5648ae428d..0315bd47929 100644
--- a/processing/src/test/java/org/apache/druid/math/expr/ExprTest.java
+++ b/processing/src/test/java/org/apache/druid/math/expr/ExprTest.java
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -221,6 +222,22 @@ public class ExprTest
                   .verify();
   }
 
+  @Test
+  public void testNowExprCacheKeyIsNotStable()
+  {
+    Expr now = Parser.parse("now()", ExprMacroTable.nil());
+    byte[] k1 = now.getCacheKey();
+    byte[] k2 = now.getCacheKey();
+    Assertions.assertFalse(Arrays.equals(k1, k2), "bare now() cache key must 
change across calls");
+
+    // Verify the same instability propagates when now() is nested in a parent 
expression.
+    Expr nested = Parser.parse("now() > 0", ExprMacroTable.nil());
+    Assertions.assertFalse(
+        Arrays.equals(nested.getCacheKey(), nested.getCacheKey()),
+        "Expr containing now() must produce an unstable cache key"
+    );
+  }
+
   @Test
   public void testShuttleVisitAll()
   {
diff --git 
a/processing/src/test/java/org/apache/druid/math/expr/FunctionTest.java 
b/processing/src/test/java/org/apache/druid/math/expr/FunctionTest.java
index 51068feefd4..66f0bc0907e 100644
--- a/processing/src/test/java/org/apache/druid/math/expr/FunctionTest.java
+++ b/processing/src/test/java/org/apache/druid/math/expr/FunctionTest.java
@@ -1558,4 +1558,70 @@ public class FunctionTest extends 
InitializedNullHandlingTest
     Assert.assertArrayEquals(expr.getCacheKey(), roundTrip.getCacheKey());
     Assert.assertArrayEquals(expr.getCacheKey(), 
roundTripFlatten.getCacheKey());
   }
+
+  @Test
+  public void testNow()
+  {
+    long beforeCall = System.currentTimeMillis();
+
+    Expr expr = Parser.parse("now()", ExprMacroTable.nil());
+    ExprEval result = expr.eval(InputBindings.nilBindings());
+
+    long afterCall = System.currentTimeMillis();
+
+    Assert.assertNotNull(result.value());
+    Assert.assertEquals(ExpressionType.LONG, result.type());
+
+    long timestamp = result.asLong();
+    Assert.assertTrue(
+        "Timestamp should be between before and after: " + beforeCall + " <= " 
+ timestamp + " <= " + afterCall,
+        timestamp >= beforeCall && timestamp <= afterCall
+    );
+  }
+
+  @Test
+  public void testNowEvaluatedPerRow()
+  {
+    Expr expr = Parser.parse("now()", ExprMacroTable.nil());
+
+    // now() must not be treated as a literal, otherwise it would be 
constant-folded
+    // and not re-evaluated per row.
+    Assert.assertFalse(expr.isLiteral());
+    Assert.assertNull(expr.getLiteralValue());
+
+    long time1 = expr.eval(InputBindings.nilBindings()).asLong();
+    long time2 = expr.eval(InputBindings.nilBindings()).asLong();
+    Assert.assertTrue(
+        "Second call should return same or later timestamp: " + time1 + " <= " 
+ time2,
+        time2 >= time1
+    );
+  }
+
+  @Test
+  public void testNowRejectsArguments()
+  {
+    Throwable t = Assert.assertThrows(
+        ExpressionValidationException.class,
+        () -> Parser.parse("now(123)", 
ExprMacroTable.nil()).eval(InputBindings.nilBindings())
+    );
+    Assert.assertEquals("Function[now] does not accept arguments", 
t.getMessage());
+  }
+
+  @Test
+  public void testNowInExpression()
+  {
+    // Test using now() in a more complex expression
+    Expr expr = Parser.parse("now() - 1000", ExprMacroTable.nil());
+
+    long beforeCall = System.currentTimeMillis();
+    ExprEval result = expr.eval(InputBindings.nilBindings());
+    long afterCall = System.currentTimeMillis();
+
+    long computed = result.asLong();
+    Assert.assertTrue(
+        "Result should be approximately 1 second before now: computed=" + 
computed
+            + ", expected range=[" + (beforeCall - 1000) + ", " + (afterCall - 
1000) + "]",
+        computed >= (beforeCall - 1100) && computed <= (afterCall - 900)
+    );
+  }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/filter/ExpressionFilterTest.java
 
b/processing/src/test/java/org/apache/druid/segment/filter/ExpressionFilterTest.java
index 65c0f6f44d2..e16495d4b4a 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/filter/ExpressionFilterTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/filter/ExpressionFilterTest.java
@@ -364,6 +364,37 @@ public class ExpressionFilterTest extends BaseFilterTest
     Assert.assertEquals("Required column rewrite is not supported by this 
filter.", t.getMessage());
   }
 
+  /**
+   * now() has no required column bindings, so without the non-determinism 
check Expr.asBitmapColumnIndex would fall
+   * into the "constant expression" branch and evaluate it once to produce an 
AllTrue/AllFalse bitmap index. The
+   * correct behavior is to return null, which forces per-row evaluation.
+   */
+  @Test
+  public void testNowFilterDoesNotProduceConstantBitmapIndex()
+  {
+    // "now() > 0" has no required bindings — would otherwise trigger constant 
folding in asBitmapColumnIndex.
+    Filter filter = edf("now() > 0").toFilter();
+    Assert.assertNull(
+        "ExpressionFilter.getBitmapColumnIndex should return null for 
non-deterministic now()",
+        filter.getBitmapColumnIndex(null)
+    );
+  }
+
+  /**
+   * Two ExpressionDimFilters built at different times that both wrap now() 
would otherwise produce identical cache
+   * keys (now() stringifies the same way every time), allowing stale cached 
results to be reused.
+   * NowExpression mixes nanoTime into the cache key via 
decorateCacheKeyBuilder so two distinct filter instances
+   * over the same expression produce different bytes.
+   */
+  @Test
+  public void testNowDimFilterCacheKeyIsNotStableAcrossInstances()
+  {
+    Assert.assertFalse(
+        "ExpressionDimFilter cache keys for now() must differ across instances 
to defeat result caching",
+        java.util.Arrays.equals(edf("now()").getCacheKey(), 
edf("now()").getCacheKey())
+    );
+  }
+
   protected static ExpressionDimFilter edf(final String expression)
   {
     return new ExpressionDimFilter(expression, null, 
TestExprMacroTable.INSTANCE);
diff --git 
a/processing/src/test/java/org/apache/druid/segment/transform/TransformerTest.java
 
b/processing/src/test/java/org/apache/druid/segment/transform/TransformerTest.java
index 2583b81deab..099c9184514 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/transform/TransformerTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/transform/TransformerTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.data.input.InputRowListPlusRawValues;
 import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.data.input.MapBasedRow;
 import org.apache.druid.data.input.Row;
+import org.apache.druid.error.DruidExceptionMatcher;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.query.expression.TestExprMacroTable;
@@ -507,4 +508,75 @@ public class TransformerTest extends 
InitializedNullHandlingTest
     Assert.assertArrayEquals(dimList.subList(0, 5).toArray(), (Object[]) 
actualTranformedRow.getRaw("dim"));
     Assert.assertEquals(ImmutableList.of("a"), 
actualTranformedRow.getDimension("dim1"));
   }
+
+  @Test
+  public void testNowTransform()
+  {
+    TransformSpec transformSpec = new TransformSpec(
+        null,
+        ImmutableList.of(
+            new ExpressionTransform("ingestion_time", "now()", 
TestExprMacroTable.INSTANCE),
+            new ExpressionTransform("lag_ms", "now() - __time", 
TestExprMacroTable.INSTANCE)
+        )
+    );
+
+    long beforeTransform = System.currentTimeMillis();
+
+    InputRow row = new MapBasedInputRow(
+        DateTimes.of("2024-01-01T00:00:00Z"),
+        ImmutableList.of("dim"),
+        ImmutableMap.of("dim", "value")
+    );
+
+    Transformer transformer = transformSpec.toTransformer();
+    InputRow transformed = transformer.transform(row);
+
+    Assert.assertNotNull(transformed);
+    Assert.assertNotNull(transformed.getRaw("ingestion_time"));
+
+    long ingestionTime = ((Number) 
transformed.getRaw("ingestion_time")).longValue();
+    long afterTransform = System.currentTimeMillis();
+    Assert.assertTrue(
+        "Ingestion time should be between transform start and end: "
+            + beforeTransform + " <= " + ingestionTime + " <= " + 
afterTransform,
+        ingestionTime >= beforeTransform && ingestionTime <= afterTransform
+    );
+
+    // Verify lag calculation (may be slightly different from ingestionTime - 
__time due to timing)
+    long lag = ((Number) transformed.getRaw("lag_ms")).longValue();
+    long eventTime = DateTimes.of("2024-01-01T00:00:00Z").getMillis();
+    long expectedLag = ingestionTime - eventTime;
+
+    // Allow small difference since now() is called twice (once for 
ingestion_time, once for lag_ms)
+    long lagDiff = Math.abs(lag - expectedLag);
+    Assert.assertTrue(
+        "Lag should be approximately correct (diff=" + lagDiff + "ms): 
expected=" + expectedLag + ", actual=" + lag,
+        lagDiff < 100  // Allow up to 100ms difference
+    );
+
+    // Verify lag is positive (ingestion happened after event)
+    Assert.assertTrue("Lag should be positive", lag > 0);
+  }
+
+  @Test
+  public void testNowCannotBeUsedForTimeColumn()
+  {
+    DruidExceptionMatcher.invalidInput().expectMessageIs(
+        "Cannot use non-deterministic expression[now()] to set column 
name[__time]."
+        + " Non-deterministic expressions such as now() are not supported as 
__time transforms."
+    ).assertThrowsAndMatches(
+        () -> new ExpressionTransform("__time", "now()", 
TestExprMacroTable.INSTANCE)
+    );
+  }
+
+  @Test
+  public void testNowRejectedWhenWrappedInArithmeticForTimeColumn()
+  {
+    DruidExceptionMatcher.invalidInput().expectMessageIs(
+        "Cannot use non-deterministic expression[now() + 1000] to set column 
name[__time]."
+        + " Non-deterministic expressions such as now() are not supported as 
__time transforms."
+    ).assertThrowsAndMatches(
+        () -> new ExpressionTransform("__time", "now() + 1000", 
TestExprMacroTable.INSTANCE)
+    );
+  }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionPlannerTest.java
 
b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionPlannerTest.java
index ec1db1ae6f8..06154fc95bd 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionPlannerTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionPlannerTest.java
@@ -1371,6 +1371,26 @@ public class ExpressionPlannerTest extends 
InitializedNullHandlingTest
     );
   }
 
+  @Test
+  public void testNowIsNotConstant()
+  {
+    ExpressionPlan thePlan = plan("now()");
+    Assert.assertFalse(
+        thePlan.is(ExpressionPlan.Trait.CONSTANT)
+    );
+    Assert.assertFalse(thePlan.isConstant());
+  }
+
+  @Test
+  public void testNowWrappedInExpressionIsNotConstant()
+  {
+    ExpressionPlan thePlan = plan("now() + 1000");
+    Assert.assertFalse(
+        thePlan.is(ExpressionPlan.Trait.CONSTANT)
+    );
+    Assert.assertFalse(thePlan.isConstant());
+  }
+
   private static ExpressionPlan plan(String expression)
   {
     return ExpressionPlanner.plan(SYNTHETIC_INSPECTOR, 
Parser.parse(expression, MACRO_TABLE));
diff --git 
a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
 
b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
index 99b6095f00f..b150cd8baa2 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
@@ -51,6 +51,7 @@ import 
org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.BaseSingleValueDimensionSelector;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.ConstantExprEvalSelector;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.CursorBuildSpec;
 import org.apache.druid.segment.CursorFactory;
@@ -1134,6 +1135,40 @@ public class ExpressionSelectorsTest extends 
InitializedNullHandlingTest
     };
   }
 
+  @Test
+  public void test_now_selector_is_not_constant() throws InterruptedException
+  {
+    final Expr nowExpr = Parser.parse("now()", TestExprMacroTable.INSTANCE);
+
+    for (CursorFactory cursorFactory : CURSOR_FACTORIES) {
+      try (final CursorHolder cursorHolder = 
cursorFactory.makeCursorHolder(CursorBuildSpec.FULL_SCAN)) {
+        Cursor cursor = cursorHolder.asCursor();
+        ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+
+        ColumnValueSelector<ExprEval> selector = 
ExpressionSelectors.makeExprEvalSelector(factory, nowExpr);
+        Assert.assertFalse(
+            "now() must not be folded into a ConstantExprEvalSelector because 
its value changes over time",
+            selector instanceof ConstantExprEvalSelector
+        );
+
+        final long first = selector.getLong();
+        Thread.sleep(1);
+        cursor.advance();
+        final long second = selector.getLong();
+
+        Assert.assertTrue(
+            "now() must be monotonic across rows; got first=" + first + " 
second=" + second,
+            second >= first
+        );
+        Assert.assertNotEquals(
+            "now() must re-evaluate after the cursor advances; a 
ConstantExprEvalSelector would return the same value",
+            first,
+            second
+        );
+      }
+    }
+  }
+
   private static <T> ColumnValueSelector<T> objectSelectorFromSupplier(
       final Supplier<T> supplier,
       final Class<T> clazz
diff --git 
a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVirtualColumnTest.java
 
b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVirtualColumnTest.java
index 676afd3531f..1b2d8ac34b7 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVirtualColumnTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVirtualColumnTest.java
@@ -792,4 +792,26 @@ public class ExpressionVirtualColumnTest extends 
InitializedNullHandlingTest
     Assert.assertTrue(multiConstantSelector instanceof 
ConstantMultiValueDimensionSelector);
     Assert.assertEquals(ImmutableList.of("a", "b", "c"), 
multiConstantSelector.getObject());
   }
+
+  @Test
+  public void testNowCacheKeyIsNotStableAcrossInstances()
+  {
+    ExpressionVirtualColumn vc1 = new ExpressionVirtualColumn(
+        "v0",
+        "now()",
+        ColumnType.LONG,
+        TestExprMacroTable.INSTANCE
+    );
+    ExpressionVirtualColumn vc2 = new ExpressionVirtualColumn(
+        "v0",
+        "now()",
+        ColumnType.LONG,
+        TestExprMacroTable.INSTANCE
+    );
+
+    Assert.assertFalse(
+        "ExpressionVirtualColumn cache keys for now() must differ across 
instances to defeat result caching",
+        Arrays.equals(vc1.getCacheKey(), vc2.getCacheKey())
+    );
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to