[
https://issues.apache.org/jira/browse/FLINK-32257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17740085#comment-17740085
]
Jacky Lau edited comment on FLINK-32257 at 7/5/23 8:27 AM:
-----------------------------------------------------------
[~dwysakowicz] after reading
[https://github.com/apache/flink/pull/22730#discussion_r1242187327]
and testing in the SQL client, it throws an exception.
{code:java}
Flink SQL> select greatest(array[1, 2],array[2, 3]);
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.planner.codegen.CodeGenException: Comparable type
expected, but was 'ARRAY<INT NOT NULL> NOT NULL'.
Flink SQL> select greatest(row(1, 2),row(2, 3));
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.planner.codegen.CodeGenException: Comparable type
expected, but was 'ROW<`EXPR$0` INT NOT NULL, `EXPR$1` INT NOT NULL> NOT NULL'.
{code}
After digging into the code, I found that Flink doesn't support comparison of
complex types; we could support it later, like Spark does.
Both the ArrayComparableElementTypeStrategy and ComparableTypeStrategy may
need a common abstraction, or ArrayComparableElementTypeStrategy should have a
comment about the removal of {{areTypesOfSameRootComparable(firstType, secondType)}}.
Otherwise it will easily be lost when Flink adds support for complex type
comparison, because others won't know about it.
What do you think?
If you have time, I can implement the complex type comparison — could you
help review it?
{code:java}
case class PhysicalArrayType(
elementType: DataType, containsNull: Boolean) extends PhysicalDataType {
override private[sql] type InternalType = ArrayData
override private[sql] def ordering = interpretedOrdering
@transient private[sql] lazy val tag = typeTag[InternalType]
@transient
private[sql] lazy val interpretedOrdering: Ordering[ArrayData] = new
Ordering[ArrayData] {
private[this] val elementOrdering: Ordering[Any] =
PhysicalDataType(elementType).ordering.asInstanceOf[Ordering[Any]]
def compare(x: ArrayData, y: ArrayData): Int = {
val leftArray = x
val rightArray = y
val minLength = scala.math.min(leftArray.numElements(),
rightArray.numElements())
var i = 0
while (i < minLength) {
val isNullLeft = leftArray.isNullAt(i)
val isNullRight = rightArray.isNullAt(i)
if (isNullLeft && isNullRight) {
// Do nothing.
} else if (isNullLeft) {
return -1
} else if (isNullRight) {
return 1
} else {
val comp =
elementOrdering.compare(
leftArray.get(i, elementType),
rightArray.get(i, elementType))
if (comp != 0) {
return comp
}
}
i += 1
}
if (leftArray.numElements() < rightArray.numElements()) {
-1
} else if (leftArray.numElements() > rightArray.numElements()) {
1
} else {
0
}
}
}
}
case class PhysicalStructType(fields: Array[StructField]) extends
PhysicalDataType {
override private[sql] type InternalType = Any
override private[sql] def ordering =
forSchema(this.fields.map(_.dataType)).asInstanceOf[Ordering[InternalType]]
@transient private[sql] lazy val tag = typeTag[InternalType]
private[sql] def forSchema(dataTypes: Seq[DataType]): InterpretedOrdering = {
new InterpretedOrdering(dataTypes.zipWithIndex.map {
case (dt, index) => SortOrder(BoundReference(index, dt, nullable = true),
Ascending)
})
}
} {code}
was (Author: jackylau):
[~dwysakowicz] after read the
[https://github.com/apache/flink/pull/22730#discussion_r1242187327]
and test in sql client. it throw exception. and after dig the code, flink
doesn't support complex type compare.
and we can supports like spark latter.
and both the ArrayComparableElementTypeStrategy and ComparableTypeStrategy may
need have a abstract. or ArrayComparableElementTypeStrategy should have a
comments about the remove {{areTypesOfSameRootComparable(firstType,
secondType). it will cause lost easily when flink }}
{{supports complex type compare because others don't know it.}}
{{what do you think?}}
{{}}
{{and if you have time i can support the complex type }}{{compare}}{{ if you
have time to help review?}}
{code:java}
case class PhysicalArrayType(
elementType: DataType, containsNull: Boolean) extends PhysicalDataType {
override private[sql] type InternalType = ArrayData
override private[sql] def ordering = interpretedOrdering
@transient private[sql] lazy val tag = typeTag[InternalType]
@transient
private[sql] lazy val interpretedOrdering: Ordering[ArrayData] = new
Ordering[ArrayData] {
private[this] val elementOrdering: Ordering[Any] =
PhysicalDataType(elementType).ordering.asInstanceOf[Ordering[Any]]
def compare(x: ArrayData, y: ArrayData): Int = {
val leftArray = x
val rightArray = y
val minLength = scala.math.min(leftArray.numElements(),
rightArray.numElements())
var i = 0
while (i < minLength) {
val isNullLeft = leftArray.isNullAt(i)
val isNullRight = rightArray.isNullAt(i)
if (isNullLeft && isNullRight) {
// Do nothing.
} else if (isNullLeft) {
return -1
} else if (isNullRight) {
return 1
} else {
val comp =
elementOrdering.compare(
leftArray.get(i, elementType),
rightArray.get(i, elementType))
if (comp != 0) {
return comp
}
}
i += 1
}
if (leftArray.numElements() < rightArray.numElements()) {
-1
} else if (leftArray.numElements() > rightArray.numElements()) {
1
} else {
0
}
}
}
}
case class PhysicalStructType(fields: Array[StructField]) extends
PhysicalDataType {
override private[sql] type InternalType = Any
override private[sql] def ordering =
forSchema(this.fields.map(_.dataType)).asInstanceOf[Ordering[InternalType]]
@transient private[sql] lazy val tag = typeTag[InternalType]
private[sql] def forSchema(dataTypes: Seq[DataType]): InterpretedOrdering = {
new InterpretedOrdering(dataTypes.zipWithIndex.map {
case (dt, index) => SortOrder(BoundReference(index, dt, nullable = true),
Ascending)
})
}
} {code}
{code:java}
Flink SQL> select greatest(array[1, 2],array[2, 3]);
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.planner.codegen.CodeGenException: Comparable type
expected, but was 'ARRAY<INT NOT NULL> NOT NULL'.
Flink SQL> select greatest(row(1, 2),row(2, 3));
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.planner.codegen.CodeGenException: Comparable type
expected, but was 'ROW<`EXPR$0` INT NOT NULL, `EXPR$1` INT NOT NULL> NOT NULL'.
{code}
> Add ARRAY_MAX support in SQL & Table API
> ----------------------------------------
>
> Key: FLINK-32257
> URL: https://issues.apache.org/jira/browse/FLINK-32257
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 1.18.0
> Reporter: Bonnie Varghese
> Assignee: Hanyu Zheng
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.18.0
>
>
> This is an implementation of ARRAY_MAX
> The array_max() function returns the maximum element from the input
> array.
> The result matches the type of the elements. NULL elements are skipped. If
> array is empty, or contains only NULL elements, NULL is returned.
>
> Syntax
> array_max(array)
> Arguments
> array: Any ARRAY with elements for which order is supported.
>
> Returns
> The result matches the type of the elements. NULL elements are skipped. If
> array is empty, or contains only NULL elements, NULL is returned.
>
> Examples
> SQL
>
> > SELECT array_max(array(1, 20, NULL, 3)); 20
>
> {code:java}
> // Flink SQL> select array_max(array[1, 20, null, 3])
> 20{code}
>
> See also
> spark
> [https://spark.apache.org/docs/latest/api/sql/index.html#array_max]
> presto [https://prestodb.io/docs/current/functions/array.html]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)