schenksj commented on code in PR #4532:
URL: https://github.com/apache/datafusion-comet/pull/4532#discussion_r3488519298
##########
spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala:
##########
@@ -426,4 +440,39 @@ object Utils extends CometTypeShim with Logging {
throw new SparkException(s"Unsupported Arrow Vector for $reason: ${valueVector.getClass}")
}
}
+
+ /**
+ * Materialize a Spark `ConstantColumnVector` into a fresh Arrow `FieldVector` whose value is
+ * the same constant repeated `numRows` times.
+ *
+ * Spark wraps file-source partition columns and other per-batch constants in
+ * `ConstantColumnVector`; downstream Comet operators feeding `NativeUtil.exportBatch` or
+ * `getBatchFieldVectors` trip on it because those paths only handle `CometVector`. This helper
+ * materializes the constant into an Arrow vector inline.
+ *
+ * The caller owns the returned vector and must close it (or hand it to Arrow's exporter, which
+ * transfers ownership). The vector is allocated against `allocator`, sized to exactly
+ * `numRows`, and pre-filled with the constant value (or null when `cv.isNullAt(0)`).
+ *
+ * All Spark types are supported (delegates to the per-type ArrowFieldWriters, which include
+ * struct/array/map); throws only for a type Arrow itself can't represent.
+ */
+ def materializeConstantColumnVector(
+ cv: ConstantColumnVector,
+ dt: DataType,
+ numRows: Int,
+ name: String,
+ allocator: BufferAllocator): FieldVector = {
+ // "UTC" is deliberate here, NOT the session-local timezone that `toArrowSchema` threads
+ // through. These constants are materialised alongside non-constant columns in the same
+ // batch/`VectorSchemaRoot`, and Comet's non-constant `TimestampType` columns are Arrow
+ // vectors exported from native execution, where Comet always tags them `Timestamp(us, "UTC")`
+ // (see native `serde.rs`). Spark itself stores `TimestampType` as micros in UTC, so the
+ // constant's value is already a UTC instant. Tagging the materialised constant "UTC" keeps its
+ // Arrow timezone metadata consistent with its sibling timestamp columns; threading the
+ // session-local timezone here would instead introduce the mismatch. `TimestampNTZType` carries
+ // no zone regardless of this argument.
+ org.apache.spark.sql.comet.execution.arrow.ConstantColumnVectors
Review Comment:
Done in 18c147f19. Both callers (`getBatchFieldVectors` and
`NativeUtil.exportBatch`) now call `ConstantColumnVectors.materialize` directly,
and the `Utils.materializeConstantColumnVector` forwarder is deleted. The `"UTC"`
rationale now lives in one place, on `ConstantColumnVectors`, with a one-line
pointer at each call site. I also dropped the now-unused `BufferAllocator`
import from `Utils.scala`.
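
For anyone following the thread without the diff open, here is a minimal
sketch of the idea: repeat one constant into a fresh Arrow vector and tag it
"UTC". It is not the code in `ConstantColumnVectors`. The object name
`ConstantTimestampSketch`, the helper `materializeTimestamp`, and the
`Option[Long]` input are hypothetical, and it covers only the timestamp case
using Arrow Java's `TimeStampMicroTZVector`.

```scala
import org.apache.arrow.memory.{BufferAllocator, RootAllocator}
import org.apache.arrow.vector.TimeStampMicroTZVector

object ConstantTimestampSketch {

  // Hypothetical helper (an assumption for illustration, not Comet's API).
  // `micros` is the constant as microseconds since the epoch in UTC,
  // or None when the constant is null.
  def materializeTimestamp(
      name: String,
      micros: Option[Long],
      numRows: Int,
      allocator: BufferAllocator): TimeStampMicroTZVector = {
    // Tag the vector "UTC" so its metadata matches sibling timestamp columns.
    val vector = new TimeStampMicroTZVector(name, allocator, "UTC")
    try {
      vector.allocateNew(numRows)
      var i = 0
      while (i < numRows) {
        micros match {
          case Some(v) => vector.setSafe(i, v)
          case None => vector.setNull(i)
        }
        i += 1
      }
      vector.setValueCount(numRows)
      vector
    } catch {
      case t: Throwable =>
        // Release the buffers if filling fails, since the caller never
        // receives the vector in that case.
        vector.close()
        throw t
    }
  }

  def main(args: Array[String]): Unit = {
    val allocator = new RootAllocator(Long.MaxValue)
    val vector = materializeTimestamp("ts", Some(1700000000000000L), 4, allocator)
    try {
      println(s"rows=${vector.getValueCount} type=${vector.getField.getType}")
    } finally {
      // The caller owns the vector and closes it before the allocator.
      vector.close()
      allocator.close()
    }
  }
}
```

The caller-owns-the-vector convention in the sketch mirrors the ownership
note in the removed Scaladoc; the real helper delegates to the per-type
writers rather than hard-coding one vector class.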
##########
spark/src/main/scala/org/apache/comet/vector/NativeUtil.scala:
##########
@@ -137,6 +137,19 @@ class NativeUtil {
provider,
arrowArray,
arrowSchema)
+ case cv: org.apache.spark.sql.execution.vectorized.ConstantColumnVector =>
Review Comment:
Done in 18c147f19. `ConstantColumnVector` and `ConstantColumnVectors` are now
imported and referenced by their short names, matching the rest of the file.
##########
spark/src/main/scala/org/apache/comet/vector/NativeUtil.scala:
##########
@@ -137,6 +137,19 @@ class NativeUtil {
provider,
arrowArray,
arrowSchema)
+ case cv: org.apache.spark.sql.execution.vectorized.ConstantColumnVector =>
+ // Spark uses ConstantColumnVector for partition columns / per-batch constants (e.g.
+ // partition values, synthetic columns). Materialise to a fresh Arrow vector so Comet's
+ // native side -- which expects Arrow Arrays only -- can ingest the batch. Without this,
+ // queries that pull constants through a Comet operator fail with "Comet execution only
+ // takes Arrow Arrays".
+ val rows = batch.numRows()
Review Comment:
Done in 18c147f19. The arm now does
`numRows += materialised.getValueCount` after the writer finishes, symmetric
with the `CometVector` arm.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]