abhishekagarwal87 commented on code in PR #15241:
URL: https://github.com/apache/druid/pull/15241#discussion_r1374283108


##########
processing/src/main/java/org/apache/druid/query/operator/NaiveSortOperator.java:
##########
@@ -33,11 +34,11 @@
 public class NaiveSortOperator implements Operator
 {
   private final Operator child;
-  private final ArrayList<ColumnWithDirection> sortColumns;
+  private final List<ColumnWithDirection> sortColumns;

Review Comment:
  What's this change for? 



##########
processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java:
##########
@@ -238,7 +239,8 @@ private Pair<byte[], RowSignature> 
materializeStorageAdapter(StorageAdapter as)
         throw new ISE("accumulated[%s] non-null, why did we get multiple 
cursors?", accumulated);
       }
 
-      int theLimit = limit == -1 ? Integer.MAX_VALUE : limit;
+      long offsetRemaining = limit.getOffset();

Review Comment:
  Nit: the variable name `offsetRemaining` is confusing — the offset is fixed, it doesn't decrease. :) 



##########
sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java:
##########
@@ -1473,13 +1481,78 @@ private WindowOperatorQuery toWindowQuery()
     );
   }
 
+  /**
+   * Create an OperatorQuery which runs an order on top of a scan.
+   */
+  @Nullable
+  private WindowOperatorQuery toScanAndSortQuery()
+  {
+    if (sorting == null
+        || sorting.getOrderBys().isEmpty()
+        || sorting.getProjection() != null) {
+      return null;
+    }
+
+    ScanQuery scan = toScanQuery(false);
+    if (scan == null) {
+      return null;
+    }
+
+    // Reject cases which would sort the datasource directly
+    if (dataSource != DruidOuterQueryRel.DUMMY_DATA_SOURCE && 
dataSource.isConcrete()) {
+      List<String> orderByColumnNames = sorting.getOrderBys()
+          .stream().map(OrderByColumnSpec::getDimension)
+          .collect(Collectors.toList());
+      plannerContext.setPlanningError(
+          "SQL query requires order by non-time column [%s], which is not 
supported.",

Review Comment:
   ```suggestion
             "SQL query requires ordering a table by non-time column [%s], 
which is not supported.",
   ```



##########
processing/src/main/java/org/apache/druid/query/operator/OffsetLimit.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.error.DruidException;
+import java.util.Objects;
+
+public class OffsetLimit
+{
+  protected final long offset;
+  protected final long limit;
+
+  public static final OffsetLimit NONE = new OffsetLimit(0, -1);
+
+  @JsonCreator
+  public OffsetLimit(
+      @JsonProperty("offset") long offset,
+      @JsonProperty("limit") long limit)
+  {
+    Preconditions.checkArgument(offset >= 0, "offset >= 0");
+    this.offset = offset;
+    this.limit = limit < 0 ? -1 : limit;
+  }
+
+  @JsonProperty("offset")
+  public long getOffset()
+  {
+    return offset;
+  }
+
+  @JsonProperty("limit")
+  public long getLimit()
+  {
+    return limit;
+  }
+
+  public boolean isPresent()
+  {
+    return hasOffset() || hasLimit();
+  }
+
+  public boolean hasOffset()
+  {
+    return offset > 0;
+  }
+
+  public boolean hasLimit()
+  {
+    return limit >= 0;
+  }
+
+  public static OffsetLimit limit(int limit2)
+  {
+    return new OffsetLimit(0, limit2);
+  }
+
+  public long getLimitOrMax()
+  {
+    if (limit < 0) {
+      return Long.MAX_VALUE;
+    } else {
+      return limit;
+    }
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof OffsetLimit)) {
+      return false;
+    }
+    OffsetLimit that = (OffsetLimit) o;
+    return limit == that.limit && offset == that.offset;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(limit, offset);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "OffsetLimit{" +
+        "offset=" + offset +
+        ", limit=" + limit +
+        '}';
+  }
+
+  public long getFetchFromIndex(long maxIndex)
+  {
+    if (maxIndex <= offset) {
+      return 0;
+    }
+    return offset;
+  }
+
+  public long getFetchToIndex(long maxIndex)
+  {
+    if (maxIndex <= offset) {
+      return 0;
+    }
+    if (hasLimit()) {
+      long toIndex = limit + offset;
+      if (limit > Long.MAX_VALUE - offset) {
+        throw DruidException.defensive(
+            "Cannot compute toIndex due to overflow [%s]",
+            this);

Review Comment:
   Shouldn't this be checked in the constructor itself? Pretty wild if we hit 
this one. 😄 



##########
processing/src/main/java/org/apache/druid/query/operator/OffsetLimit.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.error.DruidException;
+import java.util.Objects;
+
+public class OffsetLimit
+{
+  protected final long offset;
+  protected final long limit;
+
+  public static final OffsetLimit NONE = new OffsetLimit(0, -1);
+
+  @JsonCreator
+  public OffsetLimit(
+      @JsonProperty("offset") long offset,
+      @JsonProperty("limit") long limit)
+  {
+    Preconditions.checkArgument(offset >= 0, "offset >= 0");
+    this.offset = offset;
+    this.limit = limit < 0 ? -1 : limit;
+  }
+
+  @JsonProperty("offset")
+  public long getOffset()
+  {
+    return offset;
+  }
+
+  @JsonProperty("limit")
+  public long getLimit()
+  {
+    return limit;
+  }
+
+  public boolean isPresent()
+  {
+    return hasOffset() || hasLimit();
+  }
+
+  public boolean hasOffset()
+  {
+    return offset > 0;
+  }
+
+  public boolean hasLimit()
+  {
+    return limit >= 0;
+  }
+
+  public static OffsetLimit limit(int limit2)
+  {
+    return new OffsetLimit(0, limit2);
+  }
+
+  public long getLimitOrMax()
+  {
+    if (limit < 0) {
+      return Long.MAX_VALUE;
+    } else {
+      return limit;
+    }
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof OffsetLimit)) {
+      return false;
+    }
+    OffsetLimit that = (OffsetLimit) o;
+    return limit == that.limit && offset == that.offset;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(limit, offset);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "OffsetLimit{" +
+        "offset=" + offset +
+        ", limit=" + limit +
+        '}';
+  }
+
+  public long getFetchFromIndex(long maxIndex)
+  {
+    if (maxIndex <= offset) {
+      return 0;
+    }
+    return offset;
+  }
+
+  public long getFetchToIndex(long maxIndex)
+  {
+    if (maxIndex <= offset) {
+      return 0;
+    }
+    if (hasLimit()) {
+      long toIndex = limit + offset;
+      if (limit > Long.MAX_VALUE - offset) {
+        throw DruidException.defensive(
+            "Cannot compute toIndex due to overflow [%s]",
+            this);

Review Comment:
   Shouldn't this be checked in the constructor itself? Pretty wild if we hit 
this one. 😄 



##########
processing/src/main/java/org/apache/druid/query/operator/OffsetLimit.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.error.DruidException;
+import java.util.Objects;
+
+public class OffsetLimit
+{
+  protected final long offset;
+  protected final long limit;
+
+  public static final OffsetLimit NONE = new OffsetLimit(0, -1);
+
+  @JsonCreator
+  public OffsetLimit(
+      @JsonProperty("offset") long offset,
+      @JsonProperty("limit") long limit)
+  {
+    Preconditions.checkArgument(offset >= 0, "offset >= 0");
+    this.offset = offset;
+    this.limit = limit < 0 ? -1 : limit;
+  }
+
+  @JsonProperty("offset")
+  public long getOffset()
+  {
+    return offset;
+  }
+
+  @JsonProperty("limit")
+  public long getLimit()
+  {
+    return limit;
+  }
+
+  public boolean isPresent()
+  {
+    return hasOffset() || hasLimit();
+  }
+
+  public boolean hasOffset()
+  {
+    return offset > 0;
+  }
+
+  public boolean hasLimit()
+  {
+    return limit >= 0;
+  }
+
+  public static OffsetLimit limit(int limit2)
+  {
+    return new OffsetLimit(0, limit2);
+  }
+
+  public long getLimitOrMax()
+  {
+    if (limit < 0) {
+      return Long.MAX_VALUE;
+    } else {
+      return limit;
+    }
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof OffsetLimit)) {
+      return false;
+    }
+    OffsetLimit that = (OffsetLimit) o;
+    return limit == that.limit && offset == that.offset;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(limit, offset);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "OffsetLimit{" +
+        "offset=" + offset +
+        ", limit=" + limit +
+        '}';
+  }
+
+  public long getFetchFromIndex(long maxIndex)

Review Comment:
  Can you please add some Javadoc for this method? 



##########
sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java:
##########
@@ -1473,13 +1481,78 @@ private WindowOperatorQuery toWindowQuery()
     );
   }
 
+  /**
+   * Create an OperatorQuery which runs an order on top of a scan.
+   */
+  @Nullable
+  private WindowOperatorQuery toScanAndSortQuery()
+  {
+    if (sorting == null
+        || sorting.getOrderBys().isEmpty()
+        || sorting.getProjection() != null) {
+      return null;
+    }
+
+    ScanQuery scan = toScanQuery(false);
+    if (scan == null) {
+      return null;
+    }
+
+    // Reject cases which would sort the datasource directly
+    if (dataSource != DruidOuterQueryRel.DUMMY_DATA_SOURCE && 
dataSource.isConcrete()) {
+      List<String> orderByColumnNames = sorting.getOrderBys()
+          .stream().map(OrderByColumnSpec::getDimension)
+          .collect(Collectors.toList());
+      plannerContext.setPlanningError(
+          "SQL query requires order by non-time column [%s], which is not 
supported.",
+          orderByColumnNames);
+      return null;
+    }
+
+    QueryDataSource newDataSource = new QueryDataSource(scan);
+    ArrayList<ColumnWithDirection> sortColumns = 
getColumnWithDriectionsFromOrderBys(sorting.getOrderBys());
+    RowSignature signature = getOutputRowSignature();
+    List<OperatorFactory> operators = new ArrayList<>();
+
+    operators.add(new NaiveSortOperatorFactory(sortColumns));
+    if (!sorting.getOffsetLimit().isNone()) {
+      operators.add(new ScanOperatorFactory(
+          null,
+          null,
+          sorting.getOffsetLimit().toOperatorOffsetLimit(),
+          null,
+          null,
+          null));
+    }
+
+    return new WindowOperatorQuery(
+        newDataSource,
+        new LegacySegmentSpec(Intervals.ETERNITY),
+        plannerContext.queryContextMap(),
+        signature,
+        operators,
+        null);
+  }
+
+  private ArrayList<ColumnWithDirection> 
getColumnWithDriectionsFromOrderBys(List<OrderByColumnSpec> orderBys)
+  {
+    ArrayList<ColumnWithDirection> ordering = new ArrayList<>();
+    for (OrderByColumnSpec orderBySpec : orderBys) {
+      Direction direction = orderBySpec.getDirection() == 
OrderByColumnSpec.Direction.ASCENDING
+          ? ColumnWithDirection.Direction.ASC
+          : ColumnWithDirection.Direction.DESC;
+      ordering.add(new ColumnWithDirection(orderBySpec.getDimension(), 
direction));
+    }
+    return ordering;
+  }
+
   /**
    * Return this query as a Scan query, or null if this query is not 
compatible with Scan.

Review Comment:
  Could you add a few more details here about how the `considerSorting` parameter is 
used? 



##########
sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java:
##########
@@ -1473,13 +1481,78 @@ private WindowOperatorQuery toWindowQuery()
     );
   }
 
+  /**
+   * Create an OperatorQuery which runs an order on top of a scan.
+   */
+  @Nullable
+  private WindowOperatorQuery toScanAndSortQuery()
+  {
+    if (sorting == null
+        || sorting.getOrderBys().isEmpty()
+        || sorting.getProjection() != null) {
+      return null;
+    }
+
+    ScanQuery scan = toScanQuery(false);
+    if (scan == null) {
+      return null;
+    }
+
+    // Reject cases which would sort the datasource directly
+    if (dataSource != DruidOuterQueryRel.DUMMY_DATA_SOURCE && 
dataSource.isConcrete()) {
+      List<String> orderByColumnNames = sorting.getOrderBys()
+          .stream().map(OrderByColumnSpec::getDimension)
+          .collect(Collectors.toList());
+      plannerContext.setPlanningError(
+          "SQL query requires order by non-time column [%s], which is not 
supported.",
+          orderByColumnNames);
+      return null;
+    }
+
+    QueryDataSource newDataSource = new QueryDataSource(scan);
+    ArrayList<ColumnWithDirection> sortColumns = 
getColumnWithDriectionsFromOrderBys(sorting.getOrderBys());

Review Comment:
   ```suggestion
       ArrayList<ColumnWithDirection> sortColumns = 
getColumnWithDirectionsFromOrderBys(sorting.getOrderBys());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to