Better push version

Project: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/commit/f0806225
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/tree/f0806225
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/diff/f0806225

Branch: refs/heads/resultsFinalVersion
Commit: f080622527985317232140b551afd84cb205dc96
Parents: 18d4392
Author: Steven Glenn Jacobs <sjaco...@ucr.edu>
Authored: Thu Aug 2 14:14:37 2018 -0700
Committer: Steven Glenn Jacobs <sjaco...@ucr.edu>
Committed: Thu Aug 2 14:14:37 2018 -0700

----------------------------------------------------------------------
 .../lang/statement/CreateChannelStatement.java  | 11 ++--
 .../InsertBrokerNotifierForChannelRule.java     | 63 +++++++++++++-------
 .../bad/runtime/NotifyBrokerRuntime.java        |  2 -
 3 files changed, 46 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/f0806225/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 76e3e82..9a2c6b1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -257,9 +257,9 @@ public class CreateChannelStatement extends ExtensionStatement {
      *
      * push version:
      * SET inline_with "false";
-     * select value {"payload": {"result":r, "subscriptionId": brokerSubId}} 
from (
+     * select value {"payload": {"result":result, "subscriptionId": 
brokerSubId}} from (
      * with channelExecutionTime as current_datetime()
-     * select b.BrokerEndPoint, result as r, bs.brokerSubId as brokerSubId, 
channelExecutionTime
+     * select b.BrokerEndPoint, result, bs.brokerSubId as brokerSubId, 
channelExecutionTime
      * from steven.EmergencyChannelChannelSubscriptions sub,
      * steven.RecentEmergenciesNearUser(sub.param0) result,
      * steven.EmergencyChannelBrokerSubscriptions bs,
@@ -291,7 +291,7 @@ public class CreateChannelStatement extends ExtensionStatement {
         } else {
             builder.append(
                     "select value {\"payload\": {\"" + FUNCTION_RESULT_VAR + 
"\":" + FUNCTION_RESULT_VAR
-                            + ", \"subscriptionIds\": brokerSubIds}} from 
(\n");
+                            + ", \"subscriptionId\": brokerSubId}} from (\n");
             builder.append("with " + BADConstants.ChannelExecutionTime + " as 
current_datetime() \n");
             builder.append(
                     "select " + BROKER_RECORD_VAR + "." + 
BADConstants.BrokerEndPoint + ", " + FUNCTION_RESULT_VAR
@@ -334,10 +334,7 @@ public class CreateChannelStatement extends ExtensionStatement {
             builder.append(" returning " + INSERTED_RECORD_VAR);
 
         } else {
-            builder.append(") results\n");
-            builder.append("group by " + BADConstants.BrokerEndPoint + "," + 
FUNCTION_RESULT_VAR + ", "
-                    + BADConstants.ChannelExecutionTime);
-            builder.append(" group as brokerSubIds (" + 
BADConstants.BrokerSubscriptionId + " as subscriptionId)");
+            builder.append(") results");
         }
 
         builder.append(";");

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/f0806225/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 18cd41a..2596421 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -92,9 +92,9 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
         String channelName;
         AssignOperator pushAssign = null;
         AssignOperator newAssign = null;
-        GroupByOperator pushGroupBy = null;
+        UnnestOperator pushUnnest = null;
         LogicalVariable brokerSubsVar = null;
-        LogicalVariable brokerEndpoint = null;
+        LogicalVariable brokerEndpoint = context.newVar();
         LogicalVariable brokerSubId = null;
 
         if (!push) {
@@ -138,12 +138,6 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
             }
             pushAssign = (AssignOperator) op2;
 
-            AbstractLogicalOperator op3 = (AbstractLogicalOperator) 
pushAssign.getInputs().get(0).getValue();
-            if (op3.getOperatorTag() != LogicalOperatorTag.GROUP) {
-                return false;
-            }
-            pushGroupBy = (GroupByOperator) op3;
-
             //if push, get the channel name here instead
             subscriptionsScan = (DataSourceScanOperator) findOp(op, 
LogicalOperatorTag.DATASOURCESCAN, "",
                     BADConstants.ChannelSubscriptionsType);
@@ -154,7 +148,13 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
             String datasetName = dds.getDataset().getDatasetName();
             channelDataverse = dds.getDataset().getDataverseName();
             channelName = datasetName.substring(0, datasetName.length() - 13);
-            brokerEndpoint = pushGroupBy.getGroupByList().get(0).first;
+
+            AbstractLogicalOperator unnest = (AbstractLogicalOperator) 
pushAssign.getInputs().get(0).getValue();
+            if (unnest.getOperatorTag() != LogicalOperatorTag.UNNEST) {
+                return false;
+            }
+
+            pushUnnest = (UnnestOperator) unnest;
         }
 
         //The channelExecutionTime is created just before the scan
@@ -173,7 +173,6 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
             if (subplanOperator == null) {
                 return false;
             }
-            brokerEndpoint = context.newVar();
             brokerSubId = context.newVar();
             brokerSubsVar = ((AggregateOperator) 
subplanOperator.getNestedPlans().get(0).getRoots().get(0).getValue())
                     .getVariables().get(0);
@@ -181,19 +180,18 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
             newAssign = createAssignsAndUnnest(brokerSubsVar, brokerEndpoint, 
brokerSubId, op, context);
 
             context.computeAndSetTypeEnvironmentForOperator(op1);
-        } else {
-            channelExecutionVar = pushGroupBy.getGroupByList().get(2).first;
+
+            //Maintain the variables through the existing project
+            ProjectOperator badProject = (ProjectOperator) findOp(op1, 
LogicalOperatorTag.PROJECT, "", "");
+            badProject.getVariables().add(channelExecutionVar);
+            badProject.getVariables().add(brokerSubsVar);
+            context.computeAndSetTypeEnvironmentForOperator(badProject);
         }
-        //Maintain the variables through the existing project
-        ProjectOperator badProject = (ProjectOperator) findOp(op1, 
LogicalOperatorTag.PROJECT, "", "");
-        badProject.getVariables().add(channelExecutionVar);
-        badProject.getVariables().add(push ? brokerEndpoint : brokerSubsVar);
-        context.computeAndSetTypeEnvironmentForOperator(badProject);
 
         //Create my brokerNotify plan above the extension Operator
         DelegateOperator dOp = push
                 ? createNotifyBrokerPushPlan(brokerEndpoint, 
channelExecutionVar, context, pushAssign, channelDataverse,
-                        channelName)
+                        channelName, pushUnnest)
                 : createNotifyBrokerPullPlan(brokerEndpoint, brokerSubId, 
channelExecutionVar, context, newAssign,
                         channelDataverse, channelName);
 
@@ -262,7 +260,7 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
 
     private DelegateOperator createNotifyBrokerPushPlan(LogicalVariable 
brokerEndpointVar,
             LogicalVariable channelExecutionVar, IOptimizationContext context, 
AssignOperator payLoadAssign,
-            String channelDataverse, String channelName)
+            String channelDataverse, String channelName, UnnestOperator 
pushUnnest)
             throws AlgebricksException {
         IVariableTypeEnvironment env = 
payLoadAssign.computeOutputTypeEnvironment(context);
         IAType resultType = (IAType) 
env.getVarType(payLoadAssign.getVariables().get(0));
@@ -273,6 +271,21 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
                 channelName, true, resultType);
 
         extensionOp.getInputs().add(new MutableObject<>(payLoadAssign));
+
+        FunctionInfo finfoGetField = (FunctionInfo) 
FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME);
+        ScalarFunctionCallExpression getBrokerEndPoint = new 
ScalarFunctionCallExpression(finfoGetField,
+                new MutableObject<>(new 
VariableReferenceExpression(pushUnnest.getVariable())), new MutableObject<>(
+                        new ConstantExpression(new AsterixConstantValue(new 
AString(BADConstants.BrokerEndPoint)))));
+
+        AssignOperator assignBrokerEndPoint =
+                new AssignOperator(brokerEndpointVar, new 
MutableObject<>(getBrokerEndPoint));
+
+        assignBrokerEndPoint.getInputs().addAll(payLoadAssign.getInputs());
+        payLoadAssign.getInputs().clear();
+        payLoadAssign.getInputs().add(new 
MutableObject<>(assignBrokerEndPoint));
+
+        context.computeAndSetTypeEnvironmentForOperator(assignBrokerEndPoint);
+        context.computeAndSetTypeEnvironmentForOperator(payLoadAssign);
         context.computeAndSetTypeEnvironmentForOperator(extensionOp);
 
         return extensionOp;
@@ -376,9 +389,17 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
                 } else if (searchTag == LogicalOperatorTag.PROJECT) {
                     return (AbstractLogicalOperator) subOp.getValue();
 
-                } else if (searchTag == LogicalOperatorTag.DATASOURCESCAN && 
isSubscriptionsScan(
+                } else if (searchTag == LogicalOperatorTag.DATASOURCESCAN) {
+                    if (isSubscriptionsScan(
                         (AbstractLogicalOperator) subOp.getValue(), 
subscriptionsName, subscriptionType)) {
-                    return (AbstractLogicalOperator) subOp.getValue();
+                        return (AbstractLogicalOperator) subOp.getValue();
+                    } else {
+                        AbstractLogicalOperator nestedOp = 
findOp((AbstractLogicalOperator) subOp.getValue(), searchTag,
+                                subscriptionsName, subscriptionType);
+                        if (nestedOp != null) {
+                            return nestedOp;
+                        }
+                    }
 
                 } else if (searchTag == LogicalOperatorTag.DELEGATE_OPERATOR) {
                     DelegateOperator dOp = (DelegateOperator) subOp.getValue();

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/f0806225/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
index 8e07af2..b604f75 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -190,8 +190,6 @@ public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRu
             }
 
             if (push) {
-                int pushOffset = inputArg1.getStartOffset();
-                bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), 
pushOffset + 1);
                 if (!firstResult) {
                     sendStreams.get(endpoint).append(',');
                 }

Reply via email to