Delay result type evaluation until creating physical operator Change-Id: Iabc5288b6cc5d4cdfa2ffdaca27e8caa813dc6d6
Project: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/commit/9d049997 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/tree/9d049997 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/diff/9d049997 Branch: refs/heads/master Commit: 9d0499973681f6516eedaa4ea0370dbe4d415003 Parents: 51819b7 Author: Steven Glenn Jacobs <[email protected]> Authored: Fri Aug 3 12:49:32 2018 -0700 Committer: Steven Glenn Jacobs <[email protected]> Committed: Fri Aug 3 12:49:32 2018 -0700 ---------------------------------------------------------------------- .../bad/rules/InsertBrokerNotifierForChannelRule.java | 13 ++++--------- .../asterix/bad/runtime/NotifyBrokerOperator.java | 11 ++--------- .../asterix/bad/runtime/NotifyBrokerPOperator.java | 6 +++++- 3 files changed, 11 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/9d049997/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java index 9ead7f0..258aaf1 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java @@ -31,7 +31,6 @@ import org.apache.asterix.metadata.declared.DatasetDataSource; import org.apache.asterix.om.base.AString; import org.apache.asterix.om.constants.AsterixConstantValue; import org.apache.asterix.om.functions.BuiltinFunctions; -import org.apache.asterix.om.types.IAType; import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableObject; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -44,7 +43,6 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression; import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression; -import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression; import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; @@ -182,10 +180,9 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule } private DelegateOperator createBrokerOp(LogicalVariable brokerEndpointVar, LogicalVariable sendVar, - LogicalVariable channelExecutionVar, String channelDataverse, String channelName, boolean push, - IAType resultType) { + LogicalVariable channelExecutionVar, String channelDataverse, String channelName, boolean push) { NotifyBrokerOperator notifyBrokerOp = - new NotifyBrokerOperator(brokerEndpointVar, sendVar, channelExecutionVar, push, resultType); + new NotifyBrokerOperator(brokerEndpointVar, sendVar, channelExecutionVar, push); EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, channelDataverse, channelName); NotifyBrokerPOperator notifyBrokerPOp = new NotifyBrokerPOperator(activeId); notifyBrokerOp.setPhysicalOperator(notifyBrokerPOp); @@ -203,12 +200,10 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule while (assign.getOperatorTag() != LogicalOperatorTag.ASSIGN) { assign = (AbstractLogicalOperator) assign.getInputs().get(0).getValue(); } - IVariableTypeEnvironment env = assign.computeOutputTypeEnvironment(context); - IAType resultType = (IAType) env.getVarType(sendVar); //Create the NotifyBrokerOperator DelegateOperator extensionOp = createBrokerOp(brokerEndpointVar, sendVar, channelExecutionVar, channelDataverse, - channelName, true, resultType); + channelName, true); extensionOp.getInputs().add(new MutableObject<>(eOp)); context.computeAndSetTypeEnvironmentForOperator(extensionOp); @@ -260,7 +255,7 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule //Create the NotifyBrokerOperator DelegateOperator extensionOp = createBrokerOp(brokerEndpointVar, sendListVar, channelExecutionVar, - channelDataverse, channelName, false, null); + channelDataverse, channelName, false); //Set the input for the distinct as the old top extensionOp.getInputs().add(new MutableObject<>(groupbyOp)); http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/9d049997/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java index df0f0f4..5fc9b22 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java @@ -20,7 +20,6 @@ package org.apache.asterix.bad.runtime; import java.util.Collection; -import org.apache.asterix.om.types.IAType; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDelegatedLogicalOperator; @@ -35,15 +34,13 @@ public class NotifyBrokerOperator extends AbstractDelegatedLogicalOperator { private final LogicalVariable channelExecutionVar; private final LogicalVariable pushListVar; private final boolean push; - private final IAType recordType; public NotifyBrokerOperator(LogicalVariable brokerEndpointVar, LogicalVariable pushListVar, - LogicalVariable resultSetVar, boolean push, IAType recordType) { + LogicalVariable resultSetVar, boolean push) { this.brokerEndpointVar = brokerEndpointVar; this.channelExecutionVar = resultSetVar; this.pushListVar = pushListVar; this.push = push; - this.recordType = recordType; } public LogicalVariable getPushListVar() { @@ -58,10 +55,6 @@ public class NotifyBrokerOperator extends AbstractDelegatedLogicalOperator { return channelExecutionVar; } - public IAType getRecordType() { - return recordType; - } - public boolean getPush() { return push; } @@ -79,7 +72,7 @@ public class NotifyBrokerOperator extends AbstractDelegatedLogicalOperator { @Override public IOperatorDelegate newInstance() { - return new NotifyBrokerOperator(brokerEndpointVar, pushListVar, channelExecutionVar, push, recordType); + return new NotifyBrokerOperator(brokerEndpointVar, pushListVar, channelExecutionVar, push); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/9d049997/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java index b9cfbfd..264a994 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java @@ -27,6 +27,7 @@ import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; @@ -78,7 +79,10 @@ public class NotifyBrokerPOperator extends AbstractPhysicalOperator { LogicalVariable pushListVar = ((NotifyBrokerOperator) notify.getDelegate()).getPushListVar(); LogicalVariable brokerVar = ((NotifyBrokerOperator) notify.getDelegate()).getBrokerEndpointVariable(); LogicalVariable executionVar = ((NotifyBrokerOperator) notify.getDelegate()).getChannelExecutionVariable(); - IAType recordType = ((NotifyBrokerOperator) notify.getDelegate()).getRecordType(); + + IVariableTypeEnvironment env = context.getTypeEnvironment(op.getInputs().get(0).getValue()); + IAType recordType = (IAType) env.getVarType(pushListVar); + boolean push = ((NotifyBrokerOperator) notify.getDelegate()).getPush(); int brokerColumn = inputSchemas[0].findVariable(brokerVar);
