[ https://issues.apache.org/jira/browse/SPARK-27851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Marc Arndt updated SPARK-27851:
-------------------------------
    Description: 
According to the BroadcastMode API, the BroadcastMode#transform methods are 
allowed to return a result object of an arbitrary type:

{code:scala}
/**
 * Marker trait to identify the shape in which tuples are broadcasted. Typical examples of this are
 * identity (tuples remain unchanged) or hashed (tuples are converted into some hash index).
 */
trait BroadcastMode {
  def transform(rows: Array[InternalRow]): Any

  def transform(rows: Iterator[InternalRow], sizeHint: Option[Long]): Any

  def canonicalized: BroadcastMode
}
{code}
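To make the contract concrete, the following standalone model (no Spark dependency) shows a custom mode whose transform result is neither a HashedRelation nor an Array[InternalRow]. Row is a hypothetical stand-in for InternalRow that carries a single long key, and KeySetMode is a hypothetical mode that broadcasts only the distinct keys as a Set[Long]. It compiles because the declared return type is Any.

{code:scala}
// Hypothetical stand-in for Spark's InternalRow: a single long key.
final case class Row(key: Long)

// Simplified model of the BroadcastMode trait quoted above.
trait BroadcastMode {
  def transform(rows: Array[Row]): Any
  def transform(rows: Iterator[Row], sizeHint: Option[Long]): Any
  def canonicalized: BroadcastMode
}

// Hypothetical custom mode: broadcasts only the distinct keys.
// Its result type is Set[Long], which the trait's Any return type permits.
case object KeySetMode extends BroadcastMode {
  override def transform(rows: Array[Row]): Any =
    transform(rows.iterator, Some(rows.length.toLong))

  override def transform(rows: Iterator[Row], sizeHint: Option[Long]): Any =
    rows.map(_.key).toSet

  override def canonicalized: BroadcastMode = this
}
{code}

With this model, transforming rows with keys 1, 2 and 1 yields Set(1L, 2L).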

However, the code in BroadcastExchangeExec that later uses the instantiated 
BroadcastMode objects shows that this is not really the case.

The following lines in BroadcastExchangeExec suggest that only objects of type 
HashedRelation and Array[InternalRow] are accepted as results of the 
BroadcastMode#transform methods:

{code:scala}
// Construct the relation.
val relation = mode.transform(input, Some(numRows))

val dataSize = relation match {
    case map: HashedRelation =>
        map.estimatedSize
    case arr: Array[InternalRow] =>
        arr.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
    case _ =>
        throw new SparkException("[BUG] BroadcastMode.transform returned unexpected type: " +
            relation.getClass.getName)
}
{code}
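The rejection path can be reproduced outside Spark with a simplified model of that match. In this sketch FakeHashedRelation and FakeUnsafeRow are hypothetical stand-ins for HashedRelation and UnsafeRow, and IllegalStateException replaces SparkException so the block has no Spark dependency. A Set[Long], standing in for whatever a custom mode might return, falls through to the catch-all case.

{code:scala}
// Hypothetical stand-ins for Spark's HashedRelation and UnsafeRow.
final case class FakeHashedRelation(estimatedSize: Long)
final case class FakeUnsafeRow(sizeInBytes: Int)

// Simplified model of the size estimation in BroadcastExchangeExec:
// only two result shapes are recognised, everything else is rejected.
def estimateDataSize(relation: Any): Long = relation match {
  case map: FakeHashedRelation =>
    map.estimatedSize
  case arr: Array[FakeUnsafeRow] =>
    // Array element types are checked at runtime, so this case is precise.
    arr.map(_.sizeInBytes.toLong).sum
  case _ =>
    throw new IllegalStateException(
      "[BUG] BroadcastMode.transform returned unexpected type: " +
        relation.getClass.getName)
}
{code}

Calling estimateDataSize(Set(1L, 2L)) therefore throws, even though a mode returning a Set satisfies the BroadcastMode signature.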

I believe that this is the only place in the code where the result of the 
BroadcastMode#transform method must be of type HashedRelation or 
Array[InternalRow]. Therefore, to allow a broader range of custom BroadcastMode 
implementations, I believe the data size of the broadcast value should be 
calculated in a way that does not depend on which BroadcastMode implementation 
is used.

One way to do this is to add a BroadcastMode#calculateDataSize method that 
every BroadcastMode implementation must implement. This method would then 
return the number of bytes required by a given broadcast value.
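A minimal sketch of the proposed decoupling, again as a standalone model rather than a patch against Spark. The method name calculateDataSize follows the proposal above, but its signature (value: Any): Long is an assumption, and the trait is trimmed to the members needed here. Row, IdentityMode, KeySetMode and broadcastDataSize are hypothetical. The exchange side no longer matches on the result type; it asks the mode that produced the value.

{code:scala}
// Hypothetical stand-in for Spark's UnsafeRow: a key plus its size in bytes.
final case class Row(key: Long, sizeInBytes: Int)

trait BroadcastMode {
  def transform(rows: Iterator[Row], sizeHint: Option[Long]): Any
  // Proposed addition (assumed signature): each mode sizes its own result.
  def calculateDataSize(value: Any): Long
}

// Identity-style mode: broadcasts the rows unchanged, as an array.
case object IdentityMode extends BroadcastMode {
  override def transform(rows: Iterator[Row], sizeHint: Option[Long]): Any =
    rows.toArray
  override def calculateDataSize(value: Any): Long =
    value.asInstanceOf[Array[Row]].map(_.sizeInBytes.toLong).sum
}

// Custom mode returning a type the current match would reject.
case object KeySetMode extends BroadcastMode {
  override def transform(rows: Iterator[Row], sizeHint: Option[Long]): Any =
    rows.map(_.key).toSet
  // Rough assumed estimate: 8 bytes per distinct key, ignoring Set overhead.
  override def calculateDataSize(value: Any): Long =
    value.asInstanceOf[Set[Long]].size * 8L
}

// Model of the exchange side: no type match, the mode reports the size.
def broadcastDataSize(mode: BroadcastMode, rows: Seq[Row]): (Any, Long) = {
  val relation = mode.transform(rows.iterator, Some(rows.size.toLong))
  (relation, mode.calculateDataSize(relation))
}
{code}

The design choice is that only the mode knows the shape of its own result, so the size logic moves next to the transform logic instead of being hard-coded in the exchange.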


> Allow for custom BroadcastMode return values
> --------------------------------------------
>
>                 Key: SPARK-27851
>                 URL: https://issues.apache.org/jira/browse/SPARK-27851
>             Project: Spark
>          Issue Type: Improvement
>          Components: Optimizer, SQL
>    Affects Versions: 2.4.3
>            Reporter: Marc Arndt
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
