Better push version
Project: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/commit/f0806225 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/tree/f0806225 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/diff/f0806225 Branch: refs/heads/resultsFinalVersion Commit: f080622527985317232140b551afd84cb205dc96 Parents: 18d4392 Author: Steven Glenn Jacobs <sjaco...@ucr.edu> Authored: Thu Aug 2 14:14:37 2018 -0700 Committer: Steven Glenn Jacobs <sjaco...@ucr.edu> Committed: Thu Aug 2 14:14:37 2018 -0700 ---------------------------------------------------------------------- .../lang/statement/CreateChannelStatement.java | 11 ++-- .../InsertBrokerNotifierForChannelRule.java | 63 +++++++++++++------- .../bad/runtime/NotifyBrokerRuntime.java | 2 - 3 files changed, 46 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/f0806225/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java index 76e3e82..9a2c6b1 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java @@ -257,9 +257,9 @@ public class CreateChannelStatement extends ExtensionStatement { * * push version: * SET inline_with "false"; - * select value {"payload": {"result":r, "subscriptionId": brokerSubId}} from ( + * select value {"payload": {"result":result, "subscriptionId": brokerSubId}} from ( * with channelExecutionTime as current_datetime() - * select b.BrokerEndPoint, result as r, bs.brokerSubId as brokerSubId, channelExecutionTime + * select b.BrokerEndPoint, result, bs.brokerSubId as brokerSubId, channelExecutionTime * from steven.EmergencyChannelChannelSubscriptions sub, * steven.RecentEmergenciesNearUser(sub.param0) result, * steven.EmergencyChannelBrokerSubscriptions bs, @@ -291,7 +291,7 @@ public class CreateChannelStatement extends ExtensionStatement { } else { builder.append( "select value {\"payload\": {\"" + FUNCTION_RESULT_VAR + "\":" + FUNCTION_RESULT_VAR - + ", \"subscriptionIds\": brokerSubIds}} from (\n"); + + ", \"subscriptionId\": brokerSubId}} from (\n"); builder.append("with " + BADConstants.ChannelExecutionTime + " as current_datetime() \n"); builder.append( "select " + BROKER_RECORD_VAR + "." + BADConstants.BrokerEndPoint + ", " + FUNCTION_RESULT_VAR @@ -334,10 +334,7 @@ public class CreateChannelStatement extends ExtensionStatement { builder.append(" returning " + INSERTED_RECORD_VAR); } else { - builder.append(") results\n"); - builder.append("group by " + BADConstants.BrokerEndPoint + "," + FUNCTION_RESULT_VAR + ", " - + BADConstants.ChannelExecutionTime); - builder.append(" group as brokerSubIds (" + BADConstants.BrokerSubscriptionId + " as subscriptionId)"); + builder.append(") results"); } builder.append(";"); http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/f0806225/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java index 18cd41a..2596421 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java @@ -92,9 +92,9 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule String channelName; AssignOperator pushAssign = null; AssignOperator newAssign = null; - GroupByOperator pushGroupBy = null; + UnnestOperator pushUnnest = null; LogicalVariable brokerSubsVar = null; - LogicalVariable brokerEndpoint = null; + LogicalVariable brokerEndpoint = context.newVar(); LogicalVariable brokerSubId = null; if (!push) { @@ -138,12 +138,6 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule } pushAssign = (AssignOperator) op2; - AbstractLogicalOperator op3 = (AbstractLogicalOperator) pushAssign.getInputs().get(0).getValue(); - if (op3.getOperatorTag() != LogicalOperatorTag.GROUP) { - return false; - } - pushGroupBy = (GroupByOperator) op3; - //if push, get the channel name here instead subscriptionsScan = (DataSourceScanOperator) findOp(op, LogicalOperatorTag.DATASOURCESCAN, "", BADConstants.ChannelSubscriptionsType); @@ -154,7 +148,13 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule String datasetName = dds.getDataset().getDatasetName(); channelDataverse = dds.getDataset().getDataverseName(); channelName = datasetName.substring(0, datasetName.length() - 13); - brokerEndpoint = pushGroupBy.getGroupByList().get(0).first; + + AbstractLogicalOperator unnest = (AbstractLogicalOperator) pushAssign.getInputs().get(0).getValue(); + if (unnest.getOperatorTag() != LogicalOperatorTag.UNNEST) { + return false; + } + + pushUnnest = (UnnestOperator) unnest; } //The channelExecutionTime is created just before the scan @@ -173,7 +173,6 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule if (subplanOperator == null) { return false; } - brokerEndpoint = context.newVar(); brokerSubId = context.newVar(); brokerSubsVar = ((AggregateOperator) subplanOperator.getNestedPlans().get(0).getRoots().get(0).getValue()) .getVariables().get(0); @@ -181,19 +180,18 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule newAssign = createAssignsAndUnnest(brokerSubsVar, brokerEndpoint, brokerSubId, op, context); context.computeAndSetTypeEnvironmentForOperator(op1); - } else { - channelExecutionVar = pushGroupBy.getGroupByList().get(2).first; + + //Maintain the variables through the existing project + ProjectOperator badProject = (ProjectOperator) findOp(op1, LogicalOperatorTag.PROJECT, "", ""); + badProject.getVariables().add(channelExecutionVar); + badProject.getVariables().add(brokerSubsVar); + context.computeAndSetTypeEnvironmentForOperator(badProject); } - //Maintain the variables through the existing project - ProjectOperator badProject = (ProjectOperator) findOp(op1, LogicalOperatorTag.PROJECT, "", ""); - badProject.getVariables().add(channelExecutionVar); - badProject.getVariables().add(push ? brokerEndpoint : brokerSubsVar); - context.computeAndSetTypeEnvironmentForOperator(badProject); //Create my brokerNotify plan above the extension Operator DelegateOperator dOp = push ? createNotifyBrokerPushPlan(brokerEndpoint, channelExecutionVar, context, pushAssign, channelDataverse, - channelName) + channelName, pushUnnest) : createNotifyBrokerPullPlan(brokerEndpoint, brokerSubId, channelExecutionVar, context, newAssign, channelDataverse, channelName); @@ -262,7 +260,7 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule private DelegateOperator createNotifyBrokerPushPlan(LogicalVariable brokerEndpointVar, LogicalVariable channelExecutionVar, IOptimizationContext context, AssignOperator payLoadAssign, - String channelDataverse, String channelName) + String channelDataverse, String channelName, UnnestOperator pushUnnest) throws AlgebricksException { IVariableTypeEnvironment env = payLoadAssign.computeOutputTypeEnvironment(context); IAType resultType = (IAType) env.getVarType(payLoadAssign.getVariables().get(0)); @@ -273,6 +271,21 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule channelName, true, resultType); extensionOp.getInputs().add(new MutableObject<>(payLoadAssign)); + + FunctionInfo finfoGetField = (FunctionInfo) FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME); + ScalarFunctionCallExpression getBrokerEndPoint = new ScalarFunctionCallExpression(finfoGetField, + new MutableObject<>(new VariableReferenceExpression(pushUnnest.getVariable())), new MutableObject<>( + new ConstantExpression(new AsterixConstantValue(new AString(BADConstants.BrokerEndPoint))))); + + AssignOperator assignBrokerEndPoint = + new AssignOperator(brokerEndpointVar, new MutableObject<>(getBrokerEndPoint)); + + assignBrokerEndPoint.getInputs().addAll(payLoadAssign.getInputs()); + payLoadAssign.getInputs().clear(); + payLoadAssign.getInputs().add(new MutableObject<>(assignBrokerEndPoint)); + + context.computeAndSetTypeEnvironmentForOperator(assignBrokerEndPoint); + context.computeAndSetTypeEnvironmentForOperator(payLoadAssign); context.computeAndSetTypeEnvironmentForOperator(extensionOp); return extensionOp; @@ -376,9 +389,17 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule } else if (searchTag == LogicalOperatorTag.PROJECT) { return (AbstractLogicalOperator) subOp.getValue(); - } else if (searchTag == LogicalOperatorTag.DATASOURCESCAN && isSubscriptionsScan( + } else if (searchTag == LogicalOperatorTag.DATASOURCESCAN) { + if (isSubscriptionsScan( (AbstractLogicalOperator) subOp.getValue(), subscriptionsName, subscriptionType)) { - return (AbstractLogicalOperator) subOp.getValue(); + return (AbstractLogicalOperator) subOp.getValue(); + } else { + AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(), searchTag, + subscriptionsName, subscriptionType); + if (nestedOp != null) { + return nestedOp; + } + } } else if (searchTag == LogicalOperatorTag.DELEGATE_OPERATOR) { DelegateOperator dOp = (DelegateOperator) subOp.getValue(); http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/f0806225/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java index 8e07af2..b604f75 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java @@ -190,8 +190,6 @@ public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRu } if (push) { - int pushOffset = inputArg1.getStartOffset(); - bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), pushOffset + 1); if (!firstResult) { sendStreams.get(endpoint).append(','); }