Copilot commented on code in PR #12403:
URL: https://github.com/apache/gluten/pull/12403#discussion_r3498398552


##########
backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/perf/GlutenDeltaOptimizedWriterExec.scala:
##########
@@ -338,25 +338,13 @@ private class GlutenOptimizedWriterShuffleReader(
     ).toCompletionIterator
 
     // Create a key/value iterator for each stream
-    val recordIter = dep match {
-      case columnarDep: ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch] =>
-        // If the dependency is a ColumnarShuffleDependency, we use the columnar serializer.
-        columnarDep.serializer
-          .newInstance()
-          .asInstanceOf[ColumnarBatchSerializerInstance]
-          .deserializeStreams(wrappedStreams)
-          .asKeyValueIterator
-      case _ =>
-        val serializerInstance = dep.serializer.newInstance()
-        // Create a key/value iterator for each stream
-        wrappedStreams.flatMap {
-          case (blockId, wrappedStream) =>
-            // Note: the asKeyValueIterator below wraps a key/value iterator inside of a
-            // NextIterator. The NextIterator makes sure that close() is called on the
-            // underlying InputStream when all records have been read.
-            serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
-        }
-    }
+    val recordIter = dep.asInstanceOf[ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch]]
+      .serializer
+      .newInstance()
+      .asInstanceOf[ColumnarBatchSerializerInstance]
+      .deserializeStreams(wrappedStreams)
+      .asKeyValueIterator
+

Review Comment:
   This unconditional cast to `ColumnarShuffleDependency` will throw an
unhelpful `ClassCastException` if `GlutenOptimizedWriterShuffleReader` is ever
constructed with a non-columnar dependency. Since the PR’s intent is to treat
such dependencies as invalid (so the earlier `dep match { ... case _ => ... }`
fallback branch was effectively unreachable anyway), it’s better to fail fast
with a clear exception message than with a bare cast.
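
A minimal sketch of the suggested fail-fast check, assuming simplified stand-in types: `ColumnarBatch`, `ShuffleDependency`, `ColumnarShuffleDependency`, and the helper `requireColumnar` below are hypothetical placeholders, not the real Spark/Gluten classes. The real reader would apply the same match to `dep` before calling `.serializer`.

```scala
// Hypothetical stand-ins for the Spark/Gluten types, for illustration only.
final class ColumnarBatch
class ShuffleDependency[K, V, C](val shuffleId: Int)
class ColumnarShuffleDependency[K, V, C](shuffleId: Int)
    extends ShuffleDependency[K, V, C](shuffleId)

object OptimizedWriterReaderSketch {
  type WriterDep = ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch]

  // Hypothetical helper: check the dependency class first and fail fast with a
  // descriptive message instead of letting a bare cast throw ClassCastException.
  def requireColumnar(dep: ShuffleDependency[Int, _, _]): WriterDep = dep match {
    case columnarDep: ColumnarShuffleDependency[_, _, _] =>
      // Type arguments are erased on the JVM, so this cast is unchecked either way.
      columnarDep.asInstanceOf[WriterDep]
    case other =>
      throw new IllegalArgumentException(
        "GlutenOptimizedWriterShuffleReader requires a ColumnarShuffleDependency, " +
          s"but shuffle ${other.shuffleId} uses ${other.getClass.getName}")
  }
}
```

Matching on the class before casting keeps the failure actionable: the exception names the reader, the shuffle, and the unexpected dependency class.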



##########
backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializerInstance.scala:
##########
@@ -29,6 +29,7 @@ abstract class ColumnarBatchSerializerInstance extends SerializerInstance {
   /** Deserialize the streams of ColumnarBatches. */
   def deserializeStreams(streams: Iterator[(BlockId, InputStream)]): DeserializationStream
 
+  // These methods are never called by shuffle code.
   override def serialize[T: ClassTag](t: T): ByteBuffer = {
     throw new UnsupportedOperationException
   }

Review Comment:
   The newly added `UnsupportedOperationException`s don’t include messages,
which makes accidental misuse harder to debug. Also, the comment “never called 
by shuffle code” is ambiguous (Spark’s shuffle code does call these APIs in 
general; it’s Gluten’s columnar shuffle path that doesn’t). Adding explicit 
messages and clarifying the comment would make failures self-explanatory.
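
A sketch of what explicit messages and a narrower comment might look like. `SerializerInstanceLike` and `ColumnarOnlySerializerInstance` are hypothetical stand-ins for Spark's `SerializerInstance` and Gluten's `ColumnarBatchSerializerInstance`; only two of the overridden methods are shown, and the message wording is an assumption.

```scala
import java.nio.ByteBuffer
import scala.reflect.ClassTag

// Hypothetical stand-in for a subset of Spark's SerializerInstance API.
abstract class SerializerInstanceLike {
  def serialize[T: ClassTag](t: T): ByteBuffer
  def deserialize[T: ClassTag](bytes: ByteBuffer): T
}

// Hypothetical stand-in for ColumnarBatchSerializerInstance.
abstract class ColumnarOnlySerializerInstance extends SerializerInstanceLike {
  // Gluten's columnar shuffle path does not call these per-object methods,
  // although Spark's generic shuffle code may; fail with a self-explanatory message.
  private def unsupported(method: String): Nothing =
    throw new UnsupportedOperationException(
      s"$method is not supported by ${getClass.getName}: " +
        "this serializer only handles Gluten's columnar shuffle streams")

  override def serialize[T: ClassTag](t: T): ByteBuffer = unsupported("serialize")

  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = unsupported("deserialize")
}
```

Routing every unsupported override through one helper keeps the messages consistent if more per-object overrides are added later.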



##########
backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleReader.scala:
##########
@@ -91,25 +91,12 @@ class ColumnarShuffleReader[K, C](
       fetchContinuousBlocksInBatch
     ).toCompletionIterator
 
-    val recordIter = dep match {
-      case columnarDep: ColumnarShuffleDependency[K, _, C] =>
-        // If the dependency is a ColumnarShuffleDependency, we use the columnar serializer.
-        columnarDep.serializer
-          .newInstance()
-          .asInstanceOf[ColumnarBatchSerializerInstance]
-          .deserializeStreams(wrappedStreams)
-          .asKeyValueIterator
-      case _ =>
-        val serializerInstance = dep.serializer.newInstance()
-        // Create a key/value iterator for each stream
-        wrappedStreams.flatMap {
-          case (blockId, wrappedStream) =>
-            // Note: the asKeyValueIterator below wraps a key/value iterator inside of a
-            // NextIterator. The NextIterator makes sure that close() is called on the
-            // underlying InputStream when all records have been read.
-            serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
-        }
-    }
+    val recordIter = dep.asInstanceOf[ColumnarShuffleDependency[K, C, C]]
+      .serializer
+      .newInstance()
+      .asInstanceOf[ColumnarBatchSerializerInstance]
+      .deserializeStreams(wrappedStreams)
+      .asKeyValueIterator
 

Review Comment:
   `dep.asInstanceOf[ColumnarShuffleDependency[K, C, C]]` will fail with an 
unhelpful `ClassCastException` if this reader is ever invoked with a 
non-columnar shuffle dependency, and the type parameters also incorrectly imply 
`V == C` (the dependency’s map-side value type can differ from `C`, e.g. when 
an aggregator is present). Consider pattern-matching the dependency and 
throwing a clear `IllegalArgumentException`, while keeping the second type 
parameter as a wildcard.
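
A minimal sketch of the suggested pattern match, assuming simplified stand-in classes: `ShuffleDependency`, `ColumnarShuffleDependency`, and the helper `columnarDependency` are hypothetical placeholders rather than the real Spark/Gluten definitions. The second type parameter stays a wildcard, and `@unchecked` marks that only the erased class is tested at runtime.

```scala
// Hypothetical stand-ins; the real classes live in Spark and Gluten.
class ShuffleDependency[K, V, C](val shuffleId: Int)
class ColumnarShuffleDependency[K, V, C](shuffleId: Int)
    extends ShuffleDependency[K, V, C](shuffleId)

object ColumnarShuffleReaderSketch {
  // The map-side value type V may differ from the combined type C (for example
  // when an aggregator is present), so it stays a wildcard instead of being forced to C.
  def columnarDependency[K, C](
      dep: ShuffleDependency[K, _, C]): ColumnarShuffleDependency[K, _, C] =
    dep match {
      // K and C are erased at runtime; @unchecked documents that only the class is tested.
      case columnarDep: ColumnarShuffleDependency[K @unchecked, _, C @unchecked] =>
        columnarDep
      case other =>
        throw new IllegalArgumentException(
          "ColumnarShuffleReader expects a ColumnarShuffleDependency, but got " +
            s"${other.getClass.getName} for shuffle ${other.shuffleId}")
    }
}
```

Because `V` stays a wildcard, a dependency such as `ColumnarShuffleDependency[String, Long, Int]` is accepted as-is, without pretending that its map-side values are of type `Int`.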



-- 
This is an automated message from the Apache Git Service.