[
https://issues.apache.org/jira/browse/FLINK-33950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17808570#comment-17808570
]
Jacky Lau edited comment on FLINK-33950 at 1/19/24 9:25 AM:
------------------------------------------------------------
[~twalthr] below is the Spark code:
{code:java}
val useHash = Aggregate.supportsHashAggregate(
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
val forceSortAggregate = forceApplySortAggregate(child.conf)
if (useHash && !forceSortAggregate) {
HashAggregateExec(..)
}
def isAggregateBufferMutable(schema: StructType): Boolean = {
schema.forall(f => UnsafeRow.isMutable(f.dataType))
}
def supportsHashAggregate(aggregateBufferAttributes: Seq[Attribute]): Boolean =
{
val aggregationBufferSchema =
StructType.fromAttributes(aggregateBufferAttributes)
isAggregateBufferMutable(aggregationBufferSchema)
}
/**
* Field types that can be updated in place in UnsafeRows (e.g. we support
set() for these types)
*/
public static boolean isMutable(DataType dt) {
if (dt instanceof UserDefinedType) {
return isMutable(((UserDefinedType<?>) dt).sqlType());
}
PhysicalDataType pdt = PhysicalDataType.apply(dt);
return pdt instanceof PhysicalPrimitiveType || pdt instanceof
PhysicalDecimalType ||
pdt instanceof PhysicalCalendarIntervalType;
}
and these types do not include RowType
{code}
was (Author: jackylau):
below is the Spark code:
{code:java}
val useHash = Aggregate.supportsHashAggregate(
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
val forceSortAggregate = forceApplySortAggregate(child.conf)
if (useHash && !forceSortAggregate) {
HashAggregateExec(..)
}
def isAggregateBufferMutable(schema: StructType): Boolean = {
schema.forall(f => UnsafeRow.isMutable(f.dataType))
}
def supportsHashAggregate(aggregateBufferAttributes: Seq[Attribute]): Boolean =
{
val aggregationBufferSchema =
StructType.fromAttributes(aggregateBufferAttributes)
isAggregateBufferMutable(aggregationBufferSchema)
}
/**
* Field types that can be updated in place in UnsafeRows (e.g. we support
set() for these types)
*/
public static boolean isMutable(DataType dt) {
if (dt instanceof UserDefinedType) {
return isMutable(((UserDefinedType<?>) dt).sqlType());
}
PhysicalDataType pdt = PhysicalDataType.apply(dt);
return pdt instanceof PhysicalPrimitiveType || pdt instanceof
PhysicalDecimalType ||
pdt instanceof PhysicalCalendarIntervalType;
}
and these types do not include RowType
{code}
> Update max aggregate functions to new type system
> -------------------------------------------------
>
> Key: FLINK-33950
> URL: https://issues.apache.org/jira/browse/FLINK-33950
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Planner
> Affects Versions: 1.19.0
> Reporter: Jacky Lau
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.19.0
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)