This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit e19235751d9d21288d385b2a3b52f22f6239239e Author: Ali Alsuliman <[email protected]> AuthorDate: Tue Jan 6 18:48:44 2026 -0800 [ASTERIXDB-3682][COMP] Push extracted assign to join branch - user model changes: no - storage format changes: no - interface changes: no Details: When extracting a field access from an operator into an assign operator, ensure the assign operator is placed into the right branch in case of a join operator. Ext-ref: MB-69958 Change-Id: I74452d71423610ca2a34631235f5da8325e3b625 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20767 Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Preetham Poluparthi <[email protected]> --- .../rules/ByNameToByIndexFieldAccessRule.java | 65 +++- .../optimizer/rules/LoadRecordFieldsRule.java | 54 ++-- .../ASTERIXDB-3682-field-access-in-join.sqlpp | 107 +++++++ .../ASTERIXDB-3682-field-access-in-join.plan | 348 +++++++++++++++++++++ 4 files changed, 535 insertions(+), 39 deletions(-) diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ByNameToByIndexFieldAccessRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ByNameToByIndexFieldAccessRule.java index abc434f3f1..e4bcbd8937 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ByNameToByIndexFieldAccessRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ByNameToByIndexFieldAccessRule.java @@ -21,6 +21,8 @@ package org.apache.asterix.optimizer.rules; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import org.apache.asterix.algebra.base.OperatorAnnotation; import org.apache.asterix.om.base.AInt32; @@ -45,6 +47,7 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvir import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression; import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; import org.apache.hyracks.api.exceptions.SourceLocation; @@ -85,20 +88,54 @@ public class ByNameToByIndexFieldAccessRule implements IAlgebraicRewriteRule { if (fce.getFunctionIdentifier() != BuiltinFunctions.FIELD_ACCESS_BY_NAME) { return changed; } - changed |= extractFirstArg(fce, op, context); - IVariableTypeEnvironment env = context.getOutputTypeEnvironment(op.getInputs().get(0).getValue()); - IAType t = (IAType) env.getType(fce.getArguments().get(0).getValue()); - changed |= rewriteFieldAccess(exprRef, fce, TypeComputeUtils.getActualType(t)); + int k = extractFirstArg(fce, op, context); + changed |= k >= 0; + if (k < 0) { + if (op.getInputs().size() > 1) { + context.computeAndSetTypeEnvironmentForOperator(op); + changed |= rewriteFieldAccessUsing(op, exprRef, context, fce); + return changed; + } + k = 0; + } + changed |= rewriteFieldAccessUsing(op.getInputs().get(k).getValue(), exprRef, context, fce); return changed; } - // Extracts the first argument of a field-access expression into an separate assign operator. - private boolean extractFirstArg(AbstractFunctionCallExpression fce, ILogicalOperator op, + // Extracts the first argument of a field-access expression into a separate assign operator. + private static int extractFirstArg(AbstractFunctionCallExpression fce, ILogicalOperator op, IOptimizationContext context) throws AlgebricksException { ILogicalExpression firstArg = fce.getArguments().get(0).getValue(); if (firstArg.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) { - return false; + return -1; + } + if (op.getInputs().size() > 1) { + return extractToBranch(fce, op, context, firstArg); } + extractToAssignOp(fce, op.getInputs().get(0), context, firstArg); + return 0; + } + + private static int extractToBranch(AbstractFunctionCallExpression fce, ILogicalOperator op, + IOptimizationContext ctx, ILogicalExpression firstArg) throws AlgebricksException { + Set<LogicalVariable> usedByExpr = new HashSet<>(); + Set<LogicalVariable> inputLiveVars = new HashSet<>(); + fce.getUsedVariables(usedByExpr); + int i = 0; + for (Mutable<ILogicalOperator> input : op.getInputs()) { + inputLiveVars.clear(); + VariableUtilities.getLiveVariables(input.getValue(), inputLiveVars); + if (inputLiveVars.containsAll(usedByExpr)) { + extractToAssignOp(fce, input, ctx, firstArg); + return i; + } + i++; + } + return -1; + } + + private static void extractToAssignOp(AbstractFunctionCallExpression fce, Mutable<ILogicalOperator> op, + IOptimizationContext context, ILogicalExpression firstArg) throws AlgebricksException { SourceLocation sourceLoc = firstArg.getSourceLocation(); LogicalVariable var1 = context.newVar(); AssignOperator assignOp = new AssignOperator(new ArrayList<>(Collections.singletonList(var1)), @@ -107,14 +144,20 @@ public class ByNameToByIndexFieldAccessRule implements IAlgebraicRewriteRule { VariableReferenceExpression var1Ref = new VariableReferenceExpression(var1); var1Ref.setSourceLocation(sourceLoc); fce.getArguments().get(0).setValue(var1Ref); - assignOp.getInputs().add(new MutableObject<>(op.getInputs().get(0).getValue())); - op.getInputs().get(0).setValue(assignOp); + assignOp.getInputs().add(new MutableObject<>(op.getValue())); + op.setValue(assignOp); context.computeAndSetTypeEnvironmentForOperator(assignOp); - return true; + } + + private static boolean rewriteFieldAccessUsing(ILogicalOperator op, Mutable<ILogicalExpression> exprRef, + IOptimizationContext ctx, AbstractFunctionCallExpression fce) throws AlgebricksException { + IVariableTypeEnvironment env = ctx.getOutputTypeEnvironment(op); + IAType t = (IAType) env.getType(fce.getArguments().get(0).getValue()); + return rewriteFieldAccess(exprRef, fce, TypeComputeUtils.getActualType(t)); } // Rewrites field-access-by-name into field-access-by-index if possible. - private boolean rewriteFieldAccess(Mutable<ILogicalExpression> exprRef, AbstractFunctionCallExpression fce, + private static boolean rewriteFieldAccess(Mutable<ILogicalExpression> exprRef, AbstractFunctionCallExpression fce, IAType t) { if (t.getTypeTag() != ATypeTag.OBJECT) { return false; diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/LoadRecordFieldsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/LoadRecordFieldsRule.java index 42cce5233e..b361ee140c 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/LoadRecordFieldsRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/LoadRecordFieldsRule.java @@ -21,12 +21,10 @@ package org.apache.asterix.optimizer.rules; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; +import java.util.Set; import org.apache.asterix.algebra.base.OperatorAnnotation; -import org.apache.asterix.common.exceptions.CompilationException; -import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.lang.common.util.FunctionUtil; import org.apache.asterix.om.base.AInt32; import org.apache.asterix.om.base.AString; @@ -96,8 +94,7 @@ public class LoadRecordFieldsRule implements IAlgebraicRewriteRule { context.addToDontApplySet(this, op1); } if (res && op1.getOperatorTag() == LogicalOperatorTag.SELECT) { - // checking if we can annotate a Selection as using just one field - // access + // checking if we can annotate a Selection as using just one field access SelectOperator sigma = (SelectOperator) op1; List<LogicalVariable> vars = new ArrayList<>(); VariableUtilities.getUsedVariables(sigma, vars); @@ -107,8 +104,7 @@ public class LoadRecordFieldsRule implements IAlgebraicRewriteRule { ILogicalExpression expr1 = getFirstExpr(assign1); if (FunctionUtil.isFieldAccessFunction(expr1)) { AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expr1; - // f should be a call to a field/data access kind of - // function + // f should be a call to a field/data access kind of function sigma.getAnnotations().put(OperatorAnnotation.FIELD_ACCESS, f.getArguments().get(0)); } } @@ -144,7 +140,9 @@ public class LoadRecordFieldsRule implements IAlgebraicRewriteRule { LogicalVariable v = context.newVar(); AssignOperator a2 = new AssignOperator(v, new MutableObject<>(f)); a2.setSourceLocation(expr.getSourceLocation()); - pushFieldAssign(a2, topOp, context); + if (!pushFieldAssign(a2, topOp, context)) { + return false; + } context.computeAndSetTypeEnvironmentForOperator(a2); ILogicalExpression arg = f.getArguments().get(0).getValue(); if (arg.getExpressionTag() == LogicalExpressionTag.VARIABLE) { @@ -183,59 +181,59 @@ public class LoadRecordFieldsRule implements IAlgebraicRewriteRule { } } - private static void pushFieldAssign(AssignOperator a2, AbstractLogicalOperator topOp, IOptimizationContext context) - throws AlgebricksException { + private static boolean pushFieldAssign(AssignOperator a2, AbstractLogicalOperator topOp, + IOptimizationContext context) throws AlgebricksException { if (topOp.getInputs().size() == 1 && !topOp.hasNestedPlans()) { Mutable<ILogicalOperator> topChild = topOp.getInputs().get(0); - // plugAccessAboveOp(a2, topChild, context); List<Mutable<ILogicalOperator>> a2InptList = a2.getInputs(); - a2InptList.clear(); a2InptList.add(topChild); // and link it as child in the op. tree topOp.getInputs().set(0, new MutableObject<>(a2)); findAndEliminateRedundantFieldAccess(a2, context); + return true; } else { // e.g., a join - LinkedList<LogicalVariable> usedInAccess = new LinkedList<LogicalVariable>(); + Set<LogicalVariable> usedInAccess = new HashSet<>(); VariableUtilities.getUsedVariables(a2, usedInAccess); - LinkedList<LogicalVariable> produced2 = new LinkedList<LogicalVariable>(); + Set<LogicalVariable> produced2 = new HashSet<>(); VariableUtilities.getProducedVariables(topOp, produced2); if (OperatorPropertiesUtil.disjoint(produced2, usedInAccess)) { + Set<LogicalVariable> inputLiveVars = new HashSet<>(); for (Mutable<ILogicalOperator> inp : topOp.getInputs()) { - HashSet<LogicalVariable> v2 = new HashSet<LogicalVariable>(); - VariableUtilities.getLiveVariables(inp.getValue(), v2); - if (!OperatorPropertiesUtil.disjoint(usedInAccess, v2)) { - pushAccessAboveOpRef(a2, inp, context); - return; + inputLiveVars.clear(); + VariableUtilities.getLiveVariables(inp.getValue(), inputLiveVars); + // push the assign to the correct branch that contains all the used vars + if (inputLiveVars.containsAll(usedInAccess)) { + pushAccessThroughOp(a2, inp, context); + return true; } } if (topOp.hasNestedPlans()) { AbstractOperatorWithNestedPlans nestedOp = (AbstractOperatorWithNestedPlans) topOp; for (ILogicalPlan plan : nestedOp.getNestedPlans()) { for (Mutable<ILogicalOperator> root : plan.getRoots()) { - HashSet<LogicalVariable> v2 = new HashSet<LogicalVariable>(); - VariableUtilities.getLiveVariables(root.getValue(), v2); - if (!OperatorPropertiesUtil.disjoint(usedInAccess, v2)) { - pushAccessAboveOpRef(a2, root, context); - return; + inputLiveVars.clear(); + VariableUtilities.getLiveVariables(root.getValue(), inputLiveVars); + if (inputLiveVars.containsAll(usedInAccess)) { + pushAccessThroughOp(a2, root, context); + return true; } } } } - throw new CompilationException(ErrorCode.COMPILATION_ERROR, a2.getSourceLocation(), - "Field access " + getFirstExpr(a2) + " does not correspond to any input"); } } + return false; } /** - * Pushes one field-access assignment above toPushThroughChildRef + * Pushes one field-access assignment through toPushThroughChildRef * * @param toPush * @param toPushThroughChildRef */ - private static void pushAccessAboveOpRef(AssignOperator toPush, Mutable<ILogicalOperator> toPushThroughChildRef, + private static void pushAccessThroughOp(AssignOperator toPush, Mutable<ILogicalOperator> toPushThroughChildRef, IOptimizationContext context) throws AlgebricksException { List<Mutable<ILogicalOperator>> tpInpList = toPush.getInputs(); tpInpList.clear(); diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/ASTERIXDB-3682-field-access-in-join.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/ASTERIXDB-3682-field-access-in-join.sqlpp new file mode 100644 index 0000000000..221d35e3e0 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/ASTERIXDB-3682-field-access-in-join.sqlpp @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +DROP DATAVERSE test IF EXISTS; +CREATE DATAVERSE test; +USE test; + +CREATE TYPE OpenType AS {id: int}; +CREATE DATASET col1(OpenType) PRIMARY KEY id; +CREATE DATASET col2(OpenType) PRIMARY KEY id; + +SELECT final_q.* +FROM ( +SELECT subquery1.*, + subquery3.cKey AS lov_cKey, + subquery2.wsd, + subquery2.wsm, + subquery2.wsc, + subquery2.scKey, + subquery2.wpdi, + sbr.tohac.qaz AS bch, + sbr.tohac.omaa AS omaa, + UPPER(SUBSTR(sbr.tohac.qaz, 0, POS(sbr.tohac.qaz, '_'))) AS bco + FROM (SELECT join_key2, + s1.wsx1 AS lov13_wsx1, + s1.ikjhu AS lcd, + sataqw.name AS lov13_name, + s1.uunn AS lov13_uunn, + TO_STR(u_val.t0) AS lov13_t0_dd, + SUBSTR(TO_STR(u_val.t0),0,19) AS lov13_t0_ff +FROM ( + SELECT SUBSTR(b.qwer,0,7) AS wsx1, + SUBSTR(b.qwer,0,10) AS ikjhu, + b.uunn, + ( + SELECT VALUE { "name": item.name, "val": ( + SELECT VALUE { "marop": item2.marop, "t0": item2.t0 } + FROM item.`value` AS item2 + WHERE item2.ua = 'iap' + AND item2.marop IS VALUED) } + FROM OBJECT_PAIRS(b.ihsaa) AS item + WHERE ANY v IN ["X", "Y" ] SATISFIES v = SPLIT(item.name," ")[0] END) AS ihsaa + FROM `col2` b + WHERE (b.`t` = "lovStat") + AND NOT (ARRAY_LENGTH(OBJECT_NAMES(b.ihsaa)) = 1 + AND OBJECT_NAMES(b.ihsaa)[0] = "some_str") + AND (ANY item IN OBJECT_PAIRS(b.ihsaa) SATISFIES (SPLIT(item.name, " ")[0] IN [ "X", "Y" ]) + AND (ANY item2 IN item.`value` SATISFIES (item2.ua = "iap" + AND item2.marop IS VALUED) END) END)) s1 +UNNEST s1.ihsaa sataqw +UNNEST sataqw.val u_val +LET join_key2 = SUBSTR(IFMISSINGORNULL(IFMISSINGORNULL(u_val.marop.someId1, u_val.marop.someId2), u_val.marop.someId3),0,38 ) +WHERE join_key2 IS NOT NULL + AND NOT sataqw.name IN [ 'p1', 'p2', 'p3']) subquery1 + JOIN ( + SELECT p1.cKey, + uniqueID + FROM ( + SELECT p.cKey, + p.members[*].uniqueID +FROM col1 p +WHERE p.`t` = 'ccc' + AND p.`r` = 'uuu' + AND cKey IS NOT MISSING) p1 + UNNEST p1.uniqueID uniqueID) subquery3 ON subquery3.uniqueID = subquery1.lov13_uunn + JOIN ( + SELECT SUBSTR(p2.ed,0,38) AS join_key1, + p2.kat.coaed AS wpdi, + p2.kat.stchk AS wsc, + UPPER(SUBSTR(p2.kat.stchk, 0, POS(p2.kat.stchk, '_'))) AS scKey, + (SELECT VALUE item.wkps FROM p2.kat.intib AS item) AS wwlps, + SUBSTR(p2.kat.ssatt,0,10) AS wsd, + SUBSTR(p2.kat.ssatt,0,7) AS wsm + FROM col1 p2 + WHERE p2.`t` = 'ak1' + AND p2.ha IN ['l1', 'l2'] + AND p2.kat IS NOT MISSING + AND p2.kat.coaed IS NOT MISSING ) subquery2 ON subquery1.join_key2 = subquery2.join_key1 + AND subquery3.cKey = subquery2.scKey + JOIN col1 sbr ON sbr.ed = subquery2.wpdi + AND sbr.tohac IS NOT MISSING ) final_q +WHERE final_q.bco IS NOT MISSING + AND final_q.join_key2 LIKE '%' + AND final_q.wsc LIKE '%' + AND final_q.scKey LIKE '%' + AND final_q.bch LIKE '%' + AND final_q.bco LIKE '%' +ORDER BY final_q.join_key2, + final_q.wsc, + final_q.lcd, + final_q.wsd; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/ASTERIXDB-3682-field-access-in-join.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/ASTERIXDB-3682-field-access-in-join.plan new file mode 100644 index 0000000000..a44e1cb635 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/ASTERIXDB-3682-field-access-in-join.plan @@ -0,0 +1,348 @@ +distribute result [$$446] +-- DISTRIBUTE_RESULT |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + project ([$$446]) + -- STREAM_PROJECT |PARTITIONED| + assign [$$446] <- [if-missing-or-null(to-object($$593), cast($$456))] + -- ASSIGN |PARTITIONED| + project ([$$456, $$593]) + -- STREAM_PROJECT |PARTITIONED| + exchange + -- SORT_MERGE_EXCHANGE [$$529(ASC), $$530(ASC), $$531(ASC), $$532(ASC) ] |PARTITIONED| + order (ASC, $$529) (ASC, $$530) (ASC, $$531) (ASC, $$532) + -- STABLE_SORT [$$529(ASC), $$530(ASC), $$531(ASC), $$532(ASC)] |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + assign [$$529, $$530, $$531, $$532] <- [$$593.getField("join_key2"), $$593.getField("wsc"), $$593.getField("lcd"), $$593.getField("wsd")] + -- ASSIGN |PARTITIONED| + project ([$$456, $$593]) + -- STREAM_PROJECT |PARTITIONED| + select (and(like($$455, "%"), not(is-missing($$455)))) + -- STREAM_SELECT |PARTITIONED| + project ([$$456, $$593, $$455]) + -- STREAM_PROJECT |PARTITIONED| + assign [$$593, $$455] <- [object-concat-strict(if-missing-or-null(to-object($$323), cast($$456)), {"lov_cKey": $$466, "wsd": $$603, "wsm": $$604, "wsc": $$513, "scKey": $$602, "wpdi": $$509, "bch": $$525, "omaa": $$526, "bco": uppercase(substring($$525, 0, position($$525, "_")))}), object-concat-strict(if-missing-or-null(to-object($$323), cast($$456)), {"lov_cKey": $$466, "wsd": $$388.getField(3), "wsm": $$388.getField(4), "wsc": $$513, "scKey": $$602, "wpdi": $$5 [...] + -- ASSIGN |PARTITIONED| + select (and(like(object-concat-strict(if-missing-or-null(to-object($$323), cast($$456)), {"lov_cKey": $$466, "wsd": $$388.getField(3), "wsm": $$388.getField(4), "wsc": $$388.getField("wsc"), "scKey": $$602, "wpdi": $$509, "bch": $$525, "omaa": $$526, "bco": uppercase(substring($$525, 0, position($$525, "_")))}).getField("bch"), "%"), like(object-concat-strict(if-missing-or-null(to-object($$323), cast($$456)), {"lov_cKey": $$466, "wsd": $$388.getField(3), "wsm": [...] + -- STREAM_SELECT |PARTITIONED| + project ([$$323, $$466, $$603, $$604, $$513, $$602, $$509, $$388, $$456, $$525, $$526]) + -- STREAM_PROJECT |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + join (eq($$476, $$509)) + -- HYBRID_HASH_JOIN [$$509][$$476] |PARTITIONED| + exchange + -- HASH_PARTITION_EXCHANGE [$$509] |PARTITIONED| + assign [$$456] <- [{ }] + -- ASSIGN |PARTITIONED| + project ([$$323, $$466, $$603, $$604, $$513, $$602, $$509, $$388]) + -- STREAM_PROJECT |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + join (and(eq($$599, $$601), eq($$466, $$602))) + -- HYBRID_HASH_JOIN [$$599, $$466][$$601, $$602] |PARTITIONED| + exchange + -- HASH_PARTITION_EXCHANGE [$$599, $$466] |PARTITIONED| + project ([$$323, $$466, $$599]) + -- STREAM_PROJECT |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + join (eq($$uniqueID, $$489)) + -- HYBRID_HASH_JOIN [$$489][$$uniqueID] |PARTITIONED| + exchange + -- HASH_PARTITION_EXCHANGE [$$489] |PARTITIONED| + project ([$$323, $$599, $$489]) + -- STREAM_PROJECT |PARTITIONED| + assign [$$323] <- [{"join_key2": $$599, "lov13_wsx1": substring($$465, 0, 7), "lcd": substring($$465, 0, 10), "lov13_name": $$467, "lov13_uunn": $$489, "lov13_t0_dd": $$600, "lov13_t0_ff": substring($$600, 0, 19)}] + -- ASSIGN |PARTITIONED| + project ([$$465, $$489, $$467, $$599, $$600]) + -- STREAM_PROJECT |PARTITIONED| + assign [$$600] <- [to-string($$u_val.getField("t0"))] + -- ASSIGN |PARTITIONED| + select (not(is-null($$599))) + -- STREAM_SELECT |PARTITIONED| + project ([$$465, $$489, $$467, $$u_val, $$599]) + -- STREAM_PROJECT |PARTITIONED| + assign [$$599] <- [substring(if-missing-or-null(if-missing-or-null($$461.getField("someId1"), $$461.getField("someId2")), $$461.getField("someId3")), 0, 38)] + -- ASSIGN |PARTITIONED| + assign [$$461] <- [$$u_val.getField("marop")] + -- ASSIGN |PARTITIONED| + project ([$$465, $$489, $$467, $$u_val]) + -- STREAM_PROJECT |PARTITIONED| + unnest $$u_val <- scan-collection($$290) + -- UNNEST |PARTITIONED| + project ([$$465, $$489, $$467, $$290]) + -- STREAM_PROJECT |PARTITIONED| + subplan { + aggregate [$$290] <- [listify($$289)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + -- AGGREGATE |LOCAL| + assign [$$289] <- [{"marop": $$468, "t0": $$488}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + -- ASSIGN |LOCAL| + select (and(eq($$487, "iap"), not(is-unknown($$468)))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + -- STREAM_SELECT |LOCAL| + assign [$$488, $$468, $$487] <- [$$item2.getField("t0"), $$item2.getField("marop"), $$item2.getField("ua")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + -- ASSIGN |LOCAL| + unnest $$item2 <- scan-collection($$486) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + -- UNNEST |LOCAL| + nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + -- NESTED_TUPLE_SOURCE |LOCAL| + } + -- SUBPLAN |PARTITIONED| + select (not(or(eq($$467, "p1"), eq($$467, "p2"), eq($$467, "p3")))) + -- STREAM_SELECT |PARTITIONED| + project ([$$465, $$489, $$486, $$467]) + -- STREAM_PROJECT |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + join (eq($$470, $$275)) + -- HYBRID_HASH_JOIN [$$275][$$470] |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + assign [$$275] <- [get-item(split($$467, " "), 0)] + -- ASSIGN |PARTITIONED| + project ([$$465, $$489, $$486, $$467]) + -- STREAM_PROJECT |PARTITIONED| + assign [$$486, $$467] <- [$$item.getField("value"), $$item.getField("name")] + -- ASSIGN |PARTITIONED| + project ([$$465, $$489, $$item]) + -- STREAM_PROJECT |PARTITIONED| + unnest $$item <- scan-collection(object-pairs($$477)) + -- UNNEST |PARTITIONED| + project ([$$465, $$489, $$477]) + -- STREAM_PROJECT |PARTITIONED| + select ($$262) + -- STREAM_SELECT |PARTITIONED| + project ([$$262, $$465, $$489, $$477]) + -- STREAM_PROJECT |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + group by ([$$551 := $$457]) decor ([$$465; $$489; $$477]) { + aggregate [$$262] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + -- AGGREGATE |LOCAL| + select (not(is-missing($$550))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + -- STREAM_SELECT |LOCAL| + nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + -- NESTED_TUPLE_SOURCE |LOCAL| + } + -- PRE_CLUSTERED_GROUP_BY[$$457] |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + project ([$$489, $$465, $$477, $$550, $$457]) + -- STREAM_PROJECT |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + left outer join (eq($$457, $$549)) + -- HYBRID_HASH_JOIN [$$457][$$549] |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + project ([$$489, $$465, $$477, $$457]) + -- STREAM_PROJECT |PARTITIONED| + select (and(eq($$b.getField("t"), "lovStat"), not(and(eq(len($$597), 1), eq(get-item($$597, 0), "some_str"))))) + -- STREAM_SELECT |PARTITIONED| + assign [$$597] <- [object-names($$477)] + -- ASSIGN |PARTITIONED| + assign [$$489, $$465, $$477] <- [$$b.getField("uunn"), $$b.getField("qwer"), $$b.getField("ihsaa")] + -- ASSIGN |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + data-scan []<-[$$457, $$b] <- test.col2 + -- DATASOURCE_SCAN |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + empty-tuple-source + -- EMPTY_TUPLE_SOURCE |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + assign [$$550] <- [true] + -- ASSIGN |PARTITIONED| + project ([$$549]) + -- STREAM_PROJECT |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + join (eq($$469, $$251)) + -- HYBRID_HASH_JOIN [$$251][$$469] |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + project ([$$549, $$251]) + -- STREAM_PROJECT |PARTITIONED| + select ($$260) + -- STREAM_SELECT |PARTITIONED| + project ([$$549, $$251, $$260]) + -- STREAM_PROJECT |PARTITIONED| + subplan { + aggregate [$$260] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + -- AGGREGATE |LOCAL| + select (and(eq($$484, "iap"), not(is-unknown($$483)))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + -- STREAM_SELECT |LOCAL| + assign [$$484, $$483] <- [$$item2.getField("ua"), $$item2.getField("marop")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + -- ASSIGN |LOCAL| + unnest $$item2 <- scan-collection($$482) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + -- UNNEST |LOCAL| + nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + -- NESTED_TUPLE_SOURCE |LOCAL| + } + -- SUBPLAN |PARTITIONED| + project ([$$549, $$251, $$482]) + -- STREAM_PROJECT |PARTITIONED| + assign [$$251, $$482] <- [get-item(split($$item.getField("name"), " "), 0), $$item.getField("value")] + -- ASSIGN |PARTITIONED| + project ([$$549, $$item]) + -- STREAM_PROJECT |PARTITIONED| + unnest $$item <- scan-collection(object-pairs($$545)) + -- UNNEST |PARTITIONED| + project ([$$549, $$545]) + -- STREAM_PROJECT |PARTITIONED| + select (and(not(and(eq(len($$598), 1), eq(get-item($$598, 0), "some_str"))), eq($$546.getField("t"), "lovStat"))) + -- STREAM_SELECT |PARTITIONED| + assign [$$598] <- [object-names($$545)] + -- ASSIGN |PARTITIONED| + assign [$$545] <- [$$546.getField("ihsaa")] + -- ASSIGN |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + data-scan []<-[$$549, $$546] <- test.col2 + -- DATASOURCE_SCAN |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + empty-tuple-source + -- EMPTY_TUPLE_SOURCE |PARTITIONED| + exchange + -- BROADCAST_EXCHANGE |PARTITIONED| + unnest $$469 <- scan-collection(array: [ "X", "Y" ]) + -- UNNEST |UNPARTITIONED| + empty-tuple-source + -- EMPTY_TUPLE_SOURCE |UNPARTITIONED| + exchange + -- BROADCAST_EXCHANGE |PARTITIONED| + unnest $$470 <- scan-collection(array: [ "X", "Y" ]) + -- UNNEST |UNPARTITIONED| + empty-tuple-source + -- EMPTY_TUPLE_SOURCE |UNPARTITIONED| + exchange + -- HASH_PARTITION_EXCHANGE [$$uniqueID] |PARTITIONED| + project ([$$466, $$uniqueID]) + -- STREAM_PROJECT |PARTITIONED| + unnest $$uniqueID <- scan-collection($$505) + -- UNNEST |PARTITIONED| + project ([$$466, $$505]) + -- STREAM_PROJECT |PARTITIONED| + assign [$$505] <- [array-star($$p.getField("members")).getField("uniqueID")] + -- ASSIGN |PARTITIONED| + select (and(eq($$p.getField("r"), "uuu"), eq($$p.getField("t"), "ccc"), not(is-missing($$466)))) + -- STREAM_SELECT |PARTITIONED| + assign [$$466] <- [$$p.getField("cKey")] + -- ASSIGN |PARTITIONED| + project ([$$p]) + -- STREAM_PROJECT |PARTITIONED| + assign [$$p] <- [$$p2] + -- ASSIGN |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + replicate + -- REPLICATE |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + project ([$$p2]) + -- STREAM_PROJECT |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + data-scan []<-[$$459, $$p2] <- test.col1 + -- DATASOURCE_SCAN |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + empty-tuple-source + -- EMPTY_TUPLE_SOURCE |PARTITIONED| + exchange + -- HASH_PARTITION_EXCHANGE [$$601, $$602] |PARTITIONED| + project ([$$603, $$604, $$513, $$602, $$509, $$388, $$601]) + -- STREAM_PROJECT |PARTITIONED| + assign [$$388] <- [{"join_key1": $$601, "wpdi": $$509, "wsc": $$513, "scKey": $$602, "wwlps": $$381, "wsd": $$603, "wsm": $$604}] + -- ASSIGN |PARTITIONED| + project ([$$513, $$509, $$381, $$604, $$603, $$602, $$601]) + -- STREAM_PROJECT |PARTITIONED| + assign [$$604, $$603, $$602, $$601] <- [substring($$516, 0, 7), substring($$516, 0, 10), uppercase(substring($$513, 0, position($$513, "_"))), substring($$511, 0, 38)] + -- ASSIGN |PARTITIONED| + project ([$$513, $$509, $$516, $$511, $$381]) + -- STREAM_PROJECT |PARTITIONED| + subplan { + aggregate [$$381] <- [listify($$380)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + -- AGGREGATE |LOCAL| + assign [$$380] <- [$$item.getField("wkps")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + -- ASSIGN |LOCAL| + unnest $$item <- scan-collection($$510) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + -- UNNEST |LOCAL| + nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] + -- NESTED_TUPLE_SOURCE |LOCAL| + } + -- SUBPLAN |PARTITIONED| + project ([$$513, $$509, $$516, $$511, $$510]) + -- STREAM_PROJECT |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + join (eq($$473, $$353)) + -- HYBRID_HASH_JOIN [$$353][$$473] |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + project ([$$513, $$509, $$516, $$511, $$510, $$353]) + -- STREAM_PROJECT |PARTITIONED| + select (and(eq($$p2.getField("t"), "ak1"), not(is-missing($$509)), not(is-missing($$541)))) + -- STREAM_SELECT |PARTITIONED| + assign [$$516, $$513, $$511, $$510, $$509] <- [$$541.getField("ssatt"), $$541.getField("stchk"), $$p2.getField("ed"), $$541.getField("intib"), $$541.getField("coaed")] + -- ASSIGN |PARTITIONED| + assign [$$353, $$541] <- [$$p2.getField("ha"), $$p2.getField("kat")] + -- ASSIGN |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + replicate + -- REPLICATE |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + project ([$$p2]) + -- STREAM_PROJECT |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + data-scan []<-[$$459, $$p2] <- test.col1 + -- DATASOURCE_SCAN |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + empty-tuple-source + -- EMPTY_TUPLE_SOURCE |PARTITIONED| + exchange + -- BROADCAST_EXCHANGE |PARTITIONED| + unnest $$473 <- scan-collection(array: [ "l1", "l2" ]) + -- UNNEST |UNPARTITIONED| + empty-tuple-source + -- EMPTY_TUPLE_SOURCE |UNPARTITIONED| + exchange + -- HASH_PARTITION_EXCHANGE [$$476] |PARTITIONED| + project ([$$525, $$526, $$476]) + -- STREAM_PROJECT |PARTITIONED| + assign [$$526, $$525] <- [$$454.getField("omaa"), $$454.getField("qaz")] + -- ASSIGN |PARTITIONED| + select (not(is-missing($$454))) + -- STREAM_SELECT |PARTITIONED| + project ([$$476, $$454]) + -- STREAM_PROJECT |PARTITIONED| + assign [$$476, $$454] <- [$$sbr.getField("ed"), $$sbr.getField("tohac")] + -- ASSIGN |PARTITIONED| + project ([$$sbr]) + -- STREAM_PROJECT |PARTITIONED| + assign [$$sbr] <- [$$p2] + -- ASSIGN |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + replicate + -- REPLICATE |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + project ([$$p2]) + -- STREAM_PROJECT |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + data-scan []<-[$$459, $$p2] <- test.col1 + -- DATASOURCE_SCAN |PARTITIONED| + exchange + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + empty-tuple-source + -- EMPTY_TUPLE_SOURCE |PARTITIONED| \ No newline at end of file
