cryptoe commented on code in PR #16322:
URL: https://github.com/apache/druid/pull/16322#discussion_r1592070249


##########
processing/src/main/java/org/apache/druid/frame/key/RowKeyComparisonRunLengths.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.key;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class RowKeyComparisonRunLengths
+{
+
+  public static RowKeyComparisonRunLengths create(final List<KeyColumn> 
keyColumns, RowSignature rowSignature)
+  {
+    final List<RunLengthEntry> runLengthEntries = new ArrayList<>();
+    for (KeyColumn keyColumn : keyColumns) {
+
+      if (keyColumn.order() == KeyOrder.NONE) {
+        throw DruidException.defensive(
+            "Cannot sort on column [%s] when the sorting order isn't provided",
+            keyColumn.columnName()
+        );
+      }
+
+      ColumnType columnType = 
rowSignature.getColumnType(keyColumn.columnName())
+                                          .orElseThrow(() -> 
DruidException.defensive("Need column types"));
+
+      if (runLengthEntries.size() == 0) {
+        runLengthEntries.add(
+            new RunLengthEntry(
+                isByteComparable(columnType),
+                1,
+                keyColumn.order()
+            )
+        );
+        continue;
+      }
+
+      // There is atleast one RunLengthEntry present in the array. Check if we 
can find a way to merge the current entry
+      // with the previous one
+      boolean isCurrentColumnByteComparable = isByteComparable(columnType);
+      RunLengthEntry lastRunLengthEntry = 
runLengthEntries.get(runLengthEntries.size() - 1);
+      if (lastRunLengthEntry.isByteComparable()
+          && isCurrentColumnByteComparable
+          && lastRunLengthEntry.order.equals(keyColumn.order())
+      ) {
+        lastRunLengthEntry.runLength++;
+      } else {
+        runLengthEntries.add(
+            new RunLengthEntry(
+                isCurrentColumnByteComparable,
+                1,
+                keyColumn.order()
+            )
+        );
+      }
+    }
+    return new RowKeyComparisonRunLengths(runLengthEntries);
+  }
+
+  public static boolean isByteComparable(@Nullable ColumnType columnType)
+  {
+    // For backward compatibility
+    // Only types which were byte comparable were allowed. Therefore, if we 
don't know the columnType, we should assume
+    // that it is byte comparable
+    if (columnType == null) {
+      return true;
+    } else if (columnType.is(ValueType.COMPLEX)) {
+      if (columnType.getComplexTypeName() == null) {
+        throw DruidException.defensive("Cannot sort unknown complex types");
+      }
+      // Complex types with known types are not byte comparable and must be 
deserialized for comparison
+      return false;
+    } else if (columnType.isArray() && !columnType.isPrimitiveArray()) {
+      // Nested arrays aren't allowed directly in the frames - they are 
materialized as nested types.
+      // Nested arrays aren't byte comparable, if they find a way to creep in.
+      throw DruidException.defensive("Nested arrays aren't supported in row 
based frames");
+    }
+    return true;
+  }
+
+  private final List<RunLengthEntry> runLengthEntries;
+
+  public RowKeyComparisonRunLengths(List<RunLengthEntry> runLengthEntries)

Review Comment:
   The constructor should be private. 



##########
processing/src/main/java/org/apache/druid/frame/key/RowKeyComparisonRunLengths.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.key;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class RowKeyComparisonRunLengths
+{
+
+  public static RowKeyComparisonRunLengths create(final List<KeyColumn> 
keyColumns, RowSignature rowSignature)
+  {
+    final List<RunLengthEntry> runLengthEntries = new ArrayList<>();
+    for (KeyColumn keyColumn : keyColumns) {
+
+      if (keyColumn.order() == KeyOrder.NONE) {
+        throw DruidException.defensive(
+            "Cannot sort on column [%s] when the sorting order isn't provided",
+            keyColumn.columnName()
+        );
+      }
+
+      ColumnType columnType = 
rowSignature.getColumnType(keyColumn.columnName())
+                                          .orElseThrow(() -> 
DruidException.defensive("Need column types"));
+
+      if (runLengthEntries.size() == 0) {
+        runLengthEntries.add(
+            new RunLengthEntry(
+                isByteComparable(columnType),
+                1,
+                keyColumn.order()
+            )
+        );
+        continue;
+      }
+
+      // There is atleast one RunLengthEntry present in the array. Check if we 
can find a way to merge the current entry
+      // with the previous one
+      boolean isCurrentColumnByteComparable = isByteComparable(columnType);
+      RunLengthEntry lastRunLengthEntry = 
runLengthEntries.get(runLengthEntries.size() - 1);
+      if (lastRunLengthEntry.isByteComparable()
+          && isCurrentColumnByteComparable
+          && lastRunLengthEntry.order.equals(keyColumn.order())
+      ) {
+        lastRunLengthEntry.runLength++;
+      } else {
+        runLengthEntries.add(
+            new RunLengthEntry(
+                isCurrentColumnByteComparable,
+                1,
+                keyColumn.order()
+            )
+        );
+      }
+    }
+    return new RowKeyComparisonRunLengths(runLengthEntries);
+  }
+
+  public static boolean isByteComparable(@Nullable ColumnType columnType)
+  {
+    // For backward compatibility
+    // Only types which were byte comparable were allowed. Therefore, if we 
don't know the columnType, we should assume
+    // that it is byte comparable
+    if (columnType == null) {
+      return true;
+    } else if (columnType.is(ValueType.COMPLEX)) {
+      if (columnType.getComplexTypeName() == null) {
+        throw DruidException.defensive("Cannot sort unknown complex types");
+      }
+      // Complex types with known types are not byte comparable and must be 
deserialized for comparison
+      return false;

Review Comment:
   I think we should add a public method to typeStrategy#isByteComparable() and 
check that here as well. 



##########
processing/src/main/java/org/apache/druid/frame/key/RowKeyComparisonRunLengths.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.key;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class RowKeyComparisonRunLengths
+{
+
+  public static RowKeyComparisonRunLengths create(final List<KeyColumn> 
keyColumns, RowSignature rowSignature)
+  {
+    final List<RunLengthEntry> runLengthEntries = new ArrayList<>();
+    for (KeyColumn keyColumn : keyColumns) {
+
+      if (keyColumn.order() == KeyOrder.NONE) {
+        throw DruidException.defensive(
+            "Cannot sort on column [%s] when the sorting order isn't provided",
+            keyColumn.columnName()
+        );
+      }
+
+      ColumnType columnType = 
rowSignature.getColumnType(keyColumn.columnName())
+                                          .orElseThrow(() -> 
DruidException.defensive("Need column types"));
+
+      if (runLengthEntries.size() == 0) {
+        runLengthEntries.add(
+            new RunLengthEntry(
+                isByteComparable(columnType),
+                1,
+                keyColumn.order()
+            )
+        );
+        continue;
+      }
+
+      // There is atleast one RunLengthEntry present in the array. Check if we 
can find a way to merge the current entry
+      // with the previous one
+      boolean isCurrentColumnByteComparable = isByteComparable(columnType);
+      RunLengthEntry lastRunLengthEntry = 
runLengthEntries.get(runLengthEntries.size() - 1);
+      if (lastRunLengthEntry.isByteComparable()
+          && isCurrentColumnByteComparable
+          && lastRunLengthEntry.order.equals(keyColumn.order())
+      ) {
+        lastRunLengthEntry.runLength++;
+      } else {
+        runLengthEntries.add(
+            new RunLengthEntry(
+                isCurrentColumnByteComparable,
+                1,
+                keyColumn.order()
+            )
+        );
+      }
+    }
+    return new RowKeyComparisonRunLengths(runLengthEntries);
+  }
+
+  public static boolean isByteComparable(@Nullable ColumnType columnType)
+  {
+    // For backward compatibility
+    // Only types which were byte comparable were allowed. Therefore, if we 
don't know the columnType, we should assume
+    // that it is byte comparable
+    if (columnType == null) {
+      return true;
+    } else if (columnType.is(ValueType.COMPLEX)) {
+      if (columnType.getComplexTypeName() == null) {
+        throw DruidException.defensive("Cannot sort unknown complex types");
+      }
+      // Complex types with known types are not byte comparable and must be 
deserialized for comparison
+      return false;

Review Comment:
   That way we donot need to change this code if we add byte comparable 
versions of complex types. 



##########
processing/src/main/java/org/apache/druid/frame/key/RowKeyComparisonRunLengths.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.key;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class RowKeyComparisonRunLengths
+{
+
+  public static RowKeyComparisonRunLengths create(final List<KeyColumn> 
keyColumns, RowSignature rowSignature)
+  {
+    final List<RunLengthEntry> runLengthEntries = new ArrayList<>();
+    for (KeyColumn keyColumn : keyColumns) {
+
+      if (keyColumn.order() == KeyOrder.NONE) {
+        throw DruidException.defensive(
+            "Cannot sort on column [%s] when the sorting order isn't provided",
+            keyColumn.columnName()
+        );
+      }
+
+      ColumnType columnType = 
rowSignature.getColumnType(keyColumn.columnName())
+                                          .orElseThrow(() -> 
DruidException.defensive("Need column types"));
+
+      if (runLengthEntries.size() == 0) {
+        runLengthEntries.add(
+            new RunLengthEntry(
+                isByteComparable(columnType),
+                1,
+                keyColumn.order()
+            )
+        );
+        continue;
+      }
+
+      // There is atleast one RunLengthEntry present in the array. Check if we 
can find a way to merge the current entry
+      // with the previous one
+      boolean isCurrentColumnByteComparable = isByteComparable(columnType);
+      RunLengthEntry lastRunLengthEntry = 
runLengthEntries.get(runLengthEntries.size() - 1);
+      if (lastRunLengthEntry.isByteComparable()
+          && isCurrentColumnByteComparable
+          && lastRunLengthEntry.order.equals(keyColumn.order())
+      ) {
+        lastRunLengthEntry.runLength++;
+      } else {
+        runLengthEntries.add(
+            new RunLengthEntry(
+                isCurrentColumnByteComparable,
+                1,
+                keyColumn.order()
+            )
+        );
+      }
+    }
+    return new RowKeyComparisonRunLengths(runLengthEntries);
+  }
+
+  public static boolean isByteComparable(@Nullable ColumnType columnType)
+  {
+    // For backward compatibility
+    // Only types which were byte comparable were allowed. Therefore, if we 
don't know the columnType, we should assume
+    // that it is byte comparable
+    if (columnType == null) {
+      return true;
+    } else if (columnType.is(ValueType.COMPLEX)) {
+      if (columnType.getComplexTypeName() == null) {
+        throw DruidException.defensive("Cannot sort unknown complex types");
+      }
+      // Complex types with known types are not byte comparable and must be 
deserialized for comparison
+      return false;
+    } else if (columnType.isArray() && !columnType.isPrimitiveArray()) {
+      // Nested arrays aren't allowed directly in the frames - they are 
materialized as nested types.
+      // Nested arrays aren't byte comparable, if they find a way to creep in.
+      throw DruidException.defensive("Nested arrays aren't supported in row 
based frames");
+    }
+    return true;
+  }
+
+  private final List<RunLengthEntry> runLengthEntries;
+
+  public RowKeyComparisonRunLengths(List<RunLengthEntry> runLengthEntries)
+  {
+    this.runLengthEntries = runLengthEntries;
+  }
+
+  public List<RunLengthEntry> getRunLengthEntries()
+  {
+    return runLengthEntries;
+  }
+
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    RowKeyComparisonRunLengths that = (RowKeyComparisonRunLengths) o;
+    return Objects.equals(runLengthEntries, that.runLengthEntries);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(runLengthEntries);
+  }
+
+  @Override
+  public String toString()
+  {
+    return runLengthEntries.toString();
+  }
+
+  /**
+   * Information about a continguous run of keys, that has the same sorting 
order
+   */
+  public static class RunLengthEntry
+  {
+    private final boolean byteComparable;
+    private int runLength;

Review Comment:
   This should not be part of the constructor. 
   We should only have a increseRunLenght() method 



##########
processing/src/main/java/org/apache/druid/frame/key/RowKeyComparisonRunLengths.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.key;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class RowKeyComparisonRunLengths
+{
+
+  public static RowKeyComparisonRunLengths create(final List<KeyColumn> 
keyColumns, RowSignature rowSignature)
+  {
+    final List<RunLengthEntry> runLengthEntries = new ArrayList<>();
+    for (KeyColumn keyColumn : keyColumns) {
+
+      if (keyColumn.order() == KeyOrder.NONE) {
+        throw DruidException.defensive(
+            "Cannot sort on column [%s] when the sorting order isn't provided",
+            keyColumn.columnName()
+        );
+      }
+
+      ColumnType columnType = 
rowSignature.getColumnType(keyColumn.columnName())
+                                          .orElseThrow(() -> 
DruidException.defensive("Need column types"));
+
+      if (runLengthEntries.size() == 0) {
+        runLengthEntries.add(
+            new RunLengthEntry(
+                isByteComparable(columnType),
+                1,
+                keyColumn.order()
+            )
+        );
+        continue;
+      }
+
+      // There is atleast one RunLengthEntry present in the array. Check if we 
can find a way to merge the current entry
+      // with the previous one
+      boolean isCurrentColumnByteComparable = isByteComparable(columnType);
+      RunLengthEntry lastRunLengthEntry = 
runLengthEntries.get(runLengthEntries.size() - 1);
+      if (lastRunLengthEntry.isByteComparable()
+          && isCurrentColumnByteComparable
+          && lastRunLengthEntry.order.equals(keyColumn.order())
+      ) {
+        lastRunLengthEntry.runLength++;
+      } else {
+        runLengthEntries.add(
+            new RunLengthEntry(
+                isCurrentColumnByteComparable,
+                1,
+                keyColumn.order()
+            )
+        );
+      }
+    }
+    return new RowKeyComparisonRunLengths(runLengthEntries);
+  }
+
+  public static boolean isByteComparable(@Nullable ColumnType columnType)
+  {
+    // For backward compatibility
+    // Only types which were byte comparable were allowed. Therefore, if we 
don't know the columnType, we should assume
+    // that it is byte comparable
+    if (columnType == null) {
+      return true;

Review Comment:
   So I think column types can't be null rite because in the rowKeyComparators, 
you have the check. 



##########
processing/src/main/java/org/apache/druid/frame/key/ByteRowKeyComparator.java:
##########
@@ -68,83 +90,86 @@ public static int computeFirstFieldPosition(final int 
fieldCount)
     return Ints.checkedCast((long) fieldCount * Integer.BYTES);
   }
 
-  /**
-   * Given a list of sort columns, compute an array of the number of ascending 
fields in a run, followed by number of
-   * descending fields in a run, followed by ascending, etc. For example: ASC, 
ASC, DESC, ASC would return [2, 1, 1]
-   * and DESC, DESC, ASC would return [0, 2, 1].
-   *
-   * Public so {@link FrameComparisonWidgetImpl} can use it.
-   */
-  public static int[] computeAscDescRunLengths(final List<KeyColumn> 
keyColumns)
+  @Override
+  @SuppressWarnings("SubtractionInCompareTo")
+  public int compare(final byte[] keyArray1, final byte[] keyArray2)
   {
-    final IntList ascDescRunLengths = new IntArrayList(4);
+    // Similar logic to FrameComparisonWidgetImpl, but implementation is 
different enough that we need our own.
+    // Major difference is Frame v. Frame instead of byte[] v. byte[].
 
-    KeyOrder order = KeyOrder.ASCENDING;
-    int runLength = 0;
+    int currentRunStartPosition1 = firstFieldPosition;
+    int currentRunStartPosition2 = firstFieldPosition;
 
-    for (final KeyColumn column : keyColumns) {
-      if (column.order() == KeyOrder.NONE) {
-        throw new IAE("Key must be sortable");
-      }
+    // Number of fields compared till now, which is equivalent to the index of 
the field to compare next
+    int fieldsComparedTillNow = 0;
 
-      if (column.order() != order) {
-        ascDescRunLengths.add(runLength);
-        runLength = 0;
+    for (RowKeyComparisonRunLengths.RunLengthEntry runLengthEntry : 
rowKeyComparisonRunLengths.getRunLengthEntries()) {
 
-        // Invert "order".
-        order = order == KeyOrder.ASCENDING ? KeyOrder.DESCENDING : 
KeyOrder.ASCENDING;
+      if (runLengthEntry.getRunLength() <= 0) {
+        // Defensive check
+        continue;
       }
 
-      runLength++;
-    }
-
-    if (runLength > 0) {
-      ascDescRunLengths.add(runLength);
-    }
-
-    return ascDescRunLengths.toIntArray();
-  }
-
-  @Override
-  @SuppressWarnings("SubtractionInCompareTo")
-  public int compare(final byte[] keyArray1, final byte[] keyArray2)
-  {
-    // Similar logic to FrameComparaisonWidgetImpl, but implementation is 
different enough that we need our own.
-    // Major difference is Frame v. Frame instead of byte[] v. byte[].
-
-    int comparableBytesStartPosition1 = firstFieldPosition;
-    int comparableBytesStartPosition2 = firstFieldPosition;
+      if (!runLengthEntry.isByteComparable()) {
+        // Only complex types are not byte comparable. Nested arrays aren't 
supported in MSQ
+        assert runLengthEntry.getRunLength() == 1;
+        // 'fieldsComparedTillNow' is the index of the current keyColumn in 
the keyColumns list. Sanity check that its
+        // a known complex type
+        ColumnType columnType = 
rowSignature.getColumnType(keyColumns.get(fieldsComparedTillNow).columnName())
+                                            .orElseThrow(() -> 
DruidException.defensive("Expecting a complex column with known type"));
+        String complexTypeName = Preconditions.checkNotNull(
+            columnType.getComplexTypeName(),
+            "complexType must be present for comparison"
+        );
 
-    boolean ascending = true;
-    int field = 0;
+        ComplexMetricSerde serde = Preconditions.checkNotNull(
+            ComplexMetrics.getSerdeForType(complexTypeName),
+            "serde for type [%s] not present",
+            complexTypeName
+        );
 
-    for (int numFields : ascDescRunLengths) {
-      if (numFields > 0) {
-        final int nextField = field + numFields;
-        final int comparableBytesEndPosition1 = 
RowKeyReader.fieldEndPosition(keyArray1, nextField - 1);
-        final int comparableBytesEndPosition2 = 
RowKeyReader.fieldEndPosition(keyArray2, nextField - 1);
+        // Index of the next field that will get considered. Excludes the 
current field that we are comparing right now
+        final int nextField = fieldsComparedTillNow + 1;

Review Comment:
   runLenghtEntry.getRunLen() should be used here. 
   I think we can take it out of the if condition. 



##########
processing/src/main/java/org/apache/druid/frame/key/ByteRowKeyComparator.java:
##########
@@ -68,83 +90,86 @@ public static int computeFirstFieldPosition(final int 
fieldCount)
     return Ints.checkedCast((long) fieldCount * Integer.BYTES);
   }
 
-  /**
-   * Given a list of sort columns, compute an array of the number of ascending 
fields in a run, followed by number of
-   * descending fields in a run, followed by ascending, etc. For example: ASC, 
ASC, DESC, ASC would return [2, 1, 1]
-   * and DESC, DESC, ASC would return [0, 2, 1].
-   *
-   * Public so {@link FrameComparisonWidgetImpl} can use it.
-   */
-  public static int[] computeAscDescRunLengths(final List<KeyColumn> 
keyColumns)
+  @Override
+  @SuppressWarnings("SubtractionInCompareTo")
+  public int compare(final byte[] keyArray1, final byte[] keyArray2)
   {
-    final IntList ascDescRunLengths = new IntArrayList(4);
+    // Similar logic to FrameComparisonWidgetImpl, but implementation is 
different enough that we need our own.
+    // Major difference is Frame v. Frame instead of byte[] v. byte[].
 
-    KeyOrder order = KeyOrder.ASCENDING;
-    int runLength = 0;
+    int currentRunStartPosition1 = firstFieldPosition;
+    int currentRunStartPosition2 = firstFieldPosition;
 
-    for (final KeyColumn column : keyColumns) {
-      if (column.order() == KeyOrder.NONE) {
-        throw new IAE("Key must be sortable");
-      }
+    // Number of fields compared till now, which is equivalent to the index of 
the field to compare next
+    int fieldsComparedTillNow = 0;
 
-      if (column.order() != order) {
-        ascDescRunLengths.add(runLength);
-        runLength = 0;
+    for (RowKeyComparisonRunLengths.RunLengthEntry runLengthEntry : 
rowKeyComparisonRunLengths.getRunLengthEntries()) {
 
-        // Invert "order".
-        order = order == KeyOrder.ASCENDING ? KeyOrder.DESCENDING : 
KeyOrder.ASCENDING;
+      if (runLengthEntry.getRunLength() <= 0) {
+        // Defensive check
+        continue;
       }
 
-      runLength++;
-    }
-
-    if (runLength > 0) {
-      ascDescRunLengths.add(runLength);
-    }
-
-    return ascDescRunLengths.toIntArray();
-  }
-
-  @Override
-  @SuppressWarnings("SubtractionInCompareTo")
-  public int compare(final byte[] keyArray1, final byte[] keyArray2)
-  {
-    // Similar logic to FrameComparaisonWidgetImpl, but implementation is 
different enough that we need our own.
-    // Major difference is Frame v. Frame instead of byte[] v. byte[].
-
-    int comparableBytesStartPosition1 = firstFieldPosition;
-    int comparableBytesStartPosition2 = firstFieldPosition;
+      if (!runLengthEntry.isByteComparable()) {
+        // Only complex types are not byte comparable. Nested arrays aren't 
supported in MSQ
+        assert runLengthEntry.getRunLength() == 1;
+        // 'fieldsComparedTillNow' is the index of the current keyColumn in 
the keyColumns list. Sanity check that its
+        // a known complex type
+        ColumnType columnType = 
rowSignature.getColumnType(keyColumns.get(fieldsComparedTillNow).columnName())
+                                            .orElseThrow(() -> 
DruidException.defensive("Expecting a complex column with known type"));
+        String complexTypeName = Preconditions.checkNotNull(
+            columnType.getComplexTypeName(),
+            "complexType must be present for comparison"
+        );
 
-    boolean ascending = true;
-    int field = 0;
+        ComplexMetricSerde serde = Preconditions.checkNotNull(
+            ComplexMetrics.getSerdeForType(complexTypeName),
+            "serde for type [%s] not present",
+            complexTypeName
+        );
 
-    for (int numFields : ascDescRunLengths) {
-      if (numFields > 0) {
-        final int nextField = field + numFields;
-        final int comparableBytesEndPosition1 = 
RowKeyReader.fieldEndPosition(keyArray1, nextField - 1);
-        final int comparableBytesEndPosition2 = 
RowKeyReader.fieldEndPosition(keyArray2, nextField - 1);
+        // Index of the next field that will get considered. Excludes the 
current field that we are comparing right now
+        final int nextField = fieldsComparedTillNow + 1;
+        final int currentRunEndPosition1 = 
RowKeyReader.fieldEndPosition(keyArray1, nextField - 1);
+        final int currentRunEndPosition2 = 
RowKeyReader.fieldEndPosition(keyArray2, nextField - 1);
 
+        int cmp = FrameReaderUtils.compareComplexTypes(
+            keyArray1,
+            currentRunStartPosition1,
+            keyArray2,
+            currentRunStartPosition2,
+            columnType,
+            serde
+        );
+        if (cmp != 0) {
+          return runLengthEntry.getOrder() == KeyOrder.ASCENDING ? cmp : -cmp;
+        }
+        // We have only compared a single field here
+        fieldsComparedTillNow = nextField;

Review Comment:
   lets have 148-150 out side the if, else 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to