[ 
https://issues.apache.org/jira/browse/FLINK-32257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17740085#comment-17740085
 ] 

Jacky Lau edited comment on FLINK-32257 at 7/5/23 8:28 AM:
-----------------------------------------------------------

[~dwysakowicz] after reading the 
[https://github.com/apache/flink/pull/22730#discussion_r1242187327] 

and testing in the SQL client, it throws an exception.

 
{code:java}
Flink SQL> select greatest(array[1, 2],array[2, 3]);
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.planner.codegen.CodeGenException: Comparable type 
expected, but was 'ARRAY<INT NOT NULL> NOT NULL'.


Flink SQL> select greatest(row(1, 2),row(2, 3));
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.planner.codegen.CodeGenException: Comparable type 
expected, but was 'ROW<`EXPR$0` INT NOT NULL, `EXPR$1` INT NOT NULL> NOT NULL'. 
{code}
 

and after digging into the code, Flink doesn't support complex type comparison.

and we can support it like Spark does later.

and both the ArrayComparableElementTypeStrategy and ComparableTypeStrategy may 
need an abstraction. Or ArrayComparableElementTypeStrategy should have a 
comment explaining the removal of {{areTypesOfSameRootComparable(firstType, 
secondType)}}.

{{Otherwise that knowledge can easily be lost when Flink later supports complex 
type comparison, because others won't know about it.}}

{{what do you think?}}

{{And I can implement the complex type comparison, if you have time to help 
review?}}

 
{code:java}
case class PhysicalArrayType(
    elementType: DataType, containsNull: Boolean) extends PhysicalDataType {
  override private[sql] type InternalType = ArrayData
  override private[sql] def ordering = interpretedOrdering
  @transient private[sql] lazy val tag = typeTag[InternalType]

  @transient
  private[sql] lazy val interpretedOrdering: Ordering[ArrayData] = new 
Ordering[ArrayData] {
    private[this] val elementOrdering: Ordering[Any] =
      PhysicalDataType(elementType).ordering.asInstanceOf[Ordering[Any]]

    def compare(x: ArrayData, y: ArrayData): Int = {
      val leftArray = x
      val rightArray = y
      val minLength = scala.math.min(leftArray.numElements(), 
rightArray.numElements())
      var i = 0
      while (i < minLength) {
        val isNullLeft = leftArray.isNullAt(i)
        val isNullRight = rightArray.isNullAt(i)
        if (isNullLeft && isNullRight) {
          // Do nothing.
        } else if (isNullLeft) {
          return -1
        } else if (isNullRight) {
          return 1
        } else {
          val comp =
            elementOrdering.compare(
              leftArray.get(i, elementType),
              rightArray.get(i, elementType))
          if (comp != 0) {
            return comp
          }
        }
        i += 1
      }
      if (leftArray.numElements() < rightArray.numElements()) {
        -1
      } else if (leftArray.numElements() > rightArray.numElements()) {
        1
      } else {
        0
      }
    }
  }
}

case class PhysicalStructType(fields: Array[StructField]) extends 
PhysicalDataType {
  override private[sql] type InternalType = Any
  override private[sql] def ordering =
    forSchema(this.fields.map(_.dataType)).asInstanceOf[Ordering[InternalType]]
  @transient private[sql] lazy val tag = typeTag[InternalType]

  private[sql] def forSchema(dataTypes: Seq[DataType]): InterpretedOrdering = {
    new InterpretedOrdering(dataTypes.zipWithIndex.map {
      case (dt, index) => SortOrder(BoundReference(index, dt, nullable = true), 
Ascending)
    })
  }
} {code}
 


was (Author: jackylau):
[~dwysakowicz] after read the 
[https://github.com/apache/flink/pull/22730#discussion_r1242187327] 

and test in sql client. it throw exception.

 
{code:java}
Flink SQL> select greatest(array[1, 2],array[2, 3]);
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.planner.codegen.CodeGenException: Comparable type 
expected, but was 'ARRAY<INT NOT NULL> NOT NULL'.


Flink SQL> select greatest(row(1, 2),row(2, 3));
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.planner.codegen.CodeGenException: Comparable type 
expected, but was 'ROW<`EXPR$0` INT NOT NULL, `EXPR$1` INT NOT NULL> NOT NULL'. 
{code}
 

and after dig the code, flink doesn't support  complex type compare.

and we can supports like spark latter.

and both the ArrayComparableElementTypeStrategy and ComparableTypeStrategy may 
need have a abstract. or ArrayComparableElementTypeStrategy should have a 
comments about the remove {{areTypesOfSameRootComparable(firstType, 
secondType). }}

{{it will cause lost easily when flink }}{{supports complex type compare 
because others don't know it.}}

{{what do you think?}}

{{and if you have time i can support the complex  type }}{{{}compare{}}}{{ if 
you have time to help review?

 
{code:java}
case class PhysicalArrayType(
    elementType: DataType, containsNull: Boolean) extends PhysicalDataType {
  override private[sql] type InternalType = ArrayData
  override private[sql] def ordering = interpretedOrdering
  @transient private[sql] lazy val tag = typeTag[InternalType]

  @transient
  private[sql] lazy val interpretedOrdering: Ordering[ArrayData] = new 
Ordering[ArrayData] {
    private[this] val elementOrdering: Ordering[Any] =
      PhysicalDataType(elementType).ordering.asInstanceOf[Ordering[Any]]

    def compare(x: ArrayData, y: ArrayData): Int = {
      val leftArray = x
      val rightArray = y
      val minLength = scala.math.min(leftArray.numElements(), 
rightArray.numElements())
      var i = 0
      while (i < minLength) {
        val isNullLeft = leftArray.isNullAt(i)
        val isNullRight = rightArray.isNullAt(i)
        if (isNullLeft && isNullRight) {
          // Do nothing.
        } else if (isNullLeft) {
          return -1
        } else if (isNullRight) {
          return 1
        } else {
          val comp =
            elementOrdering.compare(
              leftArray.get(i, elementType),
              rightArray.get(i, elementType))
          if (comp != 0) {
            return comp
          }
        }
        i += 1
      }
      if (leftArray.numElements() < rightArray.numElements()) {
        -1
      } else if (leftArray.numElements() > rightArray.numElements()) {
        1
      } else {
        0
      }
    }
  }
}

case class PhysicalStructType(fields: Array[StructField]) extends 
PhysicalDataType {
  override private[sql] type InternalType = Any
  override private[sql] def ordering =
    forSchema(this.fields.map(_.dataType)).asInstanceOf[Ordering[InternalType]]
  @transient private[sql] lazy val tag = typeTag[InternalType]

  private[sql] def forSchema(dataTypes: Seq[DataType]): InterpretedOrdering = {
    new InterpretedOrdering(dataTypes.zipWithIndex.map {
      case (dt, index) => SortOrder(BoundReference(index, dt, nullable = true), 
Ascending)
    })
  }
} {code}
 

> Add ARRAY_MAX support in SQL & Table API
> ----------------------------------------
>
>                 Key: FLINK-32257
>                 URL: https://issues.apache.org/jira/browse/FLINK-32257
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 1.18.0
>            Reporter: Bonnie Varghese
>            Assignee: Hanyu Zheng
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.18.0
>
>
> This is an implementation of ARRAY_MAX
> The array_max() function gets the maximum element from the input 
> array.
> The result matches the type of the elements. NULL elements are skipped. If 
> array is empty, or contains only NULL elements, NULL is returned.
>  
> Syntax
> array_max(array)
> Arguments
> array: Any ARRAY with elements for which order is supported.
>  
> Returns
> The result matches the type of the elements. NULL elements are skipped. If 
> array is empty, or contains only NULL elements, NULL is returned.
>  
> Examples
> SQL
>  
> > SELECT array_max(array(1, 20, NULL, 3)); 20
>  
> {code:java}
> // Fink SQL-> select array_max(array[1, 20, null, 3])
> 20{code}
>  
> See also
> spark 
> [https://spark.apache.org/docs/latest/api/sql/index.html#array_max|https://spark.apache.org/docs/latest/api/sql/index.html#array_min]
> presto [https://prestodb.io/docs/current/functions/array.html]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to