[
https://issues.apache.org/jira/browse/FLINK-32257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17740085#comment-17740085
]
Jacky Lau edited comment on FLINK-32257 at 7/5/23 8:28 AM:
-----------------------------------------------------------
[~dwysakowicz] After reading
[https://github.com/apache/flink/pull/22730#discussion_r1242187327]
I tested this in the SQL client, and it throws an exception.
{code:java}
Flink SQL> select greatest(array[1, 2],array[2, 3]);
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.planner.codegen.CodeGenException: Comparable type
expected, but was 'ARRAY<INT NOT NULL> NOT NULL'.
Flink SQL> select greatest(row(1, 2),row(2, 3));
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.planner.codegen.CodeGenException: Comparable type
expected, but was 'ROW<`EXPR$0` INT NOT NULL, `EXPR$1` INT NOT NULL> NOT NULL'.
{code}
After digging into the code, I found that Flink doesn't support comparing
complex types. We could support it later, the way Spark does (see the Spark
code below).
Both ArrayComparableElementTypeStrategy and ComparableTypeStrategy may need a
common abstraction. Alternatively, ArrayComparableElementTypeStrategy should
carry a comment about the removal of
{{areTypesOfSameRootComparable(firstType, secondType)}}.
Otherwise it will easily be overlooked once Flink supports complex type
comparison, because others won't know about it.
What do you think?
I can add support for complex type comparison if you have time to help
review.
{code:java}
/**
 * Physical array type whose ordering compares two ArrayData values element by
 * element, falling back to array length when one is a prefix of the other.
 *
 * @param elementType  data type of the elements; used both to look up the element
 *                     ordering and to read elements out of the arrays
 * @param containsNull not consulted by the ordering logic in this block
 */
case class PhysicalArrayType(
elementType: DataType, containsNull: Boolean) extends PhysicalDataType {
override private[sql] type InternalType = ArrayData
// The ordering of this type is the lazily built interpretedOrdering below.
override private[sql] def ordering = interpretedOrdering
@transient private[sql] lazy val tag = typeTag[InternalType]
@transient
private[sql] lazy val interpretedOrdering: Ordering[ArrayData] = new
Ordering[ArrayData] {
// Ordering for a single element, looked up from the element type and cast to
// Ordering[Any] so it can be applied to the values returned by ArrayData.get.
private[this] val elementOrdering: Ordering[Any] =
PhysicalDataType(elementType).ordering.asInstanceOf[Ordering[Any]]
// Compares x and y position by position over their common prefix:
//   - both elements null: treated as equal, move on to the next position
//   - only the left element null: x sorts first (-1)
//   - only the right element null: y sorts first (1)
//   - otherwise: the first non-zero element comparison decides
// If the common prefix is entirely equal, the shorter array sorts first and
// arrays of equal length compare as 0.
def compare(x: ArrayData, y: ArrayData): Int = {
val leftArray = x
val rightArray = y
// Only positions present in both arrays can be compared element-wise.
val minLength = scala.math.min(leftArray.numElements(),
rightArray.numElements())
var i = 0
while (i < minLength) {
val isNullLeft = leftArray.isNullAt(i)
val isNullRight = rightArray.isNullAt(i)
if (isNullLeft && isNullRight) {
// Do nothing.
} else if (isNullLeft) {
// A null element orders before any non-null element.
return -1
} else if (isNullRight) {
return 1
} else {
val comp =
elementOrdering.compare(
leftArray.get(i, elementType),
rightArray.get(i, elementType))
// The first differing position determines the result.
if (comp != 0) {
return comp
}
}
i += 1
}
// Common prefix is equal: break the tie on length.
if (leftArray.numElements() < rightArray.numElements()) {
-1
} else if (leftArray.numElements() > rightArray.numElements()) {
1
} else {
0
}
}
}
}
/**
 * Physical struct type whose ordering is an InterpretedOrdering built from the
 * data types of its fields.
 *
 * @param fields the struct's fields; only their dataType is read in this block
 */
case class PhysicalStructType(fields: Array[StructField]) extends
PhysicalDataType {
override private[sql] type InternalType = Any
// `ordering` is a def, so forSchema is invoked (and a new InterpretedOrdering
// constructed) on every access; the result is cast to Ordering[InternalType].
override private[sql] def ordering =
forSchema(this.fields.map(_.dataType)).asInstanceOf[Ordering[InternalType]]
@transient private[sql] lazy val tag = typeTag[InternalType]
// Builds an InterpretedOrdering with one ascending SortOrder per data type, in
// the order given. Each SortOrder wraps a BoundReference to the data type's
// index, declared with nullable = true.
private[sql] def forSchema(dataTypes: Seq[DataType]): InterpretedOrdering = {
new InterpretedOrdering(dataTypes.zipWithIndex.map {
case (dt, index) => SortOrder(BoundReference(index, dt, nullable = true),
Ascending)
})
}
} {code}
was (Author: jackylau):
[~dwysakowicz] after read the
[https://github.com/apache/flink/pull/22730#discussion_r1242187327]
and test in sql client. it throw exception.
{code:java}
Flink SQL> select greatest(array[1, 2],array[2, 3]);
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.planner.codegen.CodeGenException: Comparable type
expected, but was 'ARRAY<INT NOT NULL> NOT NULL'.
Flink SQL> select greatest(row(1, 2),row(2, 3));
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.planner.codegen.CodeGenException: Comparable type
expected, but was 'ROW<`EXPR$0` INT NOT NULL, `EXPR$1` INT NOT NULL> NOT NULL'.
{code}
and after dig the code, flink doesn't support complex type compare.
and we can supports like spark latter.
and both the ArrayComparableElementTypeStrategy and ComparableTypeStrategy may
need have a abstract. or ArrayComparableElementTypeStrategy should have a
comments about the remove {{areTypesOfSameRootComparable(firstType,
secondType). }}
{{it will cause lost easily when flink }}{{supports complex type compare
because others don't know it.}}
{{what do you think?}}
{{and if you have time i can support the complex type }}{{{}compare{}}}{{ if
you have time to help review?
{code:java}
/**
 * Physical array type whose ordering compares two ArrayData values element by
 * element, falling back to array length when one is a prefix of the other.
 *
 * @param elementType  data type of the elements; used both to look up the element
 *                     ordering and to read elements out of the arrays
 * @param containsNull not consulted by the ordering logic in this block
 */
case class PhysicalArrayType(
elementType: DataType, containsNull: Boolean) extends PhysicalDataType {
override private[sql] type InternalType = ArrayData
// The ordering of this type is the lazily built interpretedOrdering below.
override private[sql] def ordering = interpretedOrdering
@transient private[sql] lazy val tag = typeTag[InternalType]
@transient
private[sql] lazy val interpretedOrdering: Ordering[ArrayData] = new
Ordering[ArrayData] {
// Ordering for a single element, looked up from the element type and cast to
// Ordering[Any] so it can be applied to the values returned by ArrayData.get.
private[this] val elementOrdering: Ordering[Any] =
PhysicalDataType(elementType).ordering.asInstanceOf[Ordering[Any]]
// Compares x and y position by position over their common prefix:
//   - both elements null: treated as equal, move on to the next position
//   - only the left element null: x sorts first (-1)
//   - only the right element null: y sorts first (1)
//   - otherwise: the first non-zero element comparison decides
// If the common prefix is entirely equal, the shorter array sorts first and
// arrays of equal length compare as 0.
def compare(x: ArrayData, y: ArrayData): Int = {
val leftArray = x
val rightArray = y
// Only positions present in both arrays can be compared element-wise.
val minLength = scala.math.min(leftArray.numElements(),
rightArray.numElements())
var i = 0
while (i < minLength) {
val isNullLeft = leftArray.isNullAt(i)
val isNullRight = rightArray.isNullAt(i)
if (isNullLeft && isNullRight) {
// Do nothing.
} else if (isNullLeft) {
// A null element orders before any non-null element.
return -1
} else if (isNullRight) {
return 1
} else {
val comp =
elementOrdering.compare(
leftArray.get(i, elementType),
rightArray.get(i, elementType))
// The first differing position determines the result.
if (comp != 0) {
return comp
}
}
i += 1
}
// Common prefix is equal: break the tie on length.
if (leftArray.numElements() < rightArray.numElements()) {
-1
} else if (leftArray.numElements() > rightArray.numElements()) {
1
} else {
0
}
}
}
}
/**
 * Physical struct type whose ordering is an InterpretedOrdering built from the
 * data types of its fields.
 *
 * @param fields the struct's fields; only their dataType is read in this block
 */
case class PhysicalStructType(fields: Array[StructField]) extends
PhysicalDataType {
override private[sql] type InternalType = Any
// `ordering` is a def, so forSchema is invoked (and a new InterpretedOrdering
// constructed) on every access; the result is cast to Ordering[InternalType].
override private[sql] def ordering =
forSchema(this.fields.map(_.dataType)).asInstanceOf[Ordering[InternalType]]
@transient private[sql] lazy val tag = typeTag[InternalType]
// Builds an InterpretedOrdering with one ascending SortOrder per data type, in
// the order given. Each SortOrder wraps a BoundReference to the data type's
// index, declared with nullable = true.
private[sql] def forSchema(dataTypes: Seq[DataType]): InterpretedOrdering = {
new InterpretedOrdering(dataTypes.zipWithIndex.map {
case (dt, index) => SortOrder(BoundReference(index, dt, nullable = true),
Ascending)
})
}
} {code}
> Add ARRAY_MAX support in SQL & Table API
> ----------------------------------------
>
> Key: FLINK-32257
> URL: https://issues.apache.org/jira/browse/FLINK-32257
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 1.18.0
> Reporter: Bonnie Varghese
> Assignee: Hanyu Zheng
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.18.0
>
>
> This is an implementation of ARRAY_MAX
> The array_max() function returns the maximum element of the input
> array.
> The result matches the type of the elements. NULL elements are skipped. If
> array is empty, or contains only NULL elements, NULL is returned.
>
> Syntax
> array_max(array)
> Arguments
> array: Any ARRAY with elements for which order is supported.
>
> Returns
> The result matches the type of the elements. NULL elements are skipped. If
> array is empty, or contains only NULL elements, NULL is returned.
>
> Examples
> SQL
>
> > SELECT array_max(array(1, 20, NULL, 3)); 20
>
> {code:java}
> // Flink SQL> select array_max(array[1, 20, null, 3])
> 20{code}
>
> See also
> spark
> [https://spark.apache.org/docs/latest/api/sql/index.html#array_max]
> presto [https://prestodb.io/docs/current/functions/array.html]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)