mbutrovich commented on code in PR #4532:
URL: https://github.com/apache/datafusion-comet/pull/4532#discussion_r3477818155


##########
spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala:
##########
@@ -426,4 +440,39 @@ object Utils extends CometTypeShim with Logging {
         throw new SparkException(s"Unsupported Arrow Vector for $reason: 
${valueVector.getClass}")
     }
   }
+
+  /**
+   * Materialize a Spark `ConstantColumnVector` into a fresh Arrow 
`FieldVector` whose value is
+   * the same constant repeated `numRows` times.
+   *
+   * Spark wraps file-source partition columns and other per-batch constants in
+   * `ConstantColumnVector`; downstream Comet operators feeding 
`NativeUtil.exportBatch` or
+   * `getBatchFieldVectors` trip on it because those paths only handle 
`CometVector`. This helper
+   * materializes the constant into an Arrow vector inline.
+   *
+   * The caller owns the returned vector and must close it (or hand it to 
Arrow's exporter, which
+   * transfers ownership). The vector is allocated against `allocator`, sized 
to exactly
+   * `numRows`, and pre-filled with the constant value (or null when 
`cv.isNullAt(0)`).
+   *
+   * All Spark types are supported (delegates to the per-type 
ArrowFieldWriters, which include
+   * struct/array/map); throws only for a type Arrow itself can't represent.
+   */
+  def materializeConstantColumnVector(
+      cv: ConstantColumnVector,
+      dt: DataType,
+      numRows: Int,
+      name: String,
+      allocator: BufferAllocator): FieldVector = {
+    // "UTC" is deliberate here, NOT the session-local timezone that 
`toArrowSchema` threads
+    // through. These constants are materialised alongside non-constant 
columns in the same
+    // batch/`VectorSchemaRoot`, and Comet's non-constant `TimestampType` 
columns are Arrow
+    // vectors exported from native execution, where Comet always tags them 
`Timestamp(us, "UTC")`
+    // (see native `serde.rs`). Spark itself stores `TimestampType` as micros 
in UTC, so the
+    // constant's value is already a UTC instant. Tagging the materialised 
constant "UTC" keeps its
+    // Arrow timezone metadata consistent with its sibling timestamp columns; 
threading the
+    // session-local timezone here would instead introduce the mismatch. 
`TimestampNTZType` carries
+    // no zone regardless of this argument.
+    org.apache.spark.sql.comet.execution.arrow.ConstantColumnVectors

Review Comment:
   Small one: `Utils.materializeConstantColumnVector` is a one-line forward to 
`ConstantColumnVectors.materialize` with `"UTC"` hardcoded. Could the two 
callers (`getBatchFieldVectors` and `NativeUtil.exportBatch`) call 
`ConstantColumnVectors.materialize` directly and let the UTC rationale live 
once on that method? The current shape duplicates the rationale doc across both 
files. Alternative: drop the `timeZoneId` parameter from 
`ConstantColumnVectors.materialize` entirely and bake `"UTC"` in there, since 
the comment justifies UTC unconditionally.



##########
spark/src/main/scala/org/apache/comet/vector/NativeUtil.scala:
##########
@@ -137,6 +137,19 @@ class NativeUtil {
             provider,
             arrowArray,
             arrowSchema)
+        case cv: 
org.apache.spark.sql.execution.vectorized.ConstantColumnVector =>

Review Comment:
   Could `ConstantColumnVector` be imported and 
`Utils.materializeConstantColumnVector` use the existing `Utils` import on line 
28? Matches the rest of the file.



##########
spark/src/main/scala/org/apache/comet/vector/NativeUtil.scala:
##########
@@ -137,6 +137,19 @@ class NativeUtil {
             provider,
             arrowArray,
             arrowSchema)
+        case cv: 
org.apache.spark.sql.execution.vectorized.ConstantColumnVector =>
+          // Spark uses ConstantColumnVector for partition columns / per-batch 
constants (e.g.
+          // partition values, synthetic columns). Materialise to a fresh 
Arrow vector so Comet's
+          // native side -- which expects Arrow Arrays only -- can ingest the 
batch. Without this,
+          // queries that pull constants through a Comet operator fail with 
"Comet execution only
+          // takes Arrow Arrays".
+          val rows = batch.numRows()

Review Comment:
   Minor: the existing `CometVector` arm reads `valueVector.getValueCount` for 
`numRows`. This arm reads `batch.numRows()` directly. Reading 
`materialised.getValueCount` after the writer finishes would be symmetric with 
the other arm and catch a hypothetical writer underrun.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to