This is an automated email from the ASF dual-hosted git repository.

dlych pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new bde7e77  [NO ISSUE][COMP] Copy LIMIT through UNION ALL
bde7e77 is described below

commit bde7e7781d82c080ebd6b7b0b59a410fd54f2aa2
Author: Dmitry Lychagin <[email protected]>
AuthorDate: Thu Oct 1 18:24:26 2020 -0700

    [NO ISSUE][COMP] Copy LIMIT through UNION ALL
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - Enhance CopyLimitDownRule to support copying
      LIMIT operators through UNION ALL
    
    Change-Id: I9df5e853538c14a05108ac6f0f4ef4916a5f8e1a
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/8064
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
---
 .../union/union_opt_1/union_opt_1.10.query.sqlpp   |  45 +++++++
 .../union/union_opt_1/union_opt_1.11.query.sqlpp   |  46 +++++++
 .../union/union_opt_1/union_opt_1.8.query.sqlpp    |  36 ++++++
 .../union/union_opt_1/union_opt_1.9.query.sqlpp    |  37 ++++++
 .../results/union/union_opt_1/union_opt_1.10.adm   |   4 +
 .../results/union/union_opt_1/union_opt_1.11.adm   | 106 ++++++++++++++++
 .../results/union/union_opt_1/union_opt_1.8.adm    |   4 +
 .../results/union/union_opt_1/union_opt_1.9.adm    |  54 +++++++++
 .../rewriter/rules/CopyLimitDownRule.java          | 134 +++++++++++++--------
 9 files changed, 419 insertions(+), 47 deletions(-)

diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.10.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.10.query.sqlpp
new file mode 100644
index 0000000..4fe8ac9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.10.query.sqlpp
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Test that LIMIT is copied down through UNION ALL
+ * and pushed to data scan
+ */
+
+set `compiler.parallelism` "1";
+
+use test;
+
+with T1 as (
+  select two from onek1 where two between 1 and 10
+  union all
+  select two from onek2 where two between 1 and 100
+),
+T2 as (
+  select two from onek1 where two between 1 and 1000
+  union all
+  select two from onek2 where two between 1 and 10000
+),
+T3 as (
+  select two from T1
+  union all
+  select two from T2
+)
+select value t from T3 t
+limit 4;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.11.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.11.query.sqlpp
new file mode 100644
index 0000000..0109646
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.11.query.sqlpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Test that LIMIT is copied down through UNION ALL
+ * and pushed to data scan
+ */
+
+set `compiler.parallelism` "1";
+
+use test;
+
+explain
+with T1 as (
+  select two from onek1 where two between 1 and 10
+  union all
+  select two from onek2 where two between 1 and 100
+),
+T2 as (
+  select two from onek1 where two between 1 and 1000
+  union all
+  select two from onek2 where two between 1 and 10000
+),
+T3 as (
+  select two from T1
+  union all
+  select two from T2
+)
+select value t from T3 t
+limit 4;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.8.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.8.query.sqlpp
new file mode 100644
index 0000000..373861b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.8.query.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Test that LIMIT is copied down through UNION ALL
+ * and pushed into data scan
+ */
+
+set `compiler.parallelism` "1";
+
+use test;
+
+with T as (
+  select two from onek1
+  union all
+  select two from onek2
+)
+select value t from T t
+where two > 0
+limit 4;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.9.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.9.query.sqlpp
new file mode 100644
index 0000000..41ef7d8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_opt_1/union_opt_1.9.query.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Test that LIMIT is copied down through UNION ALL
+ * and pushed into data scan
+ */
+
+set `compiler.parallelism` "1";
+
+use test;
+
+explain
+with T as (
+  select two from onek1
+  union all
+  select two from onek2
+)
+select value t from T t
+where two > 0
+limit 4;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.10.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.10.adm
new file mode 100644
index 0000000..a46ab74
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.10.adm
@@ -0,0 +1,4 @@
+{ "two": 1 }
+{ "two": 1 }
+{ "two": 1 }
+{ "two": 1 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.11.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.11.adm
new file mode 100644
index 0000000..1b39645
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.11.adm
@@ -0,0 +1,106 @@
+distribute result [$$t]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    limit 4
+    -- STREAM_LIMIT  |UNPARTITIONED|
+      exchange
+      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+        union ($$151, $$310, $$t)
+        -- UNION_ALL  |PARTITIONED|
+          exchange
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            union ($$213, $$227, $$151)
+            -- UNION_ALL  |PARTITIONED|
+              exchange
+              -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+                project ([$$213])
+                -- STREAM_PROJECT  |PARTITIONED|
+                  assign [$$213] <- [{"two": $$183}]
+                  -- ASSIGN  |PARTITIONED|
+                    limit 4
+                    -- STREAM_LIMIT  |PARTITIONED|
+                      project ([$$183])
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        assign [$$183] <- [$$onek1.getField(2)]
+                        -- ASSIGN  |PARTITIONED|
+                          project ([$$onek1])
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            exchange
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$187, $$onek1] <- test.onek1 condition (and(ge($$onek1.getField(2), 1), le($$onek1.getField(2), 10))) limit 4
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+              exchange
+              -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+                project ([$$227])
+                -- STREAM_PROJECT  |PARTITIONED|
+                  assign [$$227] <- [{"two": $$184}]
+                  -- ASSIGN  |PARTITIONED|
+                    limit 4
+                    -- STREAM_LIMIT  |PARTITIONED|
+                      project ([$$184])
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        assign [$$184] <- [$$onek2.getField(2)]
+                        -- ASSIGN  |PARTITIONED|
+                          project ([$$onek2])
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            exchange
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$188, $$onek2] <- test.onek2 condition (and(ge($$onek2.getField(2), 1), le($$onek2.getField(2), 100))) limit 4
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+          exchange
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            union ($$345, $$356, $$310)
+            -- UNION_ALL  |PARTITIONED|
+              exchange
+              -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+                project ([$$345])
+                -- STREAM_PROJECT  |PARTITIONED|
+                  assign [$$345] <- [{"two": $$185}]
+                  -- ASSIGN  |PARTITIONED|
+                    limit 4
+                    -- STREAM_LIMIT  |PARTITIONED|
+                      project ([$$185])
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        assign [$$185] <- [$$onek1.getField(2)]
+                        -- ASSIGN  |PARTITIONED|
+                          project ([$$onek1])
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            exchange
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$189, $$onek1] <- test.onek1 condition (and(ge($$onek1.getField(2), 1), le($$onek1.getField(2), 1000))) limit 4
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+              exchange
+              -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+                project ([$$356])
+                -- STREAM_PROJECT  |PARTITIONED|
+                  assign [$$356] <- [{"two": $$186}]
+                  -- ASSIGN  |PARTITIONED|
+                    limit 4
+                    -- STREAM_LIMIT  |PARTITIONED|
+                      project ([$$186])
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        assign [$$186] <- [$$onek2.getField(2)]
+                        -- ASSIGN  |PARTITIONED|
+                          project ([$$onek2])
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            exchange
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$190, $$onek2] <- test.onek2 condition (and(ge($$onek2.getField(2), 1), le($$onek2.getField(2), 10000))) limit 4
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.8.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.8.adm
new file mode 100644
index 0000000..a46ab74
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.8.adm
@@ -0,0 +1,4 @@
+{ "two": 1 }
+{ "two": 1 }
+{ "two": 1 }
+{ "two": 1 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.9.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.9.adm
new file mode 100644
index 0000000..4a46e2d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_opt_1/union_opt_1.9.adm
@@ -0,0 +1,54 @@
+distribute result [$$t]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    limit 4
+    -- STREAM_LIMIT  |UNPARTITIONED|
+      exchange
+      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+        union ($$48, $$54, $$t)
+        -- UNION_ALL  |PARTITIONED|
+          exchange
+          -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+            limit 4
+            -- STREAM_LIMIT  |PARTITIONED|
+              project ([$$48])
+              -- STREAM_PROJECT  |PARTITIONED|
+                assign [$$48] <- [{"two": $$103}]
+                -- ASSIGN  |PARTITIONED|
+                  project ([$$103])
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    assign [$$103] <- [$$onek1.getField(2)]
+                    -- ASSIGN  |PARTITIONED|
+                      project ([$$onek1])
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        exchange
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          data-scan []<-[$$61, $$onek1] <- test.onek1 condition (gt($$onek1.getField(2), 0)) limit 4
+                          -- DATASOURCE_SCAN  |PARTITIONED|
+                            exchange
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              empty-tuple-source
+                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+          exchange
+          -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+            limit 4
+            -- STREAM_LIMIT  |PARTITIONED|
+              project ([$$54])
+              -- STREAM_PROJECT  |PARTITIONED|
+                assign [$$54] <- [{"two": $$105}]
+                -- ASSIGN  |PARTITIONED|
+                  project ([$$105])
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    assign [$$105] <- [$$onek2.getField(2)]
+                    -- ASSIGN  |PARTITIONED|
+                      project ([$$onek2])
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        exchange
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          data-scan []<-[$$62, $$onek2] <- test.onek2 condition (gt($$onek2.getField(2), 0)) limit 4
+                          -- DATASOURCE_SCAN  |PARTITIONED|
+                            exchange
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              empty-tuple-source
+                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
index f0eca82..382b80d 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.algebricks.rewriter.rules;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
@@ -62,62 +63,101 @@ public class CopyLimitDownRule implements IAlgebraicRewriteRule {
         List<LogicalVariable> limitUsedVars = new ArrayList<>();
         VariableUtilities.getUsedVariables(limitOp, limitUsedVars);
 
-        Mutable<ILogicalOperator> safeOpRef = null;
-        Mutable<ILogicalOperator> candidateOpRef = limitOp.getInputs().get(0);
+        List<ILogicalOperator> safeOps = new ArrayList<>();
+        List<LogicalVariable> tmpCandidateProducedVars = new ArrayList<>();
+        ILogicalOperator limitInputOp = limitOp.getInputs().get(0).getValue();
 
-        List<LogicalVariable> candidateProducedVars = new ArrayList<>();
-        while (true) {
-            ILogicalOperator candidateOp = candidateOpRef.getValue();
-            LogicalOperatorTag candidateOpTag = candidateOp.getOperatorTag();
-            if (candidateOp.getInputs().size() > 1 || !candidateOp.isMap()
-                    || candidateOpTag == LogicalOperatorTag.SELECT || candidateOpTag == LogicalOperatorTag.UNNEST_MAP) {
-                break;
+        findSafeOpsInSubtree(limitInputOp, limitUsedVars, safeOps, tmpCandidateProducedVars);
+        if (safeOps.isEmpty()) {
+            return false;
+        }
+
+        SourceLocation sourceLoc = limitOp.getSourceLocation();
+
+        for (ILogicalOperator safeOp : safeOps) {
+            for (Mutable<ILogicalOperator> unsafeOpRef : safeOp.getInputs()) {
+                ILogicalOperator unsafeOp = unsafeOpRef.getValue();
+                ILogicalExpression maxObjectsExpr = limitOp.getMaxObjects().getValue();
+                ILogicalExpression newMaxObjectsExpr;
+                if (limitOp.getOffset().getValue() == null) {
+                    newMaxObjectsExpr = maxObjectsExpr.cloneExpression();
+                } else {
+                    // Need to add an offset to the given limit value
+                    // since the original topmost limit will use the offset value.
+                    // We can't apply the offset multiple times.
+                    IFunctionInfo finfoAdd =
+                            context.getMetadataProvider().lookupFunction(AlgebricksBuiltinFunctions.NUMERIC_ADD);
+                    List<Mutable<ILogicalExpression>> addArgs = new ArrayList<>(2);
+                    addArgs.add(new MutableObject<>(maxObjectsExpr.cloneExpression()));
+                    addArgs.add(new MutableObject<>(limitOp.getOffset().getValue().cloneExpression()));
+                    ScalarFunctionCallExpression maxPlusOffset = new ScalarFunctionCallExpression(finfoAdd, addArgs);
+                    maxPlusOffset.setSourceLocation(sourceLoc);
+                    newMaxObjectsExpr = maxPlusOffset;
+                }
+                LimitOperator limitCloneOp = new LimitOperator(newMaxObjectsExpr, false);
+                limitCloneOp.setSourceLocation(sourceLoc);
+                limitCloneOp.setPhysicalOperator(new StreamLimitPOperator());
+                limitCloneOp.getInputs().add(new MutableObject<>(unsafeOp));
+                limitCloneOp.setExecutionMode(unsafeOp.getExecutionMode());
+                context.computeAndSetTypeEnvironmentForOperator(limitCloneOp);
+                limitCloneOp.recomputeSchema();
+                unsafeOpRef.setValue(limitCloneOp);
             }
+        }
+
+        context.addToDontApplySet(this, limitOp);
+
+        return true;
+    }
 
-            candidateProducedVars.clear();
-            VariableUtilities.getProducedVariables(candidateOp, candidateProducedVars);
-            if (!OperatorPropertiesUtil.disjoint(limitUsedVars, candidateProducedVars)) {
+    private boolean findSafeOpsInSubtree(ILogicalOperator candidateOp, List<LogicalVariable> limitUsedVars,
+            Collection<? super ILogicalOperator> outSafeOps, List<LogicalVariable> tmpCandidateProducedVars)
+            throws AlgebricksException {
+        ILogicalOperator safeOp = null;
+
+        while (isSafeOpCandidate(candidateOp)) {
+            tmpCandidateProducedVars.clear();
+            VariableUtilities.getProducedVariables(candidateOp, tmpCandidateProducedVars);
+            if (!OperatorPropertiesUtil.disjoint(limitUsedVars, tmpCandidateProducedVars)) {
                 break;
             }
 
-            safeOpRef = candidateOpRef;
-            candidateOpRef = safeOpRef.getValue().getInputs().get(0);
+            List<Mutable<ILogicalOperator>> candidateOpInputs = candidateOp.getInputs();
+            if (candidateOpInputs.size() > 1) {
+                boolean foundSafeOpInBranch = false;
+                for (Mutable<ILogicalOperator> inputOpRef : candidateOpInputs) {
+                    foundSafeOpInBranch |= findSafeOpsInSubtree(inputOpRef.getValue(), limitUsedVars, outSafeOps,
+                            tmpCandidateProducedVars);
+                }
+                if (!foundSafeOpInBranch) {
+                    outSafeOps.add(candidateOp);
+                }
+                return true;
+            }
+
+            safeOp = candidateOp;
+            candidateOp = candidateOpInputs.get(0).getValue();
         }
 
-        if (safeOpRef != null) {
-            ILogicalOperator safeOp = safeOpRef.getValue();
-            Mutable<ILogicalOperator> unsafeOpRef = safeOp.getInputs().get(0);
-            ILogicalOperator unsafeOp = unsafeOpRef.getValue();
-            SourceLocation sourceLoc = limitOp.getSourceLocation();
-            LimitOperator limitCloneOp = null;
-            if (limitOp.getOffset().getValue() == null) {
-                limitCloneOp = new LimitOperator(limitOp.getMaxObjects().getValue(), false);
-                limitCloneOp.setSourceLocation(sourceLoc);
-            } else {
-                // Need to add an offset to the given limit value
-                // since the original topmost limit will use the offset value.
-                // We can't apply the offset multiple times.
-                IFunctionInfo finfoAdd =
-                        context.getMetadataProvider().lookupFunction(AlgebricksBuiltinFunctions.NUMERIC_ADD);
-                List<Mutable<ILogicalExpression>> addArgs = new ArrayList<>();
-                addArgs.add(
-                        new MutableObject<ILogicalExpression>(limitOp.getMaxObjects().getValue().cloneExpression()));
-                addArgs.add(new MutableObject<ILogicalExpression>(limitOp.getOffset().getValue().cloneExpression()));
-                ScalarFunctionCallExpression maxPlusOffset = new ScalarFunctionCallExpression(finfoAdd, addArgs);
-                maxPlusOffset.setSourceLocation(sourceLoc);
-                limitCloneOp = new LimitOperator(maxPlusOffset, false);
-                limitCloneOp.setSourceLocation(sourceLoc);
-            }
-            limitCloneOp.setPhysicalOperator(new StreamLimitPOperator());
-            limitCloneOp.getInputs().add(new MutableObject<ILogicalOperator>(unsafeOp));
-            limitCloneOp.setExecutionMode(unsafeOp.getExecutionMode());
-            OperatorPropertiesUtil.computeSchemaRecIfNull((AbstractLogicalOperator) unsafeOp);
-            limitCloneOp.recomputeSchema();
-            unsafeOpRef.setValue(limitCloneOp);
-            context.computeAndSetTypeEnvironmentForOperator(limitCloneOp);
-            context.addToDontApplySet(this, limitOp);
+        if (safeOp != null) {
+            outSafeOps.add(safeOp);
+            return true;
+        } else {
+            return false;
         }
+    }
 
-        return safeOpRef != null;
+    private static boolean isSafeOpCandidate(ILogicalOperator op) {
+        switch (op.getOperatorTag()) {
+            case UNIONALL:
+                return true;
+            // exclude following 'map' operators because they change cardinality
+            case SELECT:
+            case UNNEST:
+            case UNNEST_MAP:
+                return false;
+            default:
+                return op.getInputs().size() == 1 && op.isMap();
+        }
     }
 }

Reply via email to