This is an automated email from the ASF dual-hosted git repository.
wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9b3b5fadf0 [ASTERIXDB-3287][COMP] Introduce write operator
9b3b5fadf0 is described below
commit 9b3b5fadf0a6c646631a3e0757520af149c9bcf5
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Tue Oct 31 07:03:26 2023 -0700
[ASTERIXDB-3287][COMP] Introduce write operator
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Add the logical and physical write operators. We utilize
the old (and deprecated) operators.
Change-Id: Ib4fca256c6bdfa4b83890c285f509d476f130a54
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17891
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Wail Alkowaileet <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
.../asterix/optimizer/base/RuleCollections.java | 2 +
.../optimizer/rules/CleanupWriteOperatorRule.java | 59 +++
.../optimizer/rules/ConstantFoldingRule.java | 459 +-------------------
.../rules/visitor/ConstantFoldingVisitor.java | 479 +++++++++++++++++++++
.../metadata/declared/MetadataProvider.java | 26 +-
.../asterix/metadata/declared/WriteDataSink.java | 54 +++
.../asterix/om/utils/ConstantExpressionUtil.java | 20 +
.../core/algebra/metadata/IMetadataProvider.java | 9 +-
.../core/algebra/metadata/IWriteDataSink.java | 29 ++
.../algebra/operators/logical/WriteOperator.java | 92 +++-
.../logical/visitors/OperatorDeepCopyVisitor.java | 16 +-
.../logical/visitors/SchemaVariableVisitor.java | 2 +-
.../visitors/SubstituteVariableVisitor.java | 7 +-
.../logical/visitors/UsedVariableVisitor.java | 8 +-
.../physical/AbstractWindowPOperator.java | 47 +-
.../operators/physical/SinkWritePOperator.java | 105 +++--
.../LogicalOperatorPrettyPrintVisitor.java | 18 +-
.../LogicalOperatorPrettyPrintVisitorJson.java | 17 +-
.../core/utils/LogicalOperatorDotVisitor.java | 19 +-
.../rules/SetAlgebricksPhysicalOperatorsRule.java | 10 +-
20 files changed, 922 insertions(+), 556 deletions(-)
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index bf86cc5e80..29984e6481 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -38,6 +38,7 @@ import
org.apache.asterix.optimizer.rules.CancelUnnestWithNestedListifyRule;
import org.apache.asterix.optimizer.rules.CheckFilterExpressionTypeRule;
import org.apache.asterix.optimizer.rules.CheckFullParallelSortRule;
import org.apache.asterix.optimizer.rules.CheckInsertUpsertReturningRule;
+import org.apache.asterix.optimizer.rules.CleanupWriteOperatorRule;
import org.apache.asterix.optimizer.rules.ConstantFoldingRule;
import org.apache.asterix.optimizer.rules.CountVarToCountOneRule;
import org.apache.asterix.optimizer.rules.DisjunctivePredicateToJoinRule;
@@ -357,6 +358,7 @@ public final class RuleCollections {
// RemoveRedundantBooleanExpressionsInJoinRule has to run first to
probably eliminate the need for
// introducing an assign operator in ExtractSimilarVariablesInJoinRule
planCleanupRules.add(new ExtractRedundantVariablesInJoinRule());
+ planCleanupRules.add(new CleanupWriteOperatorRule());
// Needs to invoke ByNameToByIndexFieldAccessRule as the last logical
optimization rule because
// some rules can push a FieldAccessByName to a place where the name
it tries to access is in the closed part.
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CleanupWriteOperatorRule.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CleanupWriteOperatorRule.java
new file mode 100644
index 0000000000..459a29b6e7
--- /dev/null
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CleanupWriteOperatorRule.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class CleanupWriteOperatorRule implements IAlgebraicRewriteRule { // Simplifies WRITE operators whose path expression is a constant.
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, // returns true only if partition/order expressions were cleared
IOptimizationContext context)
+ throws AlgebricksException { // note: the optimization context is not used by this rule
+ ILogicalOperator op = opRef.getValue();
+ if (op.getOperatorTag() != LogicalOperatorTag.WRITE) { // only WRITE operators are candidates
+ return false;
+ }
+
+ WriteOperator writeOp = (WriteOperator) op;
+ ILogicalExpression pathExpr = writeOp.getPathExpression().getValue();
+ if (pathExpr.getExpressionTag() != LogicalExpressionTag.CONSTANT) { // partitioning is left untouched unless the path is a constant
+ return false;
+ }
+
+ boolean changed = false;
+ List<Mutable<ILogicalExpression>> partitionExprs =
writeOp.getPartitionExpressions();
+ if (!partitionExprs.isEmpty()) { // nothing to clean up when there are no partition expressions
+ // Useless partition expressions due to having a constant path
expression
+ partitionExprs.clear();
+ writeOp.getOrderExpressions().clear(); // NOTE(review): order expressions are dropped along with partitioning; presumably they only order tuples within a partition - confirm
+ changed = true;
+ }
+
+ return changed; // true iff the operator was modified above
+ }
+}
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
index 256d4818ff..343ff5dc72 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
@@ -19,145 +19,20 @@
package org.apache.asterix.optimizer.rules;
-import java.io.DataInputStream;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.WarningCollector;
-import org.apache.asterix.dataflow.data.common.ExpressionTypeComputer;
-import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
-import org.apache.asterix.dataflow.data.nontagged.NullWriterFactory;
-import org.apache.asterix.formats.nontagged.ADMPrinterFactoryProvider;
-import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
-import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
-import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
-import org.apache.asterix.formats.nontagged.BinaryHashFunctionFamilyProvider;
-import org.apache.asterix.formats.nontagged.BinaryIntegerInspector;
-import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.formats.nontagged.TypeTraitProvider;
-import org.apache.asterix.jobgen.QueryLogicalExpressionJobGen;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.om.base.ADouble;
-import org.apache.asterix.om.base.IAObject;
-import org.apache.asterix.om.constants.AsterixConstantValue;
-import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AUnionType;
-import org.apache.asterix.om.types.AbstractCollectionType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.types.TypeTagUtil;
-import org.apache.asterix.om.utils.ConstantExpressionUtil;
-import org.apache.asterix.runtime.base.UnnestingPositionWriterFactory;
-import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
+import org.apache.asterix.optimizer.rules.visitor.ConstantFoldingVisitor;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.ExpressionRuntimeProvider;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
-import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
-import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-import
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import
org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
-import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
-import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.NoOpWarningCollector;
-import org.apache.hyracks.api.exceptions.Warning;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-import org.apache.hyracks.util.LogRedactionUtil;
-
-import com.google.common.collect.ImmutableMap;
public class ConstantFoldingRule implements IAlgebraicRewriteRule {
- private final ConstantFoldingVisitor cfv = new ConstantFoldingVisitor();
- private final JobGenContext jobGenCtx;
-
- private static final Map<FunctionIdentifier, IAObject> FUNC_ID_TO_CONSTANT
= ImmutableMap
- .of(BuiltinFunctions.NUMERIC_E, new ADouble(Math.E),
BuiltinFunctions.NUMERIC_PI, new ADouble(Math.PI));
-
- /**
- * Throws exceptions in substituteProducedVariable, setVarType, and one
getVarType method.
- */
- private static final IVariableTypeEnvironment _emptyTypeEnv = new
IVariableTypeEnvironment() {
-
- @Override
- public boolean substituteProducedVariable(LogicalVariable v1,
LogicalVariable v2) {
- throw new IllegalStateException();
- }
-
- @Override
- public void setVarType(LogicalVariable var, Object type) {
- throw new IllegalStateException();
- }
-
- @Override
- public Object getVarType(LogicalVariable var, List<LogicalVariable>
nonMissableVariables,
- List<List<LogicalVariable>> correlatedMissableVariableLists,
List<LogicalVariable> nonNullableVariables,
- List<List<LogicalVariable>> correlatedNullableVariableLists) {
- throw new IllegalStateException();
- }
-
- @Override
- public Object getVarType(LogicalVariable var) {
- throw new IllegalStateException();
- }
-
- @Override
- public Object getType(ILogicalExpression expr) throws
AlgebricksException {
- return ExpressionTypeComputer.INSTANCE.getType(expr, null, this);
- }
- };
-
- private static final IOperatorSchema[] _emptySchemas = new
IOperatorSchema[] {};
+ private final ConstantFoldingVisitor cfv;
public ConstantFoldingRule(ICcApplicationContext appCtx) {
- MetadataProvider metadataProvider =
MetadataProvider.createWithDefaultNamespace(appCtx);
- jobGenCtx = new JobGenContext(null, metadataProvider, appCtx,
SerializerDeserializerProvider.INSTANCE,
- BinaryHashFunctionFactoryProvider.INSTANCE,
BinaryHashFunctionFamilyProvider.INSTANCE,
- BinaryComparatorFactoryProvider.INSTANCE,
TypeTraitProvider.INSTANCE, BinaryBooleanInspector.FACTORY,
- BinaryIntegerInspector.FACTORY,
ADMPrinterFactoryProvider.INSTANCE, PrinterBasedWriterFactory.INSTANCE,
- ResultSerializerFactoryProvider.INSTANCE,
MissingWriterFactory.INSTANCE, NullWriterFactory.INSTANCE,
- UnnestingPositionWriterFactory.INSTANCE, null,
- new ExpressionRuntimeProvider(new
QueryLogicalExpressionJobGen(metadataProvider.getFunctionManager())),
- ExpressionTypeComputer.INSTANCE, null, null, null, null,
GlobalConfig.DEFAULT_FRAME_SIZE, null,
- NoOpWarningCollector.INSTANCE, 0, new
PhysicalOptimizationConfig());
+ cfv = new ConstantFoldingVisitor(appCtx);
}
@Override
@@ -176,332 +51,4 @@ public class ConstantFoldingRule implements
IAlgebraicRewriteRule {
cfv.reset(context);
return op.acceptExpressionTransform(cfv);
}
-
- private class ConstantFoldingVisitor implements
ILogicalExpressionVisitor<Pair<Boolean, ILogicalExpression>, Void>,
- ILogicalExpressionReferenceTransform, IEvaluatorContext {
-
- private final IPointable p = VoidPointable.FACTORY.createPointable();
- private final ByteBufferInputStream bbis = new ByteBufferInputStream();
- private final DataInputStream dis = new DataInputStream(bbis);
- private final WarningCollector warningCollector = new
WarningCollector();
- private IOptimizationContext optContext;
- private IServiceContext serviceContext;
-
- private void reset(IOptimizationContext context) {
- optContext = context;
- serviceContext =
- ((MetadataProvider)
context.getMetadataProvider()).getApplicationContext().getServiceContext();
- }
-
- @Override
- public boolean transform(Mutable<ILogicalExpression> exprRef) throws
AlgebricksException {
- AbstractLogicalExpression expr = (AbstractLogicalExpression)
exprRef.getValue();
- Pair<Boolean, ILogicalExpression> newExpression =
expr.accept(this, null);
- if (newExpression.first) {
- exprRef.setValue(newExpression.second);
- }
- return newExpression.first;
- }
-
- @Override
- public Pair<Boolean, ILogicalExpression>
visitConstantExpression(ConstantExpression expr, Void arg) {
- return new Pair<>(false, expr);
- }
-
- @Override
- public Pair<Boolean, ILogicalExpression>
visitVariableReferenceExpression(VariableReferenceExpression expr,
- Void arg) {
- return new Pair<>(false, expr);
- }
-
- @Override
- public Pair<Boolean, ILogicalExpression>
visitScalarFunctionCallExpression(ScalarFunctionCallExpression expr,
- Void arg) throws AlgebricksException {
- boolean changed = constantFoldArgs(expr, arg);
- List<Mutable<ILogicalExpression>> argList = expr.getArguments();
- int argConstantCount = countConstantArgs(argList);
- FunctionIdentifier fid = expr.getFunctionIdentifier();
- if (argConstantCount != argList.size()) {
- if (argConstantCount > 0 && (BuiltinFunctions.OR.equals(fid)
|| BuiltinFunctions.AND.equals(fid))) {
- if (foldOrAndArgs(expr)) {
- ILogicalExpression changedExpr =
- expr.getArguments().size() == 1 ?
expr.getArguments().get(0).getValue() : expr;
- return new Pair<>(true, changedExpr);
- }
- }
- return new Pair<>(changed, expr);
- }
-
- if (!expr.isFunctional() || !canConstantFold(expr)) {
- return new Pair<>(changed, expr);
- }
-
- try {
- if (BuiltinFunctions.FIELD_ACCESS_BY_NAME.equals(fid)) {
- IAType argType = (IAType)
_emptyTypeEnv.getType(expr.getArguments().get(0).getValue());
- if (argType.getTypeTag() == ATypeTag.OBJECT) {
- ARecordType rt = (ARecordType) argType;
- String str =
ConstantExpressionUtil.getStringConstant(expr.getArguments().get(1).getValue());
- int k = rt.getFieldIndex(str);
- if (k >= 0) {
- // wait for the ByNameToByIndex rule to apply
- return new Pair<>(changed, expr);
- }
- }
- }
- IAObject c = FUNC_ID_TO_CONSTANT.get(fid);
- if (c != null) {
- ConstantExpression constantExpression = new
ConstantExpression(new AsterixConstantValue(c));
-
constantExpression.setSourceLocation(expr.getSourceLocation());
- return new Pair<>(true, constantExpression);
- }
-
- IScalarEvaluatorFactory fact =
jobGenCtx.getExpressionRuntimeProvider().createEvaluatorFactory(expr,
- _emptyTypeEnv, _emptySchemas, jobGenCtx);
-
- warningCollector.clear();
- IScalarEvaluator eval = fact.createScalarEvaluator(this);
- eval.evaluate(null, p);
- IAType returnType = (IAType) _emptyTypeEnv.getType(expr);
- ATypeTag runtimeType = PointableHelper.getTypeTag(p);
- if (runtimeType.isDerivedType()) {
- returnType = TypeComputeUtils.getActualType(returnType);
- } else {
- returnType = TypeTagUtil.getBuiltinTypeByTag(runtimeType);
- }
- @SuppressWarnings("rawtypes")
- ISerializerDeserializer serde =
-
jobGenCtx.getSerializerDeserializerProvider().getSerializerDeserializer(returnType);
- bbis.setByteBuffer(ByteBuffer.wrap(p.getByteArray(),
p.getStartOffset(), p.getLength()), 0);
- IAObject o = (IAObject) serde.deserialize(dis);
- warningCollector.getWarnings(optContext.getWarningCollector());
- ConstantExpression constantExpression = new
ConstantExpression(new AsterixConstantValue(o));
- constantExpression.setSourceLocation(expr.getSourceLocation());
- return new Pair<>(true, constantExpression);
- } catch (HyracksDataException | AlgebricksException e) {
- if (AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled()) {
- AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Exception caught
at constant folding: " + e, e);
- }
- return new Pair<>(false, null);
- }
- }
-
- @Override
- public Pair<Boolean, ILogicalExpression>
visitAggregateFunctionCallExpression(
- AggregateFunctionCallExpression expr, Void arg) throws
AlgebricksException {
- boolean changed = constantFoldArgs(expr, arg);
- return new Pair<>(changed, expr);
- }
-
- @Override
- public Pair<Boolean, ILogicalExpression>
visitStatefulFunctionCallExpression(
- StatefulFunctionCallExpression expr, Void arg) throws
AlgebricksException {
- boolean changed = constantFoldArgs(expr, arg);
- return new Pair<>(changed, expr);
- }
-
- @Override
- public Pair<Boolean, ILogicalExpression>
visitUnnestingFunctionCallExpression(
- UnnestingFunctionCallExpression expr, Void arg) throws
AlgebricksException {
- boolean changed = constantFoldArgs(expr, arg);
- return new Pair<>(changed, expr);
- }
-
- private boolean constantFoldArgs(AbstractFunctionCallExpression expr,
Void arg) throws AlgebricksException {
- return
expr.getFunctionIdentifier().equals(BuiltinFunctions.OPEN_RECORD_CONSTRUCTOR)
- ? foldRecordArgs(expr, arg) : foldFunctionArgs(expr, arg);
- }
-
- private boolean foldFunctionArgs(AbstractFunctionCallExpression expr,
Void arg) throws AlgebricksException {
- boolean changed = false;
- for (Mutable<ILogicalExpression> exprArgRef : expr.getArguments())
{
- changed |= foldArg(exprArgRef, arg);
- }
- return changed;
- }
-
- private boolean foldRecordArgs(AbstractFunctionCallExpression expr,
Void arg) throws AlgebricksException {
- if (expr.getArguments().size() % 2 != 0) {
- String functionName = expr.getFunctionIdentifier().getName();
- throw
CompilationException.create(ErrorCode.COMPILATION_INVALID_NUM_OF_ARGS,
expr.getSourceLocation(),
- functionName);
- }
- boolean changed = false;
- Iterator<Mutable<ILogicalExpression>> iterator =
expr.getArguments().iterator();
- int fieldNameIdx = 0;
- while (iterator.hasNext()) {
- Mutable<ILogicalExpression> fieldNameExprRef = iterator.next();
- Pair<Boolean, ILogicalExpression> fieldNameExpr =
fieldNameExprRef.getValue().accept(this, arg);
- boolean isDuplicate = false;
- if (fieldNameExpr.first) {
- String fieldName =
ConstantExpressionUtil.getStringConstant(fieldNameExpr.second);
- if (fieldName != null) {
- isDuplicate = isDuplicateField(fieldName,
fieldNameIdx, expr.getArguments());
- }
- if (isDuplicate) {
- IWarningCollector warningCollector =
optContext.getWarningCollector();
- if (warningCollector.shouldWarn()) {
-
warningCollector.warn(Warning.of(fieldNameExpr.second.getSourceLocation(),
-
ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME,
LogRedactionUtil.userData(fieldName)));
- }
- iterator.remove();
- iterator.next();
- iterator.remove();
- } else {
- fieldNameExprRef.setValue(fieldNameExpr.second);
- }
- changed = true;
- }
- if (!isDuplicate) {
- Mutable<ILogicalExpression> fieldValue = iterator.next();
- changed |= foldArg(fieldValue, arg);
- fieldNameIdx += 2;
- }
- }
- return changed;
- }
-
- private boolean isDuplicateField(String fName, int fIdx,
List<Mutable<ILogicalExpression>> args) {
- for (int i = 0, size = args.size(); i < size; i += 2) {
- if (i != fIdx &&
fName.equals(ConstantExpressionUtil.getStringConstant(args.get(i).getValue())))
{
- return true;
- }
- }
- return false;
- }
-
- private boolean foldArg(Mutable<ILogicalExpression> exprArgRef, Void
arg) throws AlgebricksException {
- Pair<Boolean, ILogicalExpression> newExpr =
exprArgRef.getValue().accept(this, arg);
- if (newExpr.first) {
- exprArgRef.setValue(newExpr.second);
- return true;
- }
- return false;
- }
-
- private int countConstantArgs(List<Mutable<ILogicalExpression>>
argList) {
- int n = 0;
- for (Mutable<ILogicalExpression> r : argList) {
- if (r.getValue().getExpressionTag() ==
LogicalExpressionTag.CONSTANT) {
- n++;
- }
- }
- return n;
- }
-
- private boolean canConstantFold(ScalarFunctionCallExpression function)
throws AlgebricksException {
- // skip external functions because they're not available at
compile time (on CC)
- IFunctionInfo fi = function.getFunctionInfo();
- if (fi.isExternal()) {
- return false;
- }
- IAType returnType = (IAType) _emptyTypeEnv.getType(function);
- // skip all functions that would produce records/arrays/multisets
(derived types) in their open format
- // this is because constant folding them will make them closed
(currently)
- if
(function.getFunctionIdentifier().equals(BuiltinFunctions.OPEN_RECORD_CONSTRUCTOR))
{
- if (returnType.getTypeTag() != ATypeTag.OBJECT ||
((ARecordType) returnType).isOpen()) {
- return false;
- }
- }
- return canConstantFoldType(returnType);
- }
-
- private boolean canConstantFoldType(IAType returnType) {
- ATypeTag tag = returnType.getTypeTag();
- if (tag == ATypeTag.ANY) {
- // if the function is to return a record (or derived data),
that record would (should) be an open record
- return false;
- } else if (tag == ATypeTag.OBJECT) {
- ARecordType recordType = (ARecordType) returnType;
- if (recordType.isOpen()) {
- return false;
- }
- IAType[] fieldTypes = recordType.getFieldTypes();
- for (int i = 0; i < fieldTypes.length; i++) {
- if (!canConstantFoldType(fieldTypes[i])) {
- return false;
- }
- }
- } else if (tag.isListType()) {
- AbstractCollectionType listType = (AbstractCollectionType)
returnType;
- return canConstantFoldType(listType.getItemType());
- } else if (tag == ATypeTag.UNION) {
- return canConstantFoldType(((AUnionType)
returnType).getActualType());
- }
- return true;
- }
-
- private boolean foldOrAndArgs(ScalarFunctionCallExpression expr) {
- // or(true,x,y) -> true; or(false,x,y) -> or(x,y)
- boolean changed = false;
- List<Mutable<ILogicalExpression>> argList = expr.getArguments();
- Iterator<Mutable<ILogicalExpression>> argIter = argList.iterator();
- Mutable<ILogicalExpression> argFalse = null;
- while (argIter.hasNext()) {
- Mutable<ILogicalExpression> argExprRef = argIter.next();
- ILogicalExpression argExpr = argExprRef.getValue();
- if (argExpr.getExpressionTag() !=
LogicalExpressionTag.CONSTANT) {
- continue;
- }
-
- ConstantExpression cExpr = (ConstantExpression) argExpr;
- IAlgebricksConstantValue cValue = cExpr.getValue();
- FunctionIdentifier fid = expr.getFunctionIdentifier();
-
- if (replaceAndReturn(cValue, fid)) {
- // or(true,x,y) -> true;
- // and(false, x, y) -> false
- argList.clear();
- argList.add(argExprRef);
- return true;
- } else if (removeAndContinue(cValue, fid)) {
- // or(false, x, y) -> or(x, y)
- // and(true, x, y) -> and(x, y)
- // remove 'false' (or 'true') from arg list, but save the
expression.
- argFalse = argExprRef;
- argIter.remove();
- changed = true;
- }
- }
- if (argList.isEmpty() && argFalse != null) {
- argList.add(argFalse);
- }
- return changed;
- }
-
- private boolean replaceAndReturn(IAlgebricksConstantValue cValue,
FunctionIdentifier fid) {
- if (BuiltinFunctions.OR.equals(fid)) {
- return cValue.isTrue();
- } else {
- // BuiltinFunctions.AND
- return cValue.isFalse();
- }
- }
-
- private boolean removeAndContinue(IAlgebricksConstantValue cValue,
FunctionIdentifier fid) {
- if (BuiltinFunctions.OR.equals(fid)) {
- return cValue.isFalse();
- } else {
- // BuiltinFunctions.AND
- return cValue.isTrue();
- }
- }
-
- // IEvaluatorContext
-
- @Override
- public IServiceContext getServiceContext() {
- return serviceContext;
- }
-
- @Override
- public IHyracksTaskContext getTaskContext() {
- return null;
- }
-
- @Override
- public IWarningCollector getWarningCollector() {
- return warningCollector;
- }
- }
}
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/visitor/ConstantFoldingVisitor.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/visitor/ConstantFoldingVisitor.java
new file mode 100644
index 0000000000..0feb6c526c
--- /dev/null
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/visitor/ConstantFoldingVisitor.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.visitor;
+
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.WarningCollector;
+import org.apache.asterix.dataflow.data.common.ExpressionTypeComputer;
+import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
+import org.apache.asterix.dataflow.data.nontagged.NullWriterFactory;
+import org.apache.asterix.formats.nontagged.ADMPrinterFactoryProvider;
+import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
+import org.apache.asterix.formats.nontagged.BinaryHashFunctionFamilyProvider;
+import org.apache.asterix.formats.nontagged.BinaryIntegerInspector;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.jobgen.QueryLogicalExpressionJobGen;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.asterix.runtime.base.UnnestingPositionWriterFactory;
+import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.ExpressionRuntimeProvider;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
+import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import
org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
+import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.NoOpWarningCollector;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.apache.hyracks.util.LogRedactionUtil;
+
+import com.google.common.collect.ImmutableMap;
+
+public class ConstantFoldingVisitor implements
ILogicalExpressionVisitor<Pair<Boolean, ILogicalExpression>, Void>,
+ ILogicalExpressionReferenceTransform, IEvaluatorContext {
+
+ /**
+ * Throws exceptions in substituteProducedVariable, setVarType, and one
getVarType method.
+ */
+ private static final IVariableTypeEnvironment _emptyTypeEnv = new
IVariableTypeEnvironment() {
+
+ @Override
+ public boolean substituteProducedVariable(LogicalVariable v1,
LogicalVariable v2) {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public void setVarType(LogicalVariable var, Object type) {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public Object getVarType(LogicalVariable var, List<LogicalVariable>
nonMissableVariables,
+ List<List<LogicalVariable>> correlatedMissableVariableLists,
List<LogicalVariable> nonNullableVariables,
+ List<List<LogicalVariable>> correlatedNullableVariableLists) {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public Object getVarType(LogicalVariable var) {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public Object getType(ILogicalExpression expr) throws
AlgebricksException {
+ return ExpressionTypeComputer.INSTANCE.getType(expr, null, this);
+ }
+ };
+
+ private static final IOperatorSchema[] _emptySchemas = new
IOperatorSchema[] {};
+ private static final Map<FunctionIdentifier, IAObject> FUNC_ID_TO_CONSTANT
= ImmutableMap
+ .of(BuiltinFunctions.NUMERIC_E, new ADouble(Math.E),
BuiltinFunctions.NUMERIC_PI, new ADouble(Math.PI));
+ private final JobGenContext jobGenCtx;
+ private final IPointable p = VoidPointable.FACTORY.createPointable();
+ private final ByteBufferInputStream bbis = new ByteBufferInputStream();
+ private final DataInputStream dis = new DataInputStream(bbis);
+ private final WarningCollector warningCollector = new WarningCollector();
+ private IOptimizationContext optContext;
+ private IServiceContext serviceContext;
+
+ public ConstantFoldingVisitor(ICcApplicationContext appCtx) {
+ MetadataProvider metadataProvider =
MetadataProvider.createWithDefaultNamespace(appCtx);
+ jobGenCtx = new JobGenContext(null, metadataProvider, appCtx,
SerializerDeserializerProvider.INSTANCE,
+ BinaryHashFunctionFactoryProvider.INSTANCE,
BinaryHashFunctionFamilyProvider.INSTANCE,
+ BinaryComparatorFactoryProvider.INSTANCE,
TypeTraitProvider.INSTANCE, BinaryBooleanInspector.FACTORY,
+ BinaryIntegerInspector.FACTORY,
ADMPrinterFactoryProvider.INSTANCE, PrinterBasedWriterFactory.INSTANCE,
+ ResultSerializerFactoryProvider.INSTANCE,
MissingWriterFactory.INSTANCE, NullWriterFactory.INSTANCE,
+ UnnestingPositionWriterFactory.INSTANCE, null,
+ new ExpressionRuntimeProvider(new
QueryLogicalExpressionJobGen(metadataProvider.getFunctionManager())),
+ ExpressionTypeComputer.INSTANCE, null, null, null, null,
GlobalConfig.DEFAULT_FRAME_SIZE, null,
+ NoOpWarningCollector.INSTANCE, 0, new
PhysicalOptimizationConfig());
+ }
+
+ public void reset(IOptimizationContext context) {
+ optContext = context;
+ serviceContext = ((MetadataProvider)
context.getMetadataProvider()).getApplicationContext().getServiceContext();
+ }
+
+ @Override
+ public boolean transform(Mutable<ILogicalExpression> exprRef) throws
AlgebricksException {
+ AbstractLogicalExpression expr = (AbstractLogicalExpression)
exprRef.getValue();
+ Pair<Boolean, ILogicalExpression> newExpression = expr.accept(this,
null);
+ if (newExpression.first) {
+ exprRef.setValue(newExpression.second);
+ }
+ return newExpression.first;
+ }
+
+ @Override
+ public Pair<Boolean, ILogicalExpression>
visitConstantExpression(ConstantExpression expr, Void arg) {
+ return new Pair<>(false, expr);
+ }
+
+ @Override
+ public Pair<Boolean, ILogicalExpression>
visitVariableReferenceExpression(VariableReferenceExpression expr,
+ Void arg) {
+ return new Pair<>(false, expr);
+ }
+
+ @Override
+ public Pair<Boolean, ILogicalExpression>
visitScalarFunctionCallExpression(ScalarFunctionCallExpression expr,
+ Void arg) throws AlgebricksException {
+ boolean changed = constantFoldArgs(expr, arg);
+ List<Mutable<ILogicalExpression>> argList = expr.getArguments();
+ int argConstantCount = countConstantArgs(argList);
+ FunctionIdentifier fid = expr.getFunctionIdentifier();
+ if (argConstantCount != argList.size()) {
+ if (argConstantCount > 0 && (BuiltinFunctions.OR.equals(fid) ||
BuiltinFunctions.AND.equals(fid))) {
+ if (foldOrAndArgs(expr)) {
+ ILogicalExpression changedExpr =
+ expr.getArguments().size() == 1 ?
expr.getArguments().get(0).getValue() : expr;
+ return new Pair<>(true, changedExpr);
+ }
+ }
+ return new Pair<>(changed, expr);
+ }
+
+ if (!expr.isFunctional() || !canConstantFold(expr)) {
+ return new Pair<>(changed, expr);
+ }
+
+ try {
+ if (BuiltinFunctions.FIELD_ACCESS_BY_NAME.equals(fid)) {
+ IAType argType = (IAType)
_emptyTypeEnv.getType(expr.getArguments().get(0).getValue());
+ if (argType.getTypeTag() == ATypeTag.OBJECT) {
+ ARecordType rt = (ARecordType) argType;
+ String str =
ConstantExpressionUtil.getStringConstant(expr.getArguments().get(1).getValue());
+ int k = rt.getFieldIndex(str);
+ if (k >= 0) {
+ // wait for the ByNameToByIndex rule to apply
+ return new Pair<>(changed, expr);
+ }
+ }
+ }
+ IAObject c = FUNC_ID_TO_CONSTANT.get(fid);
+ if (c != null) {
+ ConstantExpression constantExpression = new
ConstantExpression(new AsterixConstantValue(c));
+ constantExpression.setSourceLocation(expr.getSourceLocation());
+ return new Pair<>(true, constantExpression);
+ }
+
+ IScalarEvaluatorFactory fact =
jobGenCtx.getExpressionRuntimeProvider().createEvaluatorFactory(expr,
+ _emptyTypeEnv, _emptySchemas, jobGenCtx);
+
+ warningCollector.clear();
+ IScalarEvaluator eval = fact.createScalarEvaluator(this);
+ eval.evaluate(null, p);
+ IAType returnType = (IAType) _emptyTypeEnv.getType(expr);
+ ATypeTag runtimeType = PointableHelper.getTypeTag(p);
+ if (runtimeType.isDerivedType()) {
+ returnType = TypeComputeUtils.getActualType(returnType);
+ } else {
+ returnType = TypeTagUtil.getBuiltinTypeByTag(runtimeType);
+ }
+ @SuppressWarnings("rawtypes")
+ ISerializerDeserializer serde =
+
jobGenCtx.getSerializerDeserializerProvider().getSerializerDeserializer(returnType);
+ bbis.setByteBuffer(ByteBuffer.wrap(p.getByteArray(),
p.getStartOffset(), p.getLength()), 0);
+ IAObject o = (IAObject) serde.deserialize(dis);
+ warningCollector.getWarnings(optContext.getWarningCollector());
+ ConstantExpression constantExpression = new ConstantExpression(new
AsterixConstantValue(o));
+ constantExpression.setSourceLocation(expr.getSourceLocation());
+ return new Pair<>(true, constantExpression);
+ } catch (HyracksDataException | AlgebricksException e) {
+ if (AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled()) {
+ AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Exception caught at
constant folding: " + e, e);
+ }
+ return new Pair<>(false, null);
+ }
+ }
+
+ @Override
+ public Pair<Boolean, ILogicalExpression>
visitAggregateFunctionCallExpression(AggregateFunctionCallExpression expr,
+ Void arg) throws AlgebricksException {
+ boolean changed = constantFoldArgs(expr, arg);
+ return new Pair<>(changed, expr);
+ }
+
+ @Override
+ public Pair<Boolean, ILogicalExpression>
visitStatefulFunctionCallExpression(StatefulFunctionCallExpression expr,
+ Void arg) throws AlgebricksException {
+ boolean changed = constantFoldArgs(expr, arg);
+ return new Pair<>(changed, expr);
+ }
+
+ @Override
+ public Pair<Boolean, ILogicalExpression>
visitUnnestingFunctionCallExpression(UnnestingFunctionCallExpression expr,
+ Void arg) throws AlgebricksException {
+ boolean changed = constantFoldArgs(expr, arg);
+ return new Pair<>(changed, expr);
+ }
+
+ private boolean constantFoldArgs(AbstractFunctionCallExpression expr, Void
arg) throws AlgebricksException {
+ return
expr.getFunctionIdentifier().equals(BuiltinFunctions.OPEN_RECORD_CONSTRUCTOR) ?
foldRecordArgs(expr, arg)
+ : foldFunctionArgs(expr, arg);
+ }
+
+ private boolean foldFunctionArgs(AbstractFunctionCallExpression expr, Void
arg) throws AlgebricksException {
+ boolean changed = false;
+ for (Mutable<ILogicalExpression> exprArgRef : expr.getArguments()) {
+ changed |= foldArg(exprArgRef, arg);
+ }
+ return changed;
+ }
+
+ private boolean foldRecordArgs(AbstractFunctionCallExpression expr, Void
arg) throws AlgebricksException {
+ if (expr.getArguments().size() % 2 != 0) {
+ String functionName = expr.getFunctionIdentifier().getName();
+ throw
CompilationException.create(ErrorCode.COMPILATION_INVALID_NUM_OF_ARGS,
expr.getSourceLocation(),
+ functionName);
+ }
+ boolean changed = false;
+ Iterator<Mutable<ILogicalExpression>> iterator =
expr.getArguments().iterator();
+ int fieldNameIdx = 0;
+ while (iterator.hasNext()) {
+ Mutable<ILogicalExpression> fieldNameExprRef = iterator.next();
+ Pair<Boolean, ILogicalExpression> fieldNameExpr =
fieldNameExprRef.getValue().accept(this, arg);
+ boolean isDuplicate = false;
+ if (fieldNameExpr.first) {
+ String fieldName =
ConstantExpressionUtil.getStringConstant(fieldNameExpr.second);
+ if (fieldName != null) {
+ isDuplicate = isDuplicateField(fieldName, fieldNameIdx,
expr.getArguments());
+ }
+ if (isDuplicate) {
+ IWarningCollector warningCollector =
optContext.getWarningCollector();
+ if (warningCollector.shouldWarn()) {
+
warningCollector.warn(Warning.of(fieldNameExpr.second.getSourceLocation(),
+ ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME,
LogRedactionUtil.userData(fieldName)));
+ }
+ iterator.remove();
+ iterator.next();
+ iterator.remove();
+ } else {
+ fieldNameExprRef.setValue(fieldNameExpr.second);
+ }
+ changed = true;
+ }
+ if (!isDuplicate) {
+ Mutable<ILogicalExpression> fieldValue = iterator.next();
+ changed |= foldArg(fieldValue, arg);
+ fieldNameIdx += 2;
+ }
+ }
+ return changed;
+ }
+
+ private boolean isDuplicateField(String fName, int fIdx,
List<Mutable<ILogicalExpression>> args) {
+ for (int i = 0, size = args.size(); i < size; i += 2) {
+ if (i != fIdx &&
fName.equals(ConstantExpressionUtil.getStringConstant(args.get(i).getValue())))
{
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean foldArg(Mutable<ILogicalExpression> exprArgRef, Void arg)
throws AlgebricksException {
+ Pair<Boolean, ILogicalExpression> newExpr =
exprArgRef.getValue().accept(this, arg);
+ if (newExpr.first) {
+ exprArgRef.setValue(newExpr.second);
+ return true;
+ }
+ return false;
+ }
+
+ private int countConstantArgs(List<Mutable<ILogicalExpression>> argList) {
+ int n = 0;
+ for (Mutable<ILogicalExpression> r : argList) {
+ if (r.getValue().getExpressionTag() ==
LogicalExpressionTag.CONSTANT) {
+ n++;
+ }
+ }
+ return n;
+ }
+
+ private boolean canConstantFold(ScalarFunctionCallExpression function)
throws AlgebricksException {
+ // skip external functions because they're not available at compile
time (on CC)
+ IFunctionInfo fi = function.getFunctionInfo();
+ if (fi.isExternal()) {
+ return false;
+ }
+ IAType returnType = (IAType) _emptyTypeEnv.getType(function);
+ // skip all functions that would produce records/arrays/multisets
(derived types) in their open format
+ // this is because constant folding them will make them closed
(currently)
+ if
(function.getFunctionIdentifier().equals(BuiltinFunctions.OPEN_RECORD_CONSTRUCTOR))
{
+ if (returnType.getTypeTag() != ATypeTag.OBJECT || ((ARecordType)
returnType).isOpen()) {
+ return false;
+ }
+ }
+ return canConstantFoldType(returnType);
+ }
+
+ private boolean canConstantFoldType(IAType returnType) {
+ ATypeTag tag = returnType.getTypeTag();
+ if (tag == ATypeTag.ANY) {
+ // if the function is to return a record (or derived data), that
record would (should) be an open record
+ return false;
+ } else if (tag == ATypeTag.OBJECT) {
+ ARecordType recordType = (ARecordType) returnType;
+ if (recordType.isOpen()) {
+ return false;
+ }
+ IAType[] fieldTypes = recordType.getFieldTypes();
+ for (int i = 0; i < fieldTypes.length; i++) {
+ if (!canConstantFoldType(fieldTypes[i])) {
+ return false;
+ }
+ }
+ } else if (tag.isListType()) {
+ AbstractCollectionType listType = (AbstractCollectionType)
returnType;
+ return canConstantFoldType(listType.getItemType());
+ } else if (tag == ATypeTag.UNION) {
+ return canConstantFoldType(((AUnionType)
returnType).getActualType());
+ }
+ return true;
+ }
+
+ private boolean foldOrAndArgs(ScalarFunctionCallExpression expr) {
+ // or(true,x,y) -> true; or(false,x,y) -> or(x,y)
+ boolean changed = false;
+ List<Mutable<ILogicalExpression>> argList = expr.getArguments();
+ Iterator<Mutable<ILogicalExpression>> argIter = argList.iterator();
+ Mutable<ILogicalExpression> argFalse = null;
+ while (argIter.hasNext()) {
+ Mutable<ILogicalExpression> argExprRef = argIter.next();
+ ILogicalExpression argExpr = argExprRef.getValue();
+ if (argExpr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+ continue;
+ }
+
+ ConstantExpression cExpr = (ConstantExpression) argExpr;
+ IAlgebricksConstantValue cValue = cExpr.getValue();
+ FunctionIdentifier fid = expr.getFunctionIdentifier();
+
+ if (replaceAndReturn(cValue, fid)) {
+ // or(true,x,y) -> true;
+ // and(false, x, y) -> false
+ argList.clear();
+ argList.add(argExprRef);
+ return true;
+ } else if (removeAndContinue(cValue, fid)) {
+ // or(false, x, y) -> or(x, y)
+ // and(true, x, y) -> and(x, y)
+ // remove 'false' (or 'true') from arg list, but save the
expression.
+ argFalse = argExprRef;
+ argIter.remove();
+ changed = true;
+ }
+ }
+ if (argList.isEmpty() && argFalse != null) {
+ argList.add(argFalse);
+ }
+ return changed;
+ }
+
+ private boolean replaceAndReturn(IAlgebricksConstantValue cValue,
FunctionIdentifier fid) {
+ if (BuiltinFunctions.OR.equals(fid)) {
+ return cValue.isTrue();
+ } else {
+ // BuiltinFunctions.AND
+ return cValue.isFalse();
+ }
+ }
+
+ private boolean removeAndContinue(IAlgebricksConstantValue cValue,
FunctionIdentifier fid) {
+ if (BuiltinFunctions.OR.equals(fid)) {
+ return cValue.isFalse();
+ } else {
+ // BuiltinFunctions.AND
+ return cValue.isTrue();
+ }
+ }
+
+ // IEvaluatorContext
+
+ @Override
+ public IServiceContext getServiceContext() {
+ return serviceContext;
+ }
+
+ @Override
+ public IHyracksTaskContext getTaskContext() {
+ return null;
+ }
+
+ @Override
+ public IWarningCollector getWarningCollector() {
+ return warningCollector;
+ }
+}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index fc63d89680..28bce7ee21 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -22,7 +22,6 @@ import static
org.apache.asterix.common.api.IIdentifierMapper.Modifier.PLURAL;
import static
org.apache.asterix.common.metadata.MetadataConstants.METADATA_OBJECT_NAME_INVALID_CHARS;
import static org.apache.asterix.common.utils.IdentifierUtil.dataset;
-import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -108,6 +107,7 @@ import
org.apache.asterix.runtime.operators.LSMSecondaryUpsertWithNestedPlanOper
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.common.utils.Quadruple;
import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -123,6 +123,7 @@ import
org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import
org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
@@ -134,8 +135,8 @@ import
org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import
org.apache.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
@@ -740,19 +741,14 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
}
@Override
- public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>
getWriteFileRuntime(IDataSink sink,
- int[] printColumns, IPrinterFactory[] printerFactories,
IAWriterFactory writerFactory,
- RecordDescriptor inputDesc) {
- FileSplitDataSink fsds = (FileSplitDataSink) sink;
- FileSplitSinkId fssi = fsds.getId();
- FileSplit fs = fssi.getFileSplit();
- File outFile = new File(fs.getPath());
- String nodeId = fs.getNodeName();
-
- SinkWriterRuntimeFactory runtime =
- new SinkWriterRuntimeFactory(printColumns, printerFactories,
outFile, writerFactory, inputDesc);
- AlgebricksPartitionConstraint apc = new
AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
- return new Pair<>(runtime, apc);
+ public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>
getWriteFileRuntime(int sourceColumn,
+ int[] partitionColumns, IBinaryComparatorFactory[]
partitionComparatorFactories,
+ IScalarEvaluatorFactory dynamicPathEvalFactory, ILogicalExpression
staticPathExpr,
+ SourceLocation pathSourceLocation, IWriteDataSink sink,
RecordDescriptor inputDesc, Object sourceType)
+ throws AlgebricksException {
+
+ // TODO implement
+ throw new NotImplementedException();
}
@Override
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
new file mode 100644
index 0000000000..753ac5430b
--- /dev/null
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.declared;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
+
+public class WriteDataSink implements IWriteDataSink {
+ private final String adapterName;
+ private final Map<String, String> configuration;
+
+ public WriteDataSink(String adapterName, Map<String, String>
configuration) {
+ this.adapterName = adapterName;
+ this.configuration = configuration;
+ }
+
+ private WriteDataSink(WriteDataSink writeDataSink) {
+ this.adapterName = writeDataSink.getAdapterName();
+ this.configuration = new HashMap<>(writeDataSink.configuration);
+ }
+
+ @Override
+ public final String getAdapterName() {
+ return adapterName;
+ }
+
+ @Override
+ public final Map<String, String> getConfiguration() {
+ return configuration;
+ }
+
+ @Override
+ public IWriteDataSink createCopy() {
+ return new WriteDataSink(this);
+ }
+}
diff --git
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/ConstantExpressionUtil.java
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/ConstantExpressionUtil.java
index cc051832ca..72f659dd0e 100644
---
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/ConstantExpressionUtil.java
+++
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/ConstantExpressionUtil.java
@@ -19,6 +19,7 @@
package org.apache.asterix.om.utils;
import org.apache.asterix.om.base.ABoolean;
+import org.apache.asterix.om.base.ADouble;
import org.apache.asterix.om.base.AInt32;
import org.apache.asterix.om.base.AInt64;
import org.apache.asterix.om.base.AOrderedList;
@@ -32,6 +33,7 @@ import
org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import
org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
+import org.apache.hyracks.api.exceptions.SourceLocation;
public class ConstantExpressionUtil {
@@ -115,4 +117,22 @@ public class ConstantExpressionUtil {
return expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL
? getStringArgument((AbstractFunctionCallExpression) expr,
index) : null;
}
+
+ public static ConstantExpression create(String value, SourceLocation
sourceLocation) {
+ return createExpression(new AString(value), sourceLocation);
+ }
+
+ public static ConstantExpression create(long value, SourceLocation
sourceLocation) {
+ return createExpression(new AInt64(value), sourceLocation);
+ }
+
+ public static ConstantExpression create(double value, SourceLocation
sourceLocation) {
+ return createExpression(new ADouble(value), sourceLocation);
+ }
+
+ private static ConstantExpression createExpression(IAObject value,
SourceLocation sourceLocation) {
+ ConstantExpression constExpr = new ConstantExpression(new
AsterixConstantValue(value));
+ constExpr.setSourceLocation(sourceLocation);
+ return constExpr;
+ }
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 4f3d8e4532..2072deedd5 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -36,8 +36,11 @@ import org.apache.hyracks.algebricks.data.IPrinterFactory;
import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
@@ -56,8 +59,10 @@ public interface IMetadataProvider<S, I> {
IVariableTypeEnvironment typeEnv, JobGenContext context,
JobSpecification jobSpec, Object implConfig,
IProjectionFiltrationInfo projectionFiltrationInfo) throws
AlgebricksException;
- Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>
getWriteFileRuntime(IDataSink sink, int[] printColumns,
- IPrinterFactory[] printerFactories, IAWriterFactory writerFactory,
RecordDescriptor inputDesc)
+ Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>
getWriteFileRuntime(int sourceColumn,
+ int[] partitionColumns, IBinaryComparatorFactory[]
partitionComparatorFactories,
+ IScalarEvaluatorFactory dynamicPathEvalFactory, ILogicalExpression
staticPathExpr,
+ SourceLocation pathSourceLocation, IWriteDataSink sink,
RecordDescriptor inputDesc, Object sourceType)
throws AlgebricksException;
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getResultHandleRuntime(IDataSink sink, int[] printColumns,
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IWriteDataSink.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IWriteDataSink.java
new file mode 100644
index 0000000000..fa7a55ec41
--- /dev/null
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IWriteDataSink.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.metadata;
+
+import java.util.Map;
+
+public interface IWriteDataSink {
+ String getAdapterName();
+
+ Map<String, String> getConfiguration();
+
+ IWriteDataSink createCopy();
+}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
index 61b77967db..7eef90e509 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
@@ -23,31 +23,77 @@ import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
import
org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
public class WriteOperator extends AbstractLogicalOperator {
- private List<Mutable<ILogicalExpression>> expressions;
- private IDataSink dataSink;
+ private final Mutable<ILogicalExpression> sourceExpression;
+ private final Mutable<ILogicalExpression> pathExpression;
+ private final List<Mutable<ILogicalExpression>> partitionExpressions;
+ private final List<Pair<OrderOperator.IOrder,
Mutable<ILogicalExpression>>> orderExpressions;
+ private final IWriteDataSink writeDataSink;
- public WriteOperator(List<Mutable<ILogicalExpression>> expressions,
IDataSink dataSink) {
- this.expressions = expressions;
- this.dataSink = dataSink;
+ public WriteOperator(Mutable<ILogicalExpression> sourceExpression,
Mutable<ILogicalExpression> pathExpression,
+ List<Mutable<ILogicalExpression>> partitionExpressions,
+ List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>>
orderExpressions,
+ IWriteDataSink writeDataSink) {
+ this.sourceExpression = sourceExpression;
+ this.pathExpression = pathExpression;
+ this.partitionExpressions = partitionExpressions;
+ this.orderExpressions = orderExpressions;
+ this.writeDataSink = writeDataSink;
}
- public List<Mutable<ILogicalExpression>> getExpressions() {
- return expressions;
+ public Mutable<ILogicalExpression> getSourceExpression() {
+ return sourceExpression;
}
- public IDataSink getDataSink() {
- return dataSink;
+ public LogicalVariable getSourceVariable() {
+ return VariableUtilities.getVariable(sourceExpression.getValue());
+ }
+
+ public Mutable<ILogicalExpression> getPathExpression() {
+ return pathExpression;
+ }
+
+ public List<Mutable<ILogicalExpression>> getPartitionExpressions() {
+ return partitionExpressions;
+ }
+
+ public List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>>
getOrderExpressions() {
+ return orderExpressions;
+ }
+
+ public List<LogicalVariable> getPartitionVariables() {
+ List<LogicalVariable> partitionVariables = new ArrayList<>();
+ for (Mutable<ILogicalExpression> partitionExpression :
partitionExpressions) {
+
partitionVariables.add(VariableUtilities.getVariable(partitionExpression.getValue()));
+ }
+ return partitionVariables;
+ }
+
+ public List<OrderColumn> getOrderColumns() {
+ List<OrderColumn> orderColumns = new ArrayList<>();
+ for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>
orderExpressionPair : orderExpressions) {
+ LogicalVariable variable =
VariableUtilities.getVariable(orderExpressionPair.getSecond().getValue());
+ OrderOperator.IOrder.OrderKind kind =
orderExpressionPair.first.getKind();
+ orderColumns.add(new OrderColumn(variable, kind));
+ }
+ return orderColumns;
+ }
+
+ public IWriteDataSink getWriteDataSink() {
+ return writeDataSink;
}
@Override
@@ -62,35 +108,37 @@ public class WriteOperator extends AbstractLogicalOperator
{
@Override
public boolean
acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws
AlgebricksException {
- boolean modif = false;
- for (int i = 0; i < expressions.size(); i++) {
- boolean b = visitor.transform(expressions.get(i));
- if (b) {
- modif = true;
- }
+ boolean changed = visitor.transform(sourceExpression);
+ changed |= visitor.transform(pathExpression);
+
+ for (Mutable<ILogicalExpression> expression : partitionExpressions) {
+ changed |= visitor.transform(expression);
}
- return modif;
+
+ for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>
orderExpressionPair : orderExpressions) {
+ changed |= visitor.transform(orderExpressionPair.second);
+ }
+
+ return changed;
}
@Override
public VariablePropagationPolicy getVariablePropagationPolicy() {
- return VariablePropagationPolicy.ALL;
+ return VariablePropagationPolicy.NONE;
}
@Override
public boolean isMap() {
- return false; // actually depends on the physical op.
+ return true;
}
@Override
public void recomputeSchema() {
- schema = new ArrayList<LogicalVariable>();
- schema.addAll(inputs.get(0).getValue().getSchema());
+ schema = new ArrayList<>(inputs.get(0).getValue().getSchema());
}
@Override
public IVariableTypeEnvironment
computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
return createPropagatingAllInputsTypeEnvironment(ctx);
}
-
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 960e399520..b3828df20e 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -35,6 +35,7 @@ import
org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
import
org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
@@ -289,9 +290,15 @@ public class OperatorDeepCopyVisitor implements
ILogicalOperatorVisitor<ILogical
@Override
public ILogicalOperator visitWriteOperator(WriteOperator op, Void arg)
throws AlgebricksException {
- ArrayList<Mutable<ILogicalExpression>> newExpressions = new
ArrayList<>();
- deepCopyExpressionRefs(newExpressions, op.getExpressions());
- return new WriteOperator(newExpressions, op.getDataSink());
+ Mutable<ILogicalExpression> newSourceExpression =
deepCopyExpressionRef(op.getSourceExpression());
+ Mutable<ILogicalExpression> newPathExpression =
deepCopyExpressionRef(op.getPathExpression());
+ List<Mutable<ILogicalExpression>> newPartitionExpressions =
+ deepCopyExpressionRefs(new ArrayList<>(),
op.getPartitionExpressions());
+ List<Pair<IOrder, Mutable<ILogicalExpression>>>
newOrderPairExpressions =
+ deepCopyOrderAndExpression(op.getOrderExpressions());
+ IWriteDataSink writeDataSink = op.getWriteDataSink().createCopy();
+ return new WriteOperator(newSourceExpression, newPathExpression,
newPartitionExpressions,
+ newOrderPairExpressions, writeDataSink);
}
@Override
@@ -369,11 +376,12 @@ public class OperatorDeepCopyVisitor implements
ILogicalOperatorVisitor<ILogical
return new SinkOperator();
}
- private void deepCopyExpressionRefs(List<Mutable<ILogicalExpression>>
newExprs,
+ private List<Mutable<ILogicalExpression>>
deepCopyExpressionRefs(List<Mutable<ILogicalExpression>> newExprs,
List<Mutable<ILogicalExpression>> oldExprs) {
for (Mutable<ILogicalExpression> oldExpr : oldExprs) {
newExprs.add(new
MutableObject<>(oldExpr.getValue().cloneExpression()));
}
+ return newExprs;
}
private Mutable<ILogicalExpression>
deepCopyExpressionRef(Mutable<ILogicalExpression> oldExprRef) {
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 74de6f5473..4067b62216 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -254,7 +254,7 @@ public class SchemaVariableVisitor implements
ILogicalOperatorVisitor<Void, Void
@Override
public Void visitWriteOperator(WriteOperator op, Void arg) throws
AlgebricksException {
- standardLayout(op);
+ // Write is akin to project empty
return null;
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index faf3c1141e..7ceb812774 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -380,7 +380,12 @@ public class SubstituteVariableVisitor
@Override
public Void visitWriteOperator(WriteOperator op, Pair<LogicalVariable,
LogicalVariable> pair)
throws AlgebricksException {
- substUsedVariablesInExpr(op.getExpressions(), pair.first, pair.second);
+ substUsedVariablesInExpr(op.getSourceExpression(), pair.first,
pair.second);
+ substUsedVariablesInExpr(op.getPathExpression(), pair.first,
pair.second);
+ substUsedVariablesInExpr(op.getPartitionExpressions(), pair.first,
pair.second);
+ for (Pair<IOrder, Mutable<ILogicalExpression>> orderExpr :
op.getOrderExpressions()) {
+ substUsedVariablesInExpr(orderExpr.second, pair.first,
pair.second);
+ }
return null;
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index d7b6228853..d7b2555a83 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -353,9 +353,15 @@ public class UsedVariableVisitor implements
ILogicalOperatorVisitor<Void, Void>
@Override
public Void visitWriteOperator(WriteOperator op, Void arg) {
- for (Mutable<ILogicalExpression> expr : op.getExpressions()) {
+ op.getSourceExpression().getValue().getUsedVariables(usedVariables);
+ op.getPathExpression().getValue().getUsedVariables(usedVariables);
+ for (Mutable<ILogicalExpression> expr : op.getPartitionExpressions()) {
expr.getValue().getUsedVariables(usedVariables);
}
+
+ for (Pair<IOrder, Mutable<ILogicalExpression>> orderExpr :
op.getOrderExpressions()) {
+ orderExpr.second.getValue().getUsedVariables(usedVariables);
+ }
return null;
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
index fcc8c8e9a2..8ff605dd8f 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
@@ -92,25 +92,8 @@ public abstract class AbstractWindowPOperator extends
AbstractPhysicalOperator {
throw new IllegalStateException(op.getExecutionMode().name());
}
- // require local order property [pc1, ... pcN, oc1, ... ocN]
- // accounting for cases where there's an overlap between order and
partition columns
- // TODO replace with required local grouping on partition columns +
local order on order columns
- List<OrderColumn> lopColumns = new ArrayList<>();
- ListSet<LogicalVariable> pcVars = new ListSet<>();
- pcVars.addAll(partitionColumns);
- for (int oIdx = 0, ln = orderColumns.size(); oIdx < ln; oIdx++) {
- OrderColumn oc = orderColumns.get(oIdx);
- LogicalVariable ocVar = oc.getColumn();
- if (!pcVars.remove(ocVar) && containsAny(orderColumns, oIdx + 1,
pcVars)) {
- throw
AlgebricksException.create(ErrorCode.UNSUPPORTED_WINDOW_SPEC,
op.getSourceLocation(),
- String.valueOf(partitionColumns),
String.valueOf(orderColumns));
- }
- lopColumns.add(new OrderColumn(oc.getColumn(), oc.getOrder()));
- }
- int pIdx = 0;
- for (LogicalVariable pColumn : pcVars) {
- lopColumns.add(pIdx++, new OrderColumn(pColumn,
OrderOperator.IOrder.OrderKind.ASC));
- }
+ List<OrderColumn> lopColumns =
+ getOrderRequirement(op, ErrorCode.UNSUPPORTED_WINDOW_SPEC,
partitionColumns, orderColumns);
List<ILocalStructuralProperty> localProps =
lopColumns.isEmpty() ? null : Collections.singletonList(new
LocalOrderProperty(lopColumns));
@@ -295,4 +278,30 @@ public abstract class AbstractWindowPOperator extends
AbstractPhysicalOperator {
}
return false;
}
+
+ static List<OrderColumn> getOrderRequirement(ILogicalOperator op,
ErrorCode errorCode,
+ List<LogicalVariable> partitionColumns, List<OrderColumn>
orderColumns) throws AlgebricksException {
+ // require local order property [pc1, ... pcN, oc1, ... ocN]
+ // accounting for cases where there's an overlap between order and
partition columns
+ // TODO replace with required local grouping on partition columns +
local order on order columns
+ List<OrderColumn> lopColumns = new ArrayList<>();
+ ListSet<LogicalVariable> pcVars = new ListSet<>();
+ pcVars.addAll(partitionColumns);
+ for (int oIdx = 0, ln = orderColumns.size(); oIdx < ln; oIdx++) {
+ OrderColumn oc = orderColumns.get(oIdx);
+ LogicalVariable ocVar = oc.getColumn();
+ if (!pcVars.remove(ocVar) && containsAny(orderColumns, oIdx + 1,
pcVars)) {
+ throw AlgebricksException.create(errorCode,
op.getSourceLocation(), String.valueOf(partitionColumns),
+ String.valueOf(orderColumns));
+ }
+ lopColumns.add(new OrderColumn(oc.getColumn(), oc.getOrder()));
+ }
+ int pIdx = 0;
+ for (LogicalVariable pColumn : pcVars) {
+ lopColumns.add(pIdx++, new OrderColumn(pColumn,
OrderOperator.IOrder.OrderKind.ASC));
+ }
+
+ return lopColumns;
+ }
+
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
index 07c798f2dc..d462cd5b8c 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
@@ -18,10 +18,14 @@
*/
package org.apache.hyracks.algebricks.core.algebra.operators.physical;
-import org.apache.commons.lang3.mutable.Mutable;
+import static
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractWindowPOperator.getOrderRequirement;
+
+import java.util.Collections;
+import java.util.List;
+
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.apache.hyracks.algebricks.common.utils.ListSet;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -30,23 +34,40 @@ import
org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import
org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
import
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import
org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.data.IPrinterFactory;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
public class SinkWritePOperator extends AbstractPhysicalOperator {
+ private final LogicalVariable sourceVariable;
+ private final List<LogicalVariable> partitionVariables;
+ private final List<OrderColumn> orderColumns;
+
+ public SinkWritePOperator(LogicalVariable sourceVariable,
List<LogicalVariable> partitionVariables,
+ List<OrderColumn> orderColumns) {
+ this.sourceVariable = sourceVariable;
+ this.partitionVariables = partitionVariables;
+ this.orderColumns = orderColumns;
+ }
@Override
public PhysicalOperatorTag getOperatorTag() {
@@ -66,12 +87,34 @@ public class SinkWritePOperator extends
AbstractPhysicalOperator {
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
- WriteOperator write = (WriteOperator) op;
- IDataSink sink = write.getDataSink();
- IPartitioningProperty pp = sink.getPartitioningProperty();
- StructuralPropertiesVector[] r = new StructuralPropertiesVector[] {
new StructuralPropertiesVector(pp, null) };
- return new PhysicalRequirements(r,
IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ IPhysicalPropertiesVector reqByParent, IOptimizationContext
context) throws AlgebricksException {
+ if (partitionVariables.isEmpty()) {
+ return emptyUnaryRequirements();
+ }
+ IPartitioningProperty pp;
+ switch (op.getExecutionMode()) {
+ case PARTITIONED:
+ pp = UnorderedPartitionedProperty.of(new
ListSet<>(partitionVariables),
+ context.getComputationNodeDomain());
+ break;
+ case UNPARTITIONED:
+ pp = IPartitioningProperty.UNPARTITIONED;
+ break;
+ case LOCAL:
+ pp = null;
+ break;
+ default:
+ throw new IllegalStateException(op.getExecutionMode().name());
+ }
+
+ List<OrderColumn> finalOrderColumns =
+ getOrderRequirement(op, ErrorCode.UNSUPPORTED_WRITE_SPEC,
partitionVariables, orderColumns);
+
+ List<ILocalStructuralProperty> localProps =
+ Collections.singletonList(new
LocalOrderProperty(finalOrderColumns));
+ return new PhysicalRequirements(
+ new StructuralPropertiesVector[] { new
StructuralPropertiesVector(pp, localProps) },
+ IPartitioningRequirementsCoordinator.NO_COORDINATION);
}
@Override
@@ -79,29 +122,39 @@ public class SinkWritePOperator extends
AbstractPhysicalOperator {
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
IOperatorSchema outerPlanSchema)
throws AlgebricksException {
WriteOperator write = (WriteOperator) op;
- int[] columns = new int[write.getExpressions().size()];
- int i = 0;
- for (Mutable<ILogicalExpression> exprRef : write.getExpressions()) {
- ILogicalExpression expr = exprRef.getValue();
- if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
- throw new NotImplementedException("Only writing variable
expressions is supported.");
- }
- VariableReferenceExpression varRef = (VariableReferenceExpression)
expr;
- LogicalVariable v = varRef.getVariableReference();
- columns[i++] = inputSchemas[0].findVariable(v);
+ IExpressionRuntimeProvider runtimeProvider =
context.getExpressionRuntimeProvider();
+ IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
+ IOperatorSchema schema = inputSchemas[0];
+ IWriteDataSink writeDataSink = write.getWriteDataSink();
+
+ // Source evaluator column
+ int sourceColumn = schema.findVariable(sourceVariable);
+
+ // Path expression
+ IScalarEvaluatorFactory dynamicPathEvalFactory = null;
+ ILogicalExpression staticPathExpr = null;
+ ILogicalExpression pathExpr = write.getPathExpression().getValue();
+ if (pathExpr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+ dynamicPathEvalFactory =
runtimeProvider.createEvaluatorFactory(pathExpr, typeEnv, inputSchemas,
context);
+ } else {
+ staticPathExpr = pathExpr;
}
+
+ // Partition columns
+ int[] partitionColumns = JobGenHelper.projectVariables(schema,
partitionVariables);
+ IBinaryComparatorFactory[] partitionComparatorFactories =
+
JobGenHelper.variablesToAscBinaryComparatorFactories(partitionVariables,
typeEnv, context);
+
RecordDescriptor recDesc =
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
propagatedSchema, context);
RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
context.getTypeEnvironment(op.getInputs().get(0).getValue()),
inputSchemas[0], context);
- IPrinterFactory[] pf =
- JobGenHelper.mkPrinterFactories(inputSchemas[0],
context.getTypeEnvironment(op), context, columns);
-
IMetadataProvider<?, ?> mp = context.getMetadataProvider();
- Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>
runtimeAndConstraints =
- mp.getWriteFileRuntime(write.getDataSink(), columns, pf,
context.getWriterFactory(), inputDesc);
+ Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>
runtimeAndConstraints = mp.getWriteFileRuntime(
+ sourceColumn, partitionColumns, partitionComparatorFactories,
dynamicPathEvalFactory, staticPathExpr,
+ pathExpr.getSourceLocation(), writeDataSink, inputDesc,
typeEnv.getVarType(sourceVariable));
IPushRuntimeFactory runtime = runtimeAndConstraints.first;
runtime.setSourceLocation(write.getSourceLocation());
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index f49b6d486e..e1e8c50405 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -266,8 +266,22 @@ public class LogicalOperatorPrettyPrintVisitor extends
AbstractLogicalOperatorPr
@Override
public Void visitWriteOperator(WriteOperator op, Integer indent) throws
AlgebricksException {
- addIndent(indent).append("write ");
- pprintExprList(op.getExpressions(), indent);
+ AlgebricksStringBuilderWriter writer = addIndent(indent);
+ writer.append("write (");
+ writer.append(op.getSourceExpression().getValue().accept(exprVisitor,
indent));
+ writer.append(") to path [");
+ writer.append(op.getPathExpression().getValue().accept(exprVisitor,
indent));
+ writer.append("] ");
+ List<Mutable<ILogicalExpression>> partitionExpressions =
op.getPartitionExpressions();
+ if (!partitionExpressions.isEmpty()) {
+ writer.append(" partition ");
+ pprintExprList(op.getPartitionExpressions(), indent);
+ List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>>
orderExpressions = op.getOrderExpressions();
+ if (!orderExpressions.isEmpty()) {
+ writer.append(" order ");
+ pprintOrderList(orderExpressions, indent);
+ }
+ }
return null;
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index 6f65a9d1d9..a60308adb1 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -482,9 +482,20 @@ public class LogicalOperatorPrettyPrintVisitorJson extends
AbstractLogicalOperat
public Void visitWriteOperator(WriteOperator op, Void indent) throws
AlgebricksException {
try {
jsonGenerator.writeStringField(OPERATOR_FIELD, "write");
- List<Mutable<ILogicalExpression>> expressions =
op.getExpressions();
- if (!expressions.isEmpty()) {
- writeArrayFieldOfExpressions(EXPRESSIONS_FIELD, expressions,
indent);
+
+ writeStringFieldExpression("value", op.getSourceExpression(),
indent);
+ writeStringFieldExpression("path", op.getPathExpression(), indent);
+
+ List<Mutable<ILogicalExpression>> partitionExpressions =
op.getPartitionExpressions();
+ if (!partitionExpressions.isEmpty()) {
+ writeObjectFieldWithExpressions("partition-by",
partitionExpressions, indent);
+
+ List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>>
orderExpressions =
+ op.getOrderExpressions();
+ if (!orderExpressions.isEmpty()) {
+ writeArrayFieldOfOrderExprList("order-by",
orderExpressions, indent);
+ }
+
}
return null;
} catch (IOException e) {
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
index 1cea0a9518..1f36aa5cb9 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
@@ -230,8 +230,23 @@ public class LogicalOperatorDotVisitor implements
ILogicalOperatorVisitor<String
@Override
public String visitWriteOperator(WriteOperator op, Boolean showDetails) {
stringBuilder.setLength(0);
- stringBuilder.append("write ");
- printExprList(op.getExpressions());
+ stringBuilder.append("write (");
+ stringBuilder.append(op.getSourceExpression());
+ stringBuilder.append(") to [");
+ stringBuilder.append(op.getPathExpression());
+ stringBuilder.append(']');
+ List<Mutable<ILogicalExpression>> partitionExpressions =
op.getPartitionExpressions();
+ if (!partitionExpressions.isEmpty()) {
+ stringBuilder.append(" partition by ");
+ printExprList(partitionExpressions);
+
+ List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>>
orderExpressions = op.getOrderExpressions();
+ if (!orderExpressions.isEmpty()) {
+ stringBuilder.append(" order ");
+ printOrderExprList(orderExpressions);
+ }
+ }
+
appendSchema(op, showDetails);
appendAnnotations(op, showDetails);
appendPhysicalOperatorInfo(op, showDetails);
diff --git
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 2784a6aa97..cc8c007ea0 100644
---
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -369,8 +369,14 @@ public class SetAlgebricksPhysicalOperatorsRule implements
IAlgebraicRewriteRule
}
@Override
- public IPhysicalOperator visitWriteOperator(WriteOperator op, Boolean
topLevelOp) {
- return new SinkWritePOperator();
+ public IPhysicalOperator visitWriteOperator(WriteOperator op, Boolean
topLevelOp) throws AlgebricksException {
+ ILogicalExpression sourceExpr =
op.getSourceExpression().getValue();
+ if (sourceExpr.getExpressionTag() !=
LogicalExpressionTag.VARIABLE) {
+ throw
AlgebricksException.create(ErrorCode.EXPR_NOT_NORMALIZED,
sourceExpr.getSourceLocation());
+ }
+ ensureAllVariables(op.getPartitionExpressions(), v -> v);
+ ensureAllVariables(op.getOrderExpressions(), Pair::getSecond);
+ return new SinkWritePOperator(op.getSourceVariable(),
op.getPartitionVariables(), op.getOrderColumns());
}
@Override