[ 
https://issues.apache.org/jira/browse/FLINK-33950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17808570#comment-17808570
 ] 

Jacky Lau edited comment on FLINK-33950 at 1/19/24 9:25 AM:
------------------------------------------------------------

[~twalthr]  below is the Spark code
{code:java}
val useHash = Aggregate.supportsHashAggregate(
  aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
val forceSortAggregate = forceApplySortAggregate(child.conf)

if (useHash && !forceSortAggregate) {
  HashAggregateExec(..)
}

def isAggregateBufferMutable(schema: StructType): Boolean = {
  schema.forall(f => UnsafeRow.isMutable(f.dataType))
}

def supportsHashAggregate(aggregateBufferAttributes: Seq[Attribute]): Boolean = 
{
  val aggregationBufferSchema = 
StructType.fromAttributes(aggregateBufferAttributes)
  isAggregateBufferMutable(aggregationBufferSchema)
}

/**
 * Field types that can be updated in place in UnsafeRows (e.g. we support 
set() for these types)
 */
public static boolean isMutable(DataType dt) {
  if (dt instanceof UserDefinedType) {
    return isMutable(((UserDefinedType<?>) dt).sqlType());
  }
  PhysicalDataType pdt = PhysicalDataType.apply(dt);
  return pdt instanceof PhysicalPrimitiveType || pdt instanceof 
PhysicalDecimalType ||
    pdt instanceof PhysicalCalendarIntervalType;
}


and these types do not include RowType



 {code}


was (Author: jackylau):
below is the Spark code
{code:java}
val useHash = Aggregate.supportsHashAggregate(
  aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
val forceSortAggregate = forceApplySortAggregate(child.conf)

if (useHash && !forceSortAggregate) {
  HashAggregateExec(..)
}

def isAggregateBufferMutable(schema: StructType): Boolean = {
  schema.forall(f => UnsafeRow.isMutable(f.dataType))
}

def supportsHashAggregate(aggregateBufferAttributes: Seq[Attribute]): Boolean = 
{
  val aggregationBufferSchema = 
StructType.fromAttributes(aggregateBufferAttributes)
  isAggregateBufferMutable(aggregationBufferSchema)
}

/**
 * Field types that can be updated in place in UnsafeRows (e.g. we support 
set() for these types)
 */
public static boolean isMutable(DataType dt) {
  if (dt instanceof UserDefinedType) {
    return isMutable(((UserDefinedType<?>) dt).sqlType());
  }
  PhysicalDataType pdt = PhysicalDataType.apply(dt);
  return pdt instanceof PhysicalPrimitiveType || pdt instanceof 
PhysicalDecimalType ||
    pdt instanceof PhysicalCalendarIntervalType;
}


and these types do not include RowType



 {code}

> Update max aggregate functions to new type system
> -------------------------------------------------
>
>                 Key: FLINK-33950
>                 URL: https://issues.apache.org/jira/browse/FLINK-33950
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Planner
>    Affects Versions: 1.19.0
>            Reporter: Jacky Lau
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to