andygrove opened a new pull request, #4569: URL: https://github.com/apache/datafusion-comet/pull/4569
## Which issue does this PR close?

Closes #.

<!-- No tracking issue yet. Opening as a draft to gather feedback on the design; happy to file a tracking issue if there is interest. -->

## Rationale for this change

When a DataFrame or table is cached (`df.cache()` / `CACHE TABLE`), Spark's `DefaultCachedBatchSerializer` stores each column in Spark's compressed columnar format. Comet does not treat `InMemoryTableScanExec` as native, so it inserts a `CometSparkToColumnarExec` above it and pays a JVM-to-Arrow conversion on every read of the cached data:

```
cached (compressed) -> decompress to Spark ColumnarBatch -> convert to Arrow -> native
```

That conversion runs on every scan, which undercuts the benefit of caching for native pipelines. This PR lets Comet store the cache as compressed Arrow IPC once, at cache-build time, so repeated scans feed native execution directly with no per-read conversion. This is the same approach used by other columnar Spark accelerators.

## What changes are included in this PR?

A new `CometCachedBatchSerializer` (plugged into Spark's `spark.sql.cache.serializer`) that:

- Encodes each cached batch to compressed Arrow IPC (reusing Comet's existing `serializeBatches`/`decodeBatches`), storing the bytes plus a Spark-format per-column stats row.
- Extends Spark's `SimpleMetricsCachedBatchSerializer`, so batch-level partition pruning (`buildFilter`) works using the computed min/max/null/count stats.
- Decodes back to `CometVector`-backed `ColumnarBatch` on read, with column pruning and an `InternalRow` fallback for non-Comet consumers.
- Delegates transparently to Spark's `DefaultCachedBatchSerializer` for schemas it does not support (nested/complex types), so it is a safe drop-in.

Supporting changes:

- `CometSparkToColumnarExec` gains a passthrough fast-path: batches whose columns are already `CometVector` are forwarded without a re-copy (with a `numPassthroughBatches` metric).
- `CometDriverPlugin` installs the serializer at startup when the new `spark.comet.cache.serializer.enabled` config (default off) is set, respecting any user-provided serializer. The Spark property is a static config, so it must be set before the session is created.
- New config `spark.comet.cache.serializer.enabled` (default off).

Supported flat types: boolean, integral, floating point, decimal, string, binary, date, timestamp, timestamp_ntz. Nested types delegate. Off by default.

## How are these changes tested?

New tests:

- `CometCachedBatchSerializerSuite`: stats-row layout; build path (compressed IPC + stats); decode round-trip; column pruning; the columnar read path (identity and pruned projections); the `CometSparkToColumnarExec` passthrough metric; a regression test that string min/max stats survive encoding (they are copied off the Arrow buffer); and end-to-end tests for cached-vs-uncached correctness, filtered pruning on numeric and string columns, `MEMORY_AND_DISK` spill, array-type delegation, and `timestamp_ntz` value round-tripping.
- `CometPluginsSuite`: the driver plugin installs the serializer only when enabled and never overrides a user-provided non-default serializer.

Verified compiling and passing on Spark 3.4, 3.5, and 4.x profiles.
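
For readers unfamiliar with the batch-level pruning that the per-batch min/max/null/count stats make possible, the idea can be sketched in a few lines of Python. This is only an illustration, not Spark's `buildFilter` or any code from this PR: `BatchStats`, `compute_stats`, `may_contain_greater_than`, and `prune_batches` are hypothetical names, and the sketch models a single integer column with one `col > literal` predicate.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class BatchStats:
    """Hypothetical per-column stats kept alongside one cached batch."""
    min_value: Optional[int]  # None when every value in the batch is null
    max_value: Optional[int]
    null_count: int
    row_count: int


def compute_stats(values: List[Optional[int]]) -> BatchStats:
    """Compute min/max/null/count once, at cache-build time."""
    non_null = [v for v in values if v is not None]
    return BatchStats(
        min_value=min(non_null) if non_null else None,
        max_value=max(non_null) if non_null else None,
        null_count=len(values) - len(non_null),
        row_count=len(values),
    )


def may_contain_greater_than(stats: BatchStats, literal: int) -> bool:
    """Return False only when no row in the batch can satisfy `col > literal`."""
    if stats.max_value is None:
        # All values are null, and null never satisfies a comparison.
        return False
    return stats.max_value > literal


def prune_batches(batches: List[List[Optional[int]]], literal: int) -> List[int]:
    """Return the indices of batches that still have to be decoded."""
    return [
        i
        for i, values in enumerate(batches)
        if may_contain_greater_than(compute_stats(values), literal)
    ]


batches = [[1, 2, 3], [10, None, 12], [None, None], [100, 200]]
kept = prune_batches(batches, 11)
print(kept)  # [1, 3]
```

Only batches 1 and 3 need decoding for `col > 11`; the other two are skipped from their stats alone, without touching the encoded bytes. In a real serializer the stats would be computed once when the cache is built rather than on each scan as this sketch does for brevity.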
