Copilot commented on code in PR #12403:
URL: https://github.com/apache/gluten/pull/12403#discussion_r3498398552
##########
backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/perf/GlutenDeltaOptimizedWriterExec.scala:
##########
@@ -338,25 +338,13 @@ private class GlutenOptimizedWriterShuffleReader(
).toCompletionIterator
// Create a key/value iterator for each stream
- val recordIter = dep match {
- case columnarDep: ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch] =>
- // If the dependency is a ColumnarShuffleDependency, we use the columnar serializer.
- columnarDep.serializer
- .newInstance()
- .asInstanceOf[ColumnarBatchSerializerInstance]
- .deserializeStreams(wrappedStreams)
- .asKeyValueIterator
- case _ =>
- val serializerInstance = dep.serializer.newInstance()
- // Create a key/value iterator for each stream
- wrappedStreams.flatMap {
- case (blockId, wrappedStream) =>
- // Note: the asKeyValueIterator below wraps a key/value iterator inside of a
- // NextIterator. The NextIterator makes sure that close() is called on the
- // underlying InputStream when all records have been read.
- serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
- }
- }
+ val recordIter = dep.asInstanceOf[ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch]]
+ .serializer
+ .newInstance()
+ .asInstanceOf[ColumnarBatchSerializerInstance]
+ .deserializeStreams(wrappedStreams)
+ .asKeyValueIterator
+
Review Comment:
This unconditional cast to `ColumnarShuffleDependency` will throw a
`ClassCastException` if `GlutenOptimizedWriterShuffleReader` is ever
constructed with a non-columnar dependency. Since the PR intends to treat
such dependencies as invalid, it’s better to fail fast with a clear exception
message (this also makes the earlier `dep match { ... case _ => ... }` fallback
branch effectively unreachable).
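A minimal sketch of the suggested fail-fast check. It uses hypothetical stand-in classes (`DepStub`, `ColumnarDepStub`, `Batch`) instead of the real Spark/Gluten types so that it compiles on its own, and the choice of `IllegalArgumentException` is an assumption, since the comment only asks for a clear exception message.

```scala
// Hypothetical stand-ins, NOT the real Spark/Gluten classes:
// DepStub ~ ShuffleDependency, ColumnarDepStub ~ ColumnarShuffleDependency,
// Batch ~ ColumnarBatch.
final case class Batch(numRows: Int)
class DepStub[K, V, C](val shuffleId: Int)
class ColumnarDepStub[K, V, C](id: Int) extends DepStub[K, V, C](id)

// Validate the dependency once, up front, so that a non-columnar dependency
// surfaces as a descriptive error instead of a bare ClassCastException from
// an unconditional asInstanceOf.
def requireColumnar(dep: DepStub[Int, Batch, Batch]): ColumnarDepStub[Int, Batch, Batch] =
  dep match {
    case columnar: ColumnarDepStub[Int, Batch, Batch] => columnar
    case other =>
      throw new IllegalArgumentException(
        "GlutenOptimizedWriterShuffleReader expects a ColumnarShuffleDependency, " +
          s"but shuffle ${other.shuffleId} has ${other.getClass.getName}")
  }
```

In the real reader, the matched dependency would then feed `.serializer.newInstance()` exactly as in the diff, so the happy path is unchanged and only the failure mode gets a readable message.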
##########
backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializerInstance.scala:
##########
@@ -29,6 +29,7 @@ abstract class ColumnarBatchSerializerInstance extends SerializerInstance {
/** Deserialize the streams of ColumnarBatches. */
def deserializeStreams(streams: Iterator[(BlockId, InputStream)]): DeserializationStream
+ // These methods are never called by shuffle code.
override def serialize[T: ClassTag](t: T): ByteBuffer = {
throw new UnsupportedOperationException
}
Review Comment:
The newly added `UnsupportedOperationException`s don’t include messages,
which makes accidental misuse harder to debug. Also, the comment “never called
by shuffle code” is ambiguous (Spark’s shuffle code does call these APIs in
general; it’s Gluten’s columnar shuffle path that doesn’t). Adding explicit
messages and clarifying the comment would make failures self-explanatory.
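One possible shape for the suggested change, as a sketch only. `SerializerInstanceLike` is a hypothetical, trimmed-down stand-in for Spark's `SerializerInstance`, `ColumnarBatchSerializerInstanceSketch` is a hypothetical name, and the message wording is illustrative.

```scala
import java.nio.ByteBuffer
import scala.reflect.ClassTag

// Hypothetical stand-in for Spark's SerializerInstance; the real abstract
// class declares more methods (e.g. serializeStream, deserializeStream).
abstract class SerializerInstanceLike {
  def serialize[T: ClassTag](t: T): ByteBuffer
  def deserialize[T: ClassTag](bytes: ByteBuffer): T
}

abstract class ColumnarBatchSerializerInstanceSketch extends SerializerInstanceLike {
  // Spark's generic shuffle code does call these per-object APIs, but
  // Gluten's columnar shuffle path does not, so they fail with a message
  // that names the class, the method, and the supported alternative.
  override def serialize[T: ClassTag](t: T): ByteBuffer =
    throw new UnsupportedOperationException(
      "ColumnarBatchSerializerInstance.serialize is not supported: " +
        "Gluten's columnar shuffle path does not serialize individual objects")

  override def deserialize[T: ClassTag](bytes: ByteBuffer): T =
    throw new UnsupportedOperationException(
      "ColumnarBatchSerializerInstance.deserialize is not supported: " +
        "use deserializeStreams to read columnar shuffle blocks")
}
```

The class name is hard-coded in the messages rather than derived via `getClass.getSimpleName`, which can fail on nested Scala classes under some older JDKs.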
##########
backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleReader.scala:
##########
@@ -91,25 +91,12 @@ class ColumnarShuffleReader[K, C](
fetchContinuousBlocksInBatch
).toCompletionIterator
- val recordIter = dep match {
- case columnarDep: ColumnarShuffleDependency[K, _, C] =>
- // If the dependency is a ColumnarShuffleDependency, we use the columnar serializer.
- columnarDep.serializer
- .newInstance()
- .asInstanceOf[ColumnarBatchSerializerInstance]
- .deserializeStreams(wrappedStreams)
- .asKeyValueIterator
- case _ =>
- val serializerInstance = dep.serializer.newInstance()
- // Create a key/value iterator for each stream
- wrappedStreams.flatMap {
- case (blockId, wrappedStream) =>
- // Note: the asKeyValueIterator below wraps a key/value iterator inside of a
- // NextIterator. The NextIterator makes sure that close() is called on the
- // underlying InputStream when all records have been read.
- serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
- }
- }
+ val recordIter = dep.asInstanceOf[ColumnarShuffleDependency[K, C, C]]
+ .serializer
+ .newInstance()
+ .asInstanceOf[ColumnarBatchSerializerInstance]
+ .deserializeStreams(wrappedStreams)
+ .asKeyValueIterator
Review Comment:
`dep.asInstanceOf[ColumnarShuffleDependency[K, C, C]]` will fail with an
unhelpful `ClassCastException` if this reader is ever invoked with a
non-columnar shuffle dependency, and the type parameters also incorrectly imply
`V == C` (the dependency’s map-side value type can differ from `C`, e.g. when
an aggregator is present). Consider pattern-matching the dependency and
throwing a clear `IllegalArgumentException`, while keeping the second type
parameter as a wildcard.
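A sketch of the suggested pattern match, using hypothetical stand-ins `ShuffleDepStub[K, V, C]` and `ColumnarShuffleDepStub[K, V, C]` in place of Spark's `ShuffleDependency` and Gluten's `ColumnarShuffleDependency`. The reader is only parameterised by `K` and `C`, so the map-side value type stays a wildcard, as in the original `ColumnarShuffleDependency[K, _, C]` case.

```scala
// Hypothetical stand-ins: K = key type, V = map-side value type,
// C = combined (reader-side) value type.
class ShuffleDepStub[K, V, C](val shuffleId: Int)
class ColumnarShuffleDepStub[K, V, C](id: Int) extends ShuffleDepStub[K, V, C](id)

// V is left as a wildcard instead of being forced to equal C.
def asColumnar[K, C](dep: ShuffleDepStub[K, _, C]): ColumnarShuffleDepStub[K, _, C] =
  dep match {
    // Only the runtime class is checked here; K and C are erased.
    case columnar: ColumnarShuffleDepStub[K, _, C] => columnar
    case other =>
      throw new IllegalArgumentException(
        s"Shuffle ${other.shuffleId} is not a columnar shuffle dependency " +
          s"(got ${other.getClass.getName}); the columnar shuffle reader cannot read it")
  }
```

For example, a dependency built as `new ColumnarShuffleDepStub[String, Int, Long](7)` (map-side `Int`, combined `Long`) is accepted by `asColumnar[String, Long]` with its types stated correctly, whereas a `[K, C, C]` cast would misstate `V`.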
--
This is an automated message from the Apache Git Service.