This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b3b5fadf0 [ASTERIXDB-3287][COMP] Introduce write operator
9b3b5fadf0 is described below

commit 9b3b5fadf0a6c646631a3e0757520af149c9bcf5
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Tue Oct 31 07:03:26 2023 -0700

    [ASTERIXDB-3287][COMP] Introduce write operator
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    Add the logical and physical write operators. We utilize
    the old (and deprecated) operators.
    
    Change-Id: Ib4fca256c6bdfa4b83890c285f509d476f130a54
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17891
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Wail Alkowaileet <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../asterix/optimizer/base/RuleCollections.java    |   2 +
 .../optimizer/rules/CleanupWriteOperatorRule.java  |  59 +++
 .../optimizer/rules/ConstantFoldingRule.java       | 459 +-------------------
 .../rules/visitor/ConstantFoldingVisitor.java      | 479 +++++++++++++++++++++
 .../metadata/declared/MetadataProvider.java        |  26 +-
 .../asterix/metadata/declared/WriteDataSink.java   |  54 +++
 .../asterix/om/utils/ConstantExpressionUtil.java   |  20 +
 .../core/algebra/metadata/IMetadataProvider.java   |   9 +-
 .../core/algebra/metadata/IWriteDataSink.java      |  29 ++
 .../algebra/operators/logical/WriteOperator.java   |  92 +++-
 .../logical/visitors/OperatorDeepCopyVisitor.java  |  16 +-
 .../logical/visitors/SchemaVariableVisitor.java    |   2 +-
 .../visitors/SubstituteVariableVisitor.java        |   7 +-
 .../logical/visitors/UsedVariableVisitor.java      |   8 +-
 .../physical/AbstractWindowPOperator.java          |  47 +-
 .../operators/physical/SinkWritePOperator.java     | 105 +++--
 .../LogicalOperatorPrettyPrintVisitor.java         |  18 +-
 .../LogicalOperatorPrettyPrintVisitorJson.java     |  17 +-
 .../core/utils/LogicalOperatorDotVisitor.java      |  19 +-
 .../rules/SetAlgebricksPhysicalOperatorsRule.java  |  10 +-
 20 files changed, 922 insertions(+), 556 deletions(-)

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index bf86cc5e80..29984e6481 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -38,6 +38,7 @@ import org.apache.asterix.optimizer.rules.CancelUnnestWithNestedListifyRule;
 import org.apache.asterix.optimizer.rules.CheckFilterExpressionTypeRule;
 import org.apache.asterix.optimizer.rules.CheckFullParallelSortRule;
 import org.apache.asterix.optimizer.rules.CheckInsertUpsertReturningRule;
+import org.apache.asterix.optimizer.rules.CleanupWriteOperatorRule;
 import org.apache.asterix.optimizer.rules.ConstantFoldingRule;
 import org.apache.asterix.optimizer.rules.CountVarToCountOneRule;
 import org.apache.asterix.optimizer.rules.DisjunctivePredicateToJoinRule;
@@ -357,6 +358,7 @@ public final class RuleCollections {
         // RemoveRedundantBooleanExpressionsInJoinRule has to run first to probably eliminate the need for
         // introducing an assign operator in ExtractSimilarVariablesInJoinRule
         planCleanupRules.add(new ExtractRedundantVariablesInJoinRule());
+        planCleanupRules.add(new CleanupWriteOperatorRule());
 
         // Needs to invoke ByNameToByIndexFieldAccessRule as the last logical optimization rule because
         // some rules can push a FieldAccessByName to a place where the name it tries to access is in the closed part.
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CleanupWriteOperatorRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CleanupWriteOperatorRule.java
new file mode 100644
index 0000000000..459a29b6e7
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CleanupWriteOperatorRule.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class CleanupWriteOperatorRule implements IAlgebraicRewriteRule {
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        ILogicalOperator op = opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.WRITE) {
+            return false;
+        }
+
+        WriteOperator writeOp = (WriteOperator) op;
+        ILogicalExpression pathExpr = writeOp.getPathExpression().getValue();
+        if (pathExpr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+            return false;
+        }
+
+        boolean changed = false;
+        List<Mutable<ILogicalExpression>> partitionExprs = writeOp.getPartitionExpressions();
+        if (!partitionExprs.isEmpty()) {
+            // Useless partition expressions due to having a constant path expression
+            partitionExprs.clear();
+            writeOp.getOrderExpressions().clear();
+            changed = true;
+        }
+
+        return changed;
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
index 256d4818ff..343ff5dc72 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
@@ -19,145 +19,20 @@
 
 package org.apache.asterix.optimizer.rules;
 
-import java.io.DataInputStream;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.WarningCollector;
-import org.apache.asterix.dataflow.data.common.ExpressionTypeComputer;
-import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
-import org.apache.asterix.dataflow.data.nontagged.NullWriterFactory;
-import org.apache.asterix.formats.nontagged.ADMPrinterFactoryProvider;
-import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
-import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
-import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
-import org.apache.asterix.formats.nontagged.BinaryHashFunctionFamilyProvider;
-import org.apache.asterix.formats.nontagged.BinaryIntegerInspector;
-import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.formats.nontagged.TypeTraitProvider;
-import org.apache.asterix.jobgen.QueryLogicalExpressionJobGen;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.om.base.ADouble;
-import org.apache.asterix.om.base.IAObject;
-import org.apache.asterix.om.constants.AsterixConstantValue;
-import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AUnionType;
-import org.apache.asterix.om.types.AbstractCollectionType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.types.TypeTagUtil;
-import org.apache.asterix.om.utils.ConstantExpressionUtil;
-import org.apache.asterix.runtime.base.UnnestingPositionWriterFactory;
-import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
+import org.apache.asterix.optimizer.rules.visitor.ConstantFoldingVisitor;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.ExpressionRuntimeProvider;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
-import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
-import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-import 
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import 
org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
-import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
-import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.NoOpWarningCollector;
-import org.apache.hyracks.api.exceptions.Warning;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-import org.apache.hyracks.util.LogRedactionUtil;
-
-import com.google.common.collect.ImmutableMap;
 
 public class ConstantFoldingRule implements IAlgebraicRewriteRule {
 
-    private final ConstantFoldingVisitor cfv = new ConstantFoldingVisitor();
-    private final JobGenContext jobGenCtx;
-
-    private static final Map<FunctionIdentifier, IAObject> FUNC_ID_TO_CONSTANT 
= ImmutableMap
-            .of(BuiltinFunctions.NUMERIC_E, new ADouble(Math.E), 
BuiltinFunctions.NUMERIC_PI, new ADouble(Math.PI));
-
-    /**
-     * Throws exceptions in substituteProducedVariable, setVarType, and one 
getVarType method.
-     */
-    private static final IVariableTypeEnvironment _emptyTypeEnv = new 
IVariableTypeEnvironment() {
-
-        @Override
-        public boolean substituteProducedVariable(LogicalVariable v1, 
LogicalVariable v2) {
-            throw new IllegalStateException();
-        }
-
-        @Override
-        public void setVarType(LogicalVariable var, Object type) {
-            throw new IllegalStateException();
-        }
-
-        @Override
-        public Object getVarType(LogicalVariable var, List<LogicalVariable> 
nonMissableVariables,
-                List<List<LogicalVariable>> correlatedMissableVariableLists, 
List<LogicalVariable> nonNullableVariables,
-                List<List<LogicalVariable>> correlatedNullableVariableLists) {
-            throw new IllegalStateException();
-        }
-
-        @Override
-        public Object getVarType(LogicalVariable var) {
-            throw new IllegalStateException();
-        }
-
-        @Override
-        public Object getType(ILogicalExpression expr) throws 
AlgebricksException {
-            return ExpressionTypeComputer.INSTANCE.getType(expr, null, this);
-        }
-    };
-
-    private static final IOperatorSchema[] _emptySchemas = new 
IOperatorSchema[] {};
+    private final ConstantFoldingVisitor cfv;
 
     public ConstantFoldingRule(ICcApplicationContext appCtx) {
-        MetadataProvider metadataProvider = 
MetadataProvider.createWithDefaultNamespace(appCtx);
-        jobGenCtx = new JobGenContext(null, metadataProvider, appCtx, 
SerializerDeserializerProvider.INSTANCE,
-                BinaryHashFunctionFactoryProvider.INSTANCE, 
BinaryHashFunctionFamilyProvider.INSTANCE,
-                BinaryComparatorFactoryProvider.INSTANCE, 
TypeTraitProvider.INSTANCE, BinaryBooleanInspector.FACTORY,
-                BinaryIntegerInspector.FACTORY, 
ADMPrinterFactoryProvider.INSTANCE, PrinterBasedWriterFactory.INSTANCE,
-                ResultSerializerFactoryProvider.INSTANCE, 
MissingWriterFactory.INSTANCE, NullWriterFactory.INSTANCE,
-                UnnestingPositionWriterFactory.INSTANCE, null,
-                new ExpressionRuntimeProvider(new 
QueryLogicalExpressionJobGen(metadataProvider.getFunctionManager())),
-                ExpressionTypeComputer.INSTANCE, null, null, null, null, 
GlobalConfig.DEFAULT_FRAME_SIZE, null,
-                NoOpWarningCollector.INSTANCE, 0, new 
PhysicalOptimizationConfig());
+        cfv = new ConstantFoldingVisitor(appCtx);
     }
 
     @Override
@@ -176,332 +51,4 @@ public class ConstantFoldingRule implements IAlgebraicRewriteRule {
         cfv.reset(context);
         return op.acceptExpressionTransform(cfv);
     }
-
-    private class ConstantFoldingVisitor implements 
ILogicalExpressionVisitor<Pair<Boolean, ILogicalExpression>, Void>,
-            ILogicalExpressionReferenceTransform, IEvaluatorContext {
-
-        private final IPointable p = VoidPointable.FACTORY.createPointable();
-        private final ByteBufferInputStream bbis = new ByteBufferInputStream();
-        private final DataInputStream dis = new DataInputStream(bbis);
-        private final WarningCollector warningCollector = new 
WarningCollector();
-        private IOptimizationContext optContext;
-        private IServiceContext serviceContext;
-
-        private void reset(IOptimizationContext context) {
-            optContext = context;
-            serviceContext =
-                    ((MetadataProvider) 
context.getMetadataProvider()).getApplicationContext().getServiceContext();
-        }
-
-        @Override
-        public boolean transform(Mutable<ILogicalExpression> exprRef) throws 
AlgebricksException {
-            AbstractLogicalExpression expr = (AbstractLogicalExpression) 
exprRef.getValue();
-            Pair<Boolean, ILogicalExpression> newExpression = 
expr.accept(this, null);
-            if (newExpression.first) {
-                exprRef.setValue(newExpression.second);
-            }
-            return newExpression.first;
-        }
-
-        @Override
-        public Pair<Boolean, ILogicalExpression> 
visitConstantExpression(ConstantExpression expr, Void arg) {
-            return new Pair<>(false, expr);
-        }
-
-        @Override
-        public Pair<Boolean, ILogicalExpression> 
visitVariableReferenceExpression(VariableReferenceExpression expr,
-                Void arg) {
-            return new Pair<>(false, expr);
-        }
-
-        @Override
-        public Pair<Boolean, ILogicalExpression> 
visitScalarFunctionCallExpression(ScalarFunctionCallExpression expr,
-                Void arg) throws AlgebricksException {
-            boolean changed = constantFoldArgs(expr, arg);
-            List<Mutable<ILogicalExpression>> argList = expr.getArguments();
-            int argConstantCount = countConstantArgs(argList);
-            FunctionIdentifier fid = expr.getFunctionIdentifier();
-            if (argConstantCount != argList.size()) {
-                if (argConstantCount > 0 && (BuiltinFunctions.OR.equals(fid) 
|| BuiltinFunctions.AND.equals(fid))) {
-                    if (foldOrAndArgs(expr)) {
-                        ILogicalExpression changedExpr =
-                                expr.getArguments().size() == 1 ? 
expr.getArguments().get(0).getValue() : expr;
-                        return new Pair<>(true, changedExpr);
-                    }
-                }
-                return new Pair<>(changed, expr);
-            }
-
-            if (!expr.isFunctional() || !canConstantFold(expr)) {
-                return new Pair<>(changed, expr);
-            }
-
-            try {
-                if (BuiltinFunctions.FIELD_ACCESS_BY_NAME.equals(fid)) {
-                    IAType argType = (IAType) 
_emptyTypeEnv.getType(expr.getArguments().get(0).getValue());
-                    if (argType.getTypeTag() == ATypeTag.OBJECT) {
-                        ARecordType rt = (ARecordType) argType;
-                        String str = 
ConstantExpressionUtil.getStringConstant(expr.getArguments().get(1).getValue());
-                        int k = rt.getFieldIndex(str);
-                        if (k >= 0) {
-                            // wait for the ByNameToByIndex rule to apply
-                            return new Pair<>(changed, expr);
-                        }
-                    }
-                }
-                IAObject c = FUNC_ID_TO_CONSTANT.get(fid);
-                if (c != null) {
-                    ConstantExpression constantExpression = new 
ConstantExpression(new AsterixConstantValue(c));
-                    
constantExpression.setSourceLocation(expr.getSourceLocation());
-                    return new Pair<>(true, constantExpression);
-                }
-
-                IScalarEvaluatorFactory fact = 
jobGenCtx.getExpressionRuntimeProvider().createEvaluatorFactory(expr,
-                        _emptyTypeEnv, _emptySchemas, jobGenCtx);
-
-                warningCollector.clear();
-                IScalarEvaluator eval = fact.createScalarEvaluator(this);
-                eval.evaluate(null, p);
-                IAType returnType = (IAType) _emptyTypeEnv.getType(expr);
-                ATypeTag runtimeType = PointableHelper.getTypeTag(p);
-                if (runtimeType.isDerivedType()) {
-                    returnType = TypeComputeUtils.getActualType(returnType);
-                } else {
-                    returnType = TypeTagUtil.getBuiltinTypeByTag(runtimeType);
-                }
-                @SuppressWarnings("rawtypes")
-                ISerializerDeserializer serde =
-                        
jobGenCtx.getSerializerDeserializerProvider().getSerializerDeserializer(returnType);
-                bbis.setByteBuffer(ByteBuffer.wrap(p.getByteArray(), 
p.getStartOffset(), p.getLength()), 0);
-                IAObject o = (IAObject) serde.deserialize(dis);
-                warningCollector.getWarnings(optContext.getWarningCollector());
-                ConstantExpression constantExpression = new 
ConstantExpression(new AsterixConstantValue(o));
-                constantExpression.setSourceLocation(expr.getSourceLocation());
-                return new Pair<>(true, constantExpression);
-            } catch (HyracksDataException | AlgebricksException e) {
-                if (AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled()) {
-                    AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Exception caught 
at constant folding: " + e, e);
-                }
-                return new Pair<>(false, null);
-            }
-        }
-
-        @Override
-        public Pair<Boolean, ILogicalExpression> 
visitAggregateFunctionCallExpression(
-                AggregateFunctionCallExpression expr, Void arg) throws 
AlgebricksException {
-            boolean changed = constantFoldArgs(expr, arg);
-            return new Pair<>(changed, expr);
-        }
-
-        @Override
-        public Pair<Boolean, ILogicalExpression> 
visitStatefulFunctionCallExpression(
-                StatefulFunctionCallExpression expr, Void arg) throws 
AlgebricksException {
-            boolean changed = constantFoldArgs(expr, arg);
-            return new Pair<>(changed, expr);
-        }
-
-        @Override
-        public Pair<Boolean, ILogicalExpression> 
visitUnnestingFunctionCallExpression(
-                UnnestingFunctionCallExpression expr, Void arg) throws 
AlgebricksException {
-            boolean changed = constantFoldArgs(expr, arg);
-            return new Pair<>(changed, expr);
-        }
-
-        private boolean constantFoldArgs(AbstractFunctionCallExpression expr, 
Void arg) throws AlgebricksException {
-            return 
expr.getFunctionIdentifier().equals(BuiltinFunctions.OPEN_RECORD_CONSTRUCTOR)
-                    ? foldRecordArgs(expr, arg) : foldFunctionArgs(expr, arg);
-        }
-
-        private boolean foldFunctionArgs(AbstractFunctionCallExpression expr, 
Void arg) throws AlgebricksException {
-            boolean changed = false;
-            for (Mutable<ILogicalExpression> exprArgRef : expr.getArguments()) 
{
-                changed |= foldArg(exprArgRef, arg);
-            }
-            return changed;
-        }
-
-        private boolean foldRecordArgs(AbstractFunctionCallExpression expr, 
Void arg) throws AlgebricksException {
-            if (expr.getArguments().size() % 2 != 0) {
-                String functionName = expr.getFunctionIdentifier().getName();
-                throw 
CompilationException.create(ErrorCode.COMPILATION_INVALID_NUM_OF_ARGS, 
expr.getSourceLocation(),
-                        functionName);
-            }
-            boolean changed = false;
-            Iterator<Mutable<ILogicalExpression>> iterator = 
expr.getArguments().iterator();
-            int fieldNameIdx = 0;
-            while (iterator.hasNext()) {
-                Mutable<ILogicalExpression> fieldNameExprRef = iterator.next();
-                Pair<Boolean, ILogicalExpression> fieldNameExpr = 
fieldNameExprRef.getValue().accept(this, arg);
-                boolean isDuplicate = false;
-                if (fieldNameExpr.first) {
-                    String fieldName = 
ConstantExpressionUtil.getStringConstant(fieldNameExpr.second);
-                    if (fieldName != null) {
-                        isDuplicate = isDuplicateField(fieldName, 
fieldNameIdx, expr.getArguments());
-                    }
-                    if (isDuplicate) {
-                        IWarningCollector warningCollector = 
optContext.getWarningCollector();
-                        if (warningCollector.shouldWarn()) {
-                            
warningCollector.warn(Warning.of(fieldNameExpr.second.getSourceLocation(),
-                                    
ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME, 
LogRedactionUtil.userData(fieldName)));
-                        }
-                        iterator.remove();
-                        iterator.next();
-                        iterator.remove();
-                    } else {
-                        fieldNameExprRef.setValue(fieldNameExpr.second);
-                    }
-                    changed = true;
-                }
-                if (!isDuplicate) {
-                    Mutable<ILogicalExpression> fieldValue = iterator.next();
-                    changed |= foldArg(fieldValue, arg);
-                    fieldNameIdx += 2;
-                }
-            }
-            return changed;
-        }
-
-        private boolean isDuplicateField(String fName, int fIdx, 
List<Mutable<ILogicalExpression>> args) {
-            for (int i = 0, size = args.size(); i < size; i += 2) {
-                if (i != fIdx && 
fName.equals(ConstantExpressionUtil.getStringConstant(args.get(i).getValue()))) 
{
-                    return true;
-                }
-            }
-            return false;
-        }
-
-        private boolean foldArg(Mutable<ILogicalExpression> exprArgRef, Void 
arg) throws AlgebricksException {
-            Pair<Boolean, ILogicalExpression> newExpr = 
exprArgRef.getValue().accept(this, arg);
-            if (newExpr.first) {
-                exprArgRef.setValue(newExpr.second);
-                return true;
-            }
-            return false;
-        }
-
-        private int countConstantArgs(List<Mutable<ILogicalExpression>> 
argList) {
-            int n = 0;
-            for (Mutable<ILogicalExpression> r : argList) {
-                if (r.getValue().getExpressionTag() == 
LogicalExpressionTag.CONSTANT) {
-                    n++;
-                }
-            }
-            return n;
-        }
-
-        private boolean canConstantFold(ScalarFunctionCallExpression function) 
throws AlgebricksException {
-            // skip external functions because they're not available at 
compile time (on CC)
-            IFunctionInfo fi = function.getFunctionInfo();
-            if (fi.isExternal()) {
-                return false;
-            }
-            IAType returnType = (IAType) _emptyTypeEnv.getType(function);
-            // skip all functions that would produce records/arrays/multisets 
(derived types) in their open format
-            // this is because constant folding them will make them closed 
(currently)
-            if 
(function.getFunctionIdentifier().equals(BuiltinFunctions.OPEN_RECORD_CONSTRUCTOR))
 {
-                if (returnType.getTypeTag() != ATypeTag.OBJECT || 
((ARecordType) returnType).isOpen()) {
-                    return false;
-                }
-            }
-            return canConstantFoldType(returnType);
-        }
-
-        private boolean canConstantFoldType(IAType returnType) {
-            ATypeTag tag = returnType.getTypeTag();
-            if (tag == ATypeTag.ANY) {
-                // if the function is to return a record (or derived data), 
that record would (should) be an open record
-                return false;
-            } else if (tag == ATypeTag.OBJECT) {
-                ARecordType recordType = (ARecordType) returnType;
-                if (recordType.isOpen()) {
-                    return false;
-                }
-                IAType[] fieldTypes = recordType.getFieldTypes();
-                for (int i = 0; i < fieldTypes.length; i++) {
-                    if (!canConstantFoldType(fieldTypes[i])) {
-                        return false;
-                    }
-                }
-            } else if (tag.isListType()) {
-                AbstractCollectionType listType = (AbstractCollectionType) 
returnType;
-                return canConstantFoldType(listType.getItemType());
-            } else if (tag == ATypeTag.UNION) {
-                return canConstantFoldType(((AUnionType) 
returnType).getActualType());
-            }
-            return true;
-        }
-
-        private boolean foldOrAndArgs(ScalarFunctionCallExpression expr) {
-            // or(true,x,y) -> true; or(false,x,y) -> or(x,y)
-            boolean changed = false;
-            List<Mutable<ILogicalExpression>> argList = expr.getArguments();
-            Iterator<Mutable<ILogicalExpression>> argIter = argList.iterator();
-            Mutable<ILogicalExpression> argFalse = null;
-            while (argIter.hasNext()) {
-                Mutable<ILogicalExpression> argExprRef = argIter.next();
-                ILogicalExpression argExpr = argExprRef.getValue();
-                if (argExpr.getExpressionTag() != 
LogicalExpressionTag.CONSTANT) {
-                    continue;
-                }
-
-                ConstantExpression cExpr = (ConstantExpression) argExpr;
-                IAlgebricksConstantValue cValue = cExpr.getValue();
-                FunctionIdentifier fid = expr.getFunctionIdentifier();
-
-                if (replaceAndReturn(cValue, fid)) {
-                    // or(true,x,y) -> true;
-                    // and(false, x, y) -> false
-                    argList.clear();
-                    argList.add(argExprRef);
-                    return true;
-                } else if (removeAndContinue(cValue, fid)) {
-                    // or(false, x, y) -> or(x, y)
-                    // and(true, x, y) -> and(x, y)
-                    // remove 'false' (or 'true') from arg list, but save the 
expression.
-                    argFalse = argExprRef;
-                    argIter.remove();
-                    changed = true;
-                }
-            }
-            if (argList.isEmpty() && argFalse != null) {
-                argList.add(argFalse);
-            }
-            return changed;
-        }
-
-        private boolean replaceAndReturn(IAlgebricksConstantValue cValue, 
FunctionIdentifier fid) {
-            if (BuiltinFunctions.OR.equals(fid)) {
-                return cValue.isTrue();
-            } else {
-                // BuiltinFunctions.AND
-                return cValue.isFalse();
-            }
-        }
-
-        private boolean removeAndContinue(IAlgebricksConstantValue cValue, 
FunctionIdentifier fid) {
-            if (BuiltinFunctions.OR.equals(fid)) {
-                return cValue.isFalse();
-            } else {
-                // BuiltinFunctions.AND
-                return cValue.isTrue();
-            }
-        }
-
-        // IEvaluatorContext
-
-        @Override
-        public IServiceContext getServiceContext() {
-            return serviceContext;
-        }
-
-        @Override
-        public IHyracksTaskContext getTaskContext() {
-            return null;
-        }
-
-        @Override
-        public IWarningCollector getWarningCollector() {
-            return warningCollector;
-        }
-    }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/visitor/ConstantFoldingVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/visitor/ConstantFoldingVisitor.java
new file mode 100644
index 0000000000..0feb6c526c
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/visitor/ConstantFoldingVisitor.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.visitor;
+
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.WarningCollector;
+import org.apache.asterix.dataflow.data.common.ExpressionTypeComputer;
+import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
+import org.apache.asterix.dataflow.data.nontagged.NullWriterFactory;
+import org.apache.asterix.formats.nontagged.ADMPrinterFactoryProvider;
+import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
+import org.apache.asterix.formats.nontagged.BinaryHashFunctionFamilyProvider;
+import org.apache.asterix.formats.nontagged.BinaryIntegerInspector;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.jobgen.QueryLogicalExpressionJobGen;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.asterix.runtime.base.UnnestingPositionWriterFactory;
+import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.ExpressionRuntimeProvider;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
+import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import 
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import 
org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
+import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.NoOpWarningCollector;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.apache.hyracks.util.LogRedactionUtil;
+
+import com.google.common.collect.ImmutableMap;
+
+public class ConstantFoldingVisitor implements 
ILogicalExpressionVisitor<Pair<Boolean, ILogicalExpression>, Void>,
+        ILogicalExpressionReferenceTransform, IEvaluatorContext {
+
+    /**
+     * Type environment that knows no variables. Every variable-related method
+     * ({@code substituteProducedVariable}, {@code setVarType} and both {@code getVarType} overloads) throws
+     * {@link IllegalStateException}; only {@code getType} is usable and delegates to
+     * {@link ExpressionTypeComputer}.
+     */
+    private static final IVariableTypeEnvironment _emptyTypeEnv = new IVariableTypeEnvironment() {
+
+        @Override
+        public Object getType(ILogicalExpression expr) throws AlgebricksException {
+            // the type computer receives this environment itself for any lookups it performs
+            return ExpressionTypeComputer.INSTANCE.getType(expr, null, this);
+        }
+
+        @Override
+        public Object getVarType(LogicalVariable var) {
+            throw new IllegalStateException();
+        }
+
+        @Override
+        public Object getVarType(LogicalVariable var, List<LogicalVariable> nonMissableVariables,
+                List<List<LogicalVariable>> correlatedMissableVariableLists, List<LogicalVariable> nonNullableVariables,
+                List<List<LogicalVariable>> correlatedNullableVariableLists) {
+            throw new IllegalStateException();
+        }
+
+        @Override
+        public void setVarType(LogicalVariable var, Object type) {
+            throw new IllegalStateException();
+        }
+
+        @Override
+        public boolean substituteProducedVariable(LogicalVariable source, LogicalVariable target) {
+            throw new IllegalStateException();
+        }
+    };
+
+    /** Empty schema array handed to the evaluator factory when an expression is folded. */
+    private static final IOperatorSchema[] _emptySchemas = new IOperatorSchema[0];
+    /** Functions that are replaced directly by a fixed constant instead of being evaluated. */
+    private static final Map<FunctionIdentifier, IAObject> FUNC_ID_TO_CONSTANT = ImmutableMap.of(
+            BuiltinFunctions.NUMERIC_E, new ADouble(Math.E), BuiltinFunctions.NUMERIC_PI, new ADouble(Math.PI));
+    /** Job-generation context used to create evaluator factories and serializers/deserializers. */
+    private final JobGenContext jobGenCtx;
+    /** Receives the serialized result of the compile-time evaluation. */
+    private final IPointable p = VoidPointable.FACTORY.createPointable();
+    /** Stream over the evaluated bytes; wrapped by {@link #dis} for deserialization. */
+    private final ByteBufferInputStream bbis = new ByteBufferInputStream();
+    private final DataInputStream dis = new DataInputStream(bbis);
+    /** Collects warnings raised by the evaluator; they are forwarded to the optimization context on success. */
+    private final WarningCollector warningCollector = new WarningCollector();
+    /** Set by {@link #reset(IOptimizationContext)}. */
+    private IOptimizationContext optContext;
+    /** Set by {@link #reset(IOptimizationContext)}. */
+    private IServiceContext serviceContext;
+
+    /**
+     * Creates the visitor and the job-generation context it uses to build evaluators at compile time.
+     *
+     * @param appCtx application context passed to the metadata provider and to the job-generation context
+     */
+    public ConstantFoldingVisitor(ICcApplicationContext appCtx) {
+        MetadataProvider mdProvider = MetadataProvider.createWithDefaultNamespace(appCtx);
+        ExpressionRuntimeProvider runtimeProvider =
+                new ExpressionRuntimeProvider(new QueryLogicalExpressionJobGen(mdProvider.getFunctionManager()));
+        jobGenCtx = new JobGenContext(null, mdProvider, appCtx, SerializerDeserializerProvider.INSTANCE,
+                BinaryHashFunctionFactoryProvider.INSTANCE, BinaryHashFunctionFamilyProvider.INSTANCE,
+                BinaryComparatorFactoryProvider.INSTANCE, TypeTraitProvider.INSTANCE, BinaryBooleanInspector.FACTORY,
+                BinaryIntegerInspector.FACTORY, ADMPrinterFactoryProvider.INSTANCE, PrinterBasedWriterFactory.INSTANCE,
+                ResultSerializerFactoryProvider.INSTANCE, MissingWriterFactory.INSTANCE, NullWriterFactory.INSTANCE,
+                UnnestingPositionWriterFactory.INSTANCE, null, runtimeProvider, ExpressionTypeComputer.INSTANCE, null,
+                null, null, null, GlobalConfig.DEFAULT_FRAME_SIZE, null, NoOpWarningCollector.INSTANCE, 0,
+                new PhysicalOptimizationConfig());
+    }
+
+    public void reset(IOptimizationContext context) {
+        optContext = context;
+        serviceContext = ((MetadataProvider) 
context.getMetadataProvider()).getApplicationContext().getServiceContext();
+    }
+
+    @Override
+    public boolean transform(Mutable<ILogicalExpression> exprRef) throws 
AlgebricksException {
+        AbstractLogicalExpression expr = (AbstractLogicalExpression) 
exprRef.getValue();
+        Pair<Boolean, ILogicalExpression> newExpression = expr.accept(this, 
null);
+        if (newExpression.first) {
+            exprRef.setValue(newExpression.second);
+        }
+        return newExpression.first;
+    }
+
+    /** A constant is already folded: it is returned as-is and reported as unchanged. */
+    @Override
+    public Pair<Boolean, ILogicalExpression> visitConstantExpression(ConstantExpression expr, Void arg) {
+        return new Pair<>(Boolean.FALSE, expr);
+    }
+
+    /** A variable reference is never folded: it is returned as-is and reported as unchanged. */
+    @Override
+    public Pair<Boolean, ILogicalExpression> visitVariableReferenceExpression(VariableReferenceExpression expr,
+            Void arg) {
+        return new Pair<>(Boolean.FALSE, expr);
+    }
+
+    @Override
+    public Pair<Boolean, ILogicalExpression> 
visitScalarFunctionCallExpression(ScalarFunctionCallExpression expr,
+            Void arg) throws AlgebricksException {
+        boolean changed = constantFoldArgs(expr, arg);
+        List<Mutable<ILogicalExpression>> argList = expr.getArguments();
+        int argConstantCount = countConstantArgs(argList);
+        FunctionIdentifier fid = expr.getFunctionIdentifier();
+        if (argConstantCount != argList.size()) {
+            if (argConstantCount > 0 && (BuiltinFunctions.OR.equals(fid) || 
BuiltinFunctions.AND.equals(fid))) {
+                if (foldOrAndArgs(expr)) {
+                    ILogicalExpression changedExpr =
+                            expr.getArguments().size() == 1 ? 
expr.getArguments().get(0).getValue() : expr;
+                    return new Pair<>(true, changedExpr);
+                }
+            }
+            return new Pair<>(changed, expr);
+        }
+
+        if (!expr.isFunctional() || !canConstantFold(expr)) {
+            return new Pair<>(changed, expr);
+        }
+
+        try {
+            if (BuiltinFunctions.FIELD_ACCESS_BY_NAME.equals(fid)) {
+                IAType argType = (IAType) 
_emptyTypeEnv.getType(expr.getArguments().get(0).getValue());
+                if (argType.getTypeTag() == ATypeTag.OBJECT) {
+                    ARecordType rt = (ARecordType) argType;
+                    String str = 
ConstantExpressionUtil.getStringConstant(expr.getArguments().get(1).getValue());
+                    int k = rt.getFieldIndex(str);
+                    if (k >= 0) {
+                        // wait for the ByNameToByIndex rule to apply
+                        return new Pair<>(changed, expr);
+                    }
+                }
+            }
+            IAObject c = FUNC_ID_TO_CONSTANT.get(fid);
+            if (c != null) {
+                ConstantExpression constantExpression = new 
ConstantExpression(new AsterixConstantValue(c));
+                constantExpression.setSourceLocation(expr.getSourceLocation());
+                return new Pair<>(true, constantExpression);
+            }
+
+            IScalarEvaluatorFactory fact = 
jobGenCtx.getExpressionRuntimeProvider().createEvaluatorFactory(expr,
+                    _emptyTypeEnv, _emptySchemas, jobGenCtx);
+
+            warningCollector.clear();
+            IScalarEvaluator eval = fact.createScalarEvaluator(this);
+            eval.evaluate(null, p);
+            IAType returnType = (IAType) _emptyTypeEnv.getType(expr);
+            ATypeTag runtimeType = PointableHelper.getTypeTag(p);
+            if (runtimeType.isDerivedType()) {
+                returnType = TypeComputeUtils.getActualType(returnType);
+            } else {
+                returnType = TypeTagUtil.getBuiltinTypeByTag(runtimeType);
+            }
+            @SuppressWarnings("rawtypes")
+            ISerializerDeserializer serde =
+                    
jobGenCtx.getSerializerDeserializerProvider().getSerializerDeserializer(returnType);
+            bbis.setByteBuffer(ByteBuffer.wrap(p.getByteArray(), 
p.getStartOffset(), p.getLength()), 0);
+            IAObject o = (IAObject) serde.deserialize(dis);
+            warningCollector.getWarnings(optContext.getWarningCollector());
+            ConstantExpression constantExpression = new ConstantExpression(new 
AsterixConstantValue(o));
+            constantExpression.setSourceLocation(expr.getSourceLocation());
+            return new Pair<>(true, constantExpression);
+        } catch (HyracksDataException | AlgebricksException e) {
+            if (AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled()) {
+                AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Exception caught at 
constant folding: " + e, e);
+            }
+            return new Pair<>(false, null);
+        }
+    }
+
+    /** Folds only the arguments of an aggregate call; the call itself is kept. */
+    @Override
+    public Pair<Boolean, ILogicalExpression> visitAggregateFunctionCallExpression(AggregateFunctionCallExpression expr,
+            Void arg) throws AlgebricksException {
+        return new Pair<>(constantFoldArgs(expr, arg), expr);
+    }
+
+    /** Folds only the arguments of a stateful call; the call itself is kept. */
+    @Override
+    public Pair<Boolean, ILogicalExpression> visitStatefulFunctionCallExpression(StatefulFunctionCallExpression expr,
+            Void arg) throws AlgebricksException {
+        return new Pair<>(constantFoldArgs(expr, arg), expr);
+    }
+
+    /** Folds only the arguments of an unnesting call; the call itself is kept. */
+    @Override
+    public Pair<Boolean, ILogicalExpression> visitUnnestingFunctionCallExpression(UnnestingFunctionCallExpression expr,
+            Void arg) throws AlgebricksException {
+        return new Pair<>(constantFoldArgs(expr, arg), expr);
+    }
+
+    /**
+     * Folds the arguments of a call; open-record constructors get the record-specific treatment.
+     *
+     * @return {@code true} if any argument was changed
+     */
+    private boolean constantFoldArgs(AbstractFunctionCallExpression expr, Void arg) throws AlgebricksException {
+        if (expr.getFunctionIdentifier().equals(BuiltinFunctions.OPEN_RECORD_CONSTRUCTOR)) {
+            return foldRecordArgs(expr, arg);
+        }
+        return foldFunctionArgs(expr, arg);
+    }
+
+    /**
+     * Folds every argument of the call, one after the other.
+     *
+     * @return {@code true} if at least one argument was changed
+     */
+    private boolean foldFunctionArgs(AbstractFunctionCallExpression expr, Void arg) throws AlgebricksException {
+        boolean anyFolded = false;
+        for (Mutable<ILogicalExpression> argRef : expr.getArguments()) {
+            // every argument is visited, even after one has already been folded
+            if (foldArg(argRef, arg)) {
+                anyFolded = true;
+            }
+        }
+        return anyFolded;
+    }
+
+    /**
+     * Folds the (field name, field value) argument pairs of a record constructor. When folding a field name
+     * yields a string that is already used by another field name, a warning is issued (if warnings are
+     * enabled) and the whole pair is removed from the argument list.
+     *
+     * @return {@code true} if any argument was changed or removed
+     * @throws AlgebricksException if the number of arguments is odd, or if folding an argument fails
+     */
+    private boolean foldRecordArgs(AbstractFunctionCallExpression expr, Void arg) throws AlgebricksException {
+        List<Mutable<ILogicalExpression>> args = expr.getArguments();
+        if (args.size() % 2 != 0) {
+            String functionName = expr.getFunctionIdentifier().getName();
+            throw CompilationException.create(ErrorCode.COMPILATION_INVALID_NUM_OF_ARGS, expr.getSourceLocation(),
+                    functionName);
+        }
+        boolean modified = false;
+        // position of the current field name in the (possibly shrinking) argument list
+        int nameIdx = 0;
+        Iterator<Mutable<ILogicalExpression>> it = args.iterator();
+        while (it.hasNext()) {
+            Mutable<ILogicalExpression> nameRef = it.next();
+            Pair<Boolean, ILogicalExpression> foldedName = nameRef.getValue().accept(this, arg);
+            if (foldedName.first) {
+                modified = true;
+                String fieldName = ConstantExpressionUtil.getStringConstant(foldedName.second);
+                if (fieldName != null && isDuplicateField(fieldName, nameIdx, args)) {
+                    IWarningCollector optWarnings = optContext.getWarningCollector();
+                    if (optWarnings.shouldWarn()) {
+                        optWarnings.warn(Warning.of(foldedName.second.getSourceLocation(),
+                                ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME, LogRedactionUtil.userData(fieldName)));
+                    }
+                    // drop the name and its value; the next pair now sits at the same index
+                    it.remove();
+                    it.next();
+                    it.remove();
+                    continue;
+                }
+                nameRef.setValue(foldedName.second);
+            }
+            Mutable<ILogicalExpression> valueRef = it.next();
+            if (foldArg(valueRef, arg)) {
+                modified = true;
+            }
+            nameIdx += 2;
+        }
+        return modified;
+    }
+
+    private boolean isDuplicateField(String fName, int fIdx, 
List<Mutable<ILogicalExpression>> args) {
+        for (int i = 0, size = args.size(); i < size; i += 2) {
+            if (i != fIdx && 
fName.equals(ConstantExpressionUtil.getStringConstant(args.get(i).getValue()))) 
{
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Folds one argument and writes the result back into its reference when it changed.
+     *
+     * @return {@code true} if the argument was changed
+     */
+    private boolean foldArg(Mutable<ILogicalExpression> exprArgRef, Void arg) throws AlgebricksException {
+        Pair<Boolean, ILogicalExpression> folded = exprArgRef.getValue().accept(this, arg);
+        if (!folded.first) {
+            return false;
+        }
+        exprArgRef.setValue(folded.second);
+        return true;
+    }
+
+    /** Returns how many of the given arguments are constant expressions. */
+    private int countConstantArgs(List<Mutable<ILogicalExpression>> argList) {
+        return (int) argList.stream()
+                .filter(ref -> ref.getValue().getExpressionTag() == LogicalExpressionTag.CONSTANT).count();
+    }
+
+    /**
+     * Decides whether a call may be evaluated at compile time, based on the function and its return type.
+     *
+     * @return {@code false} for external functions, for record constructors whose result is not a closed
+     *         record, and for return types rejected by {@link #canConstantFoldType(IAType)}
+     */
+    private boolean canConstantFold(ScalarFunctionCallExpression function) throws AlgebricksException {
+        // external functions are not available at compile time (on CC)
+        if (function.getFunctionInfo().isExternal()) {
+            return false;
+        }
+        IAType returnType = (IAType) _emptyTypeEnv.getType(function);
+        // Functions producing derived types in their open format are skipped, because constant folding
+        // would (currently) make them closed.
+        boolean recordConstructor =
+                function.getFunctionIdentifier().equals(BuiltinFunctions.OPEN_RECORD_CONSTRUCTOR);
+        if (recordConstructor && (returnType.getTypeTag() != ATypeTag.OBJECT || ((ARecordType) returnType).isOpen())) {
+            return false;
+        }
+        return canConstantFoldType(returnType);
+    }
+
+    /**
+     * Recursively checks that a type contains neither {@code ANY} nor an open record, looking into record
+     * fields, list item types and the actual type of a union.
+     */
+    private boolean canConstantFoldType(IAType returnType) {
+        ATypeTag tag = returnType.getTypeTag();
+        if (tag == ATypeTag.ANY) {
+            // a record (or other derived value) returned as ANY would (should) be in its open form
+            return false;
+        }
+        if (tag == ATypeTag.OBJECT) {
+            ARecordType recordType = (ARecordType) returnType;
+            if (recordType.isOpen()) {
+                return false;
+            }
+            for (IAType fieldType : recordType.getFieldTypes()) {
+                if (!canConstantFoldType(fieldType)) {
+                    return false;
+                }
+            }
+            return true;
+        }
+        if (tag.isListType()) {
+            return canConstantFoldType(((AbstractCollectionType) returnType).getItemType());
+        }
+        if (tag == ATypeTag.UNION) {
+            return canConstantFoldType(((AUnionType) returnType).getActualType());
+        }
+        return true;
+    }
+
+    /**
+     * Simplifies the constant arguments of an {@code or}/{@code and} call in place:
+     * <ul>
+     * <li>{@code or(true, x, y)} and {@code and(false, x, y)}: the argument list is reduced to that constant</li>
+     * <li>{@code or(false, x, y)} and {@code and(true, x, y)}: the constant is removed from the list</li>
+     * </ul>
+     * If the removals leave the list empty, the last removed constant is put back.
+     *
+     * @return {@code true} if the argument list was modified
+     */
+    private boolean foldOrAndArgs(ScalarFunctionCallExpression expr) {
+        FunctionIdentifier fid = expr.getFunctionIdentifier();
+        List<Mutable<ILogicalExpression>> args = expr.getArguments();
+        Mutable<ILogicalExpression> lastRemoved = null;
+        boolean modified = false;
+        for (Iterator<Mutable<ILogicalExpression>> it = args.iterator(); it.hasNext();) {
+            Mutable<ILogicalExpression> ref = it.next();
+            ILogicalExpression candidate = ref.getValue();
+            if (candidate.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
+                IAlgebricksConstantValue value = ((ConstantExpression) candidate).getValue();
+                if (replaceAndReturn(value, fid)) {
+                    // this constant decides the result on its own: keep nothing else
+                    args.clear();
+                    args.add(ref);
+                    return true;
+                }
+                if (removeAndContinue(value, fid)) {
+                    // this constant has no influence on the result: drop it, but remember it
+                    lastRemoved = ref;
+                    it.remove();
+                    modified = true;
+                }
+            }
+        }
+        if (lastRemoved != null && args.isEmpty()) {
+            args.add(lastRemoved);
+        }
+        return modified;
+    }
+
+    /**
+     * Tells whether the constant alone decides the result: {@code true} for {@code or}, {@code false} for any
+     * other function identifier (i.e. {@code and}).
+     */
+    private boolean replaceAndReturn(IAlgebricksConstantValue cValue, FunctionIdentifier fid) {
+        // anything that is not OR is treated as BuiltinFunctions.AND
+        return BuiltinFunctions.OR.equals(fid) ? cValue.isTrue() : cValue.isFalse();
+    }
+
+    /**
+     * Tells whether the constant can simply be dropped from the argument list: {@code false} for {@code or},
+     * {@code true} for any other function identifier (i.e. {@code and}).
+     */
+    private boolean removeAndContinue(IAlgebricksConstantValue cValue, FunctionIdentifier fid) {
+        // anything that is not OR is treated as BuiltinFunctions.AND
+        return BuiltinFunctions.OR.equals(fid) ? cValue.isFalse() : cValue.isTrue();
+    }
+
+    // IEvaluatorContext
+
+    /** Returns the service context captured by the last call to {@code reset}. */
+    @Override
+    public IServiceContext getServiceContext() {
+        return this.serviceContext;
+    }
+
+    /** Always {@code null}: this visitor supplies no task context. */
+    @Override
+    public IHyracksTaskContext getTaskContext() {
+        return null;
+    }
+
+    /** Returns the visitor's own collector, which gathers warnings raised during evaluation. */
+    @Override
+    public IWarningCollector getWarningCollector() {
+        return this.warningCollector;
+    }
+}
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index fc63d89680..28bce7ee21 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -22,7 +22,6 @@ import static 
org.apache.asterix.common.api.IIdentifierMapper.Modifier.PLURAL;
 import static 
org.apache.asterix.common.metadata.MetadataConstants.METADATA_OBJECT_NAME_INVALID_CHARS;
 import static org.apache.asterix.common.utils.IdentifierUtil.dataset;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -108,6 +107,7 @@ import 
org.apache.asterix.runtime.operators.LSMSecondaryUpsertWithNestedPlanOper
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Quadruple;
 import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -123,6 +123,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import 
org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
@@ -134,8 +135,8 @@ import 
org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import 
org.apache.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
@@ -740,19 +741,14 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
     }
 
     @Override
-    public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> 
getWriteFileRuntime(IDataSink sink,
-            int[] printColumns, IPrinterFactory[] printerFactories, 
IAWriterFactory writerFactory,
-            RecordDescriptor inputDesc) {
-        FileSplitDataSink fsds = (FileSplitDataSink) sink;
-        FileSplitSinkId fssi = fsds.getId();
-        FileSplit fs = fssi.getFileSplit();
-        File outFile = new File(fs.getPath());
-        String nodeId = fs.getNodeName();
-
-        SinkWriterRuntimeFactory runtime =
-                new SinkWriterRuntimeFactory(printColumns, printerFactories, 
outFile, writerFactory, inputDesc);
-        AlgebricksPartitionConstraint apc = new 
AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
-        return new Pair<>(runtime, apc);
+    public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> 
getWriteFileRuntime(int sourceColumn,
+            int[] partitionColumns, IBinaryComparatorFactory[] 
partitionComparatorFactories,
+            IScalarEvaluatorFactory dynamicPathEvalFactory, ILogicalExpression 
staticPathExpr,
+            SourceLocation pathSourceLocation, IWriteDataSink sink, 
RecordDescriptor inputDesc, Object sourceType)
+            throws AlgebricksException {
+
+        // TODO implement
+        throw new NotImplementedException();
     }
 
     @Override
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
new file mode 100644
index 0000000000..753ac5430b
--- /dev/null
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.declared;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
+
+/**
+ * {@link IWriteDataSink} backed by an adapter name and a configuration map.
+ */
+public class WriteDataSink implements IWriteDataSink {
+    private final String adapterName;
+    private final Map<String, String> configuration;
+
+    /**
+     * @param adapterName name of the adapter
+     * @param configuration configuration entries; the map is stored as given, not copied
+     */
+    public WriteDataSink(String adapterName, Map<String, String> configuration) {
+        this.adapterName = adapterName;
+        this.configuration = configuration;
+    }
+
+    @Override
+    public final String getAdapterName() {
+        return this.adapterName;
+    }
+
+    @Override
+    public final Map<String, String> getConfiguration() {
+        return this.configuration;
+    }
+
+    /** Returns a new sink with the same adapter name and a fresh {@link HashMap} copy of the configuration. */
+    @Override
+    public IWriteDataSink createCopy() {
+        return new WriteDataSink(adapterName, new HashMap<>(configuration));
+    }
+}
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/ConstantExpressionUtil.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/ConstantExpressionUtil.java
index cc051832ca..72f659dd0e 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/ConstantExpressionUtil.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/ConstantExpressionUtil.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.om.utils;
 
 import org.apache.asterix.om.base.ABoolean;
+import org.apache.asterix.om.base.ADouble;
 import org.apache.asterix.om.base.AInt32;
 import org.apache.asterix.om.base.AInt64;
 import org.apache.asterix.om.base.AOrderedList;
@@ -32,6 +33,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public class ConstantExpressionUtil {
 
@@ -115,4 +117,22 @@ public class ConstantExpressionUtil {
         return expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL
                 ? getStringArgument((AbstractFunctionCallExpression) expr, 
index) : null;
     }
+
+    public static ConstantExpression create(String value, SourceLocation 
sourceLocation) { // builds a constant expression for a string value, tagged with sourceLocation
+        return createExpression(new AString(value), sourceLocation); // the value is wrapped in an AString before delegating
+    }
+
+    public static ConstantExpression create(long value, SourceLocation 
sourceLocation) { // builds a constant expression for a long value, tagged with sourceLocation
+        return createExpression(new AInt64(value), sourceLocation); // the value is wrapped in an AInt64 before delegating
+    }
+
+    public static ConstantExpression create(double value, SourceLocation 
sourceLocation) { // builds a constant expression for a double value, tagged with sourceLocation
+        return createExpression(new ADouble(value), sourceLocation); // the value is wrapped in an ADouble before delegating
+    }
+
+    private static ConstantExpression createExpression(IAObject value, 
SourceLocation sourceLocation) { // shared helper behind the public create(...) overloads
+        ConstantExpression constExpr = new ConstantExpression(new 
AsterixConstantValue(value)); // wrap the object as an AsterixConstantValue inside a new ConstantExpression
+        constExpr.setSourceLocation(sourceLocation); // attach the caller-supplied location; it is passed through without a null check
+        return constExpr;
+    }
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 4f3d8e4532..2072deedd5 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -36,8 +36,11 @@ import org.apache.hyracks.algebricks.data.IPrinterFactory;
 import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
@@ -56,8 +59,10 @@ public interface IMetadataProvider<S, I> {
             IVariableTypeEnvironment typeEnv, JobGenContext context, 
JobSpecification jobSpec, Object implConfig,
             IProjectionFiltrationInfo projectionFiltrationInfo) throws 
AlgebricksException;
 
-    Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> 
getWriteFileRuntime(IDataSink sink, int[] printColumns,
-            IPrinterFactory[] printerFactories, IAWriterFactory writerFactory, 
RecordDescriptor inputDesc)
+    Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> 
getWriteFileRuntime(int sourceColumn,
+            int[] partitionColumns, IBinaryComparatorFactory[] 
partitionComparatorFactories,
+            IScalarEvaluatorFactory dynamicPathEvalFactory, ILogicalExpression 
staticPathExpr,
+            SourceLocation pathSourceLocation, IWriteDataSink sink, 
RecordDescriptor inputDesc, Object sourceType)
             throws AlgebricksException;
 
     Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getResultHandleRuntime(IDataSink sink, int[] printColumns,
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IWriteDataSink.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IWriteDataSink.java
new file mode 100644
index 0000000000..fa7a55ec41
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IWriteDataSink.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.metadata;
+
+import java.util.Map;
+
+public interface IWriteDataSink { // describes a write target by adapter name plus string key/value configuration
+    String getAdapterName(); // adapter name; NOTE(review): how the name is resolved to an adapter is not visible here -- confirm
+
+    Map<String, String> getConfiguration(); // settings for the sink; NOTE(review): whether the returned map may be mutated is up to the implementation
+
+    IWriteDataSink createCopy(); // returns a copy of this sink; OperatorDeepCopyVisitor calls it when deep-copying a WriteOperator
+}
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
index 61b77967db..7eef90e509 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
@@ -23,31 +23,77 @@ import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
 public class WriteOperator extends AbstractLogicalOperator {
-    private List<Mutable<ILogicalExpression>> expressions;
-    private IDataSink dataSink;
+    private final Mutable<ILogicalExpression> sourceExpression;
+    private final Mutable<ILogicalExpression> pathExpression;
+    private final List<Mutable<ILogicalExpression>> partitionExpressions;
+    private final List<Pair<OrderOperator.IOrder, 
Mutable<ILogicalExpression>>> orderExpressions;
+    private final IWriteDataSink writeDataSink;
 
-    public WriteOperator(List<Mutable<ILogicalExpression>> expressions, 
IDataSink dataSink) {
-        this.expressions = expressions;
-        this.dataSink = dataSink;
+    public WriteOperator(Mutable<ILogicalExpression> sourceExpression, 
Mutable<ILogicalExpression> pathExpression,
+            List<Mutable<ILogicalExpression>> partitionExpressions,
+            List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> 
orderExpressions,
+            IWriteDataSink writeDataSink) {
+        this.sourceExpression = sourceExpression;
+        this.pathExpression = pathExpression;
+        this.partitionExpressions = partitionExpressions;
+        this.orderExpressions = orderExpressions;
+        this.writeDataSink = writeDataSink;
     }
 
-    public List<Mutable<ILogicalExpression>> getExpressions() {
-        return expressions;
+    public Mutable<ILogicalExpression> getSourceExpression() {
+        return sourceExpression;
     }
 
-    public IDataSink getDataSink() {
-        return dataSink;
+    public LogicalVariable getSourceVariable() { // variable derived from the current value of sourceExpression
+        return VariableUtilities.getVariable(sourceExpression.getValue()); // NOTE(review): presumably requires a variable-reference expression -- confirm in VariableUtilities
+    }
+
+    public Mutable<ILogicalExpression> getPathExpression() { // returns the mutable holder itself, so callers can replace the path expression in place
+        return pathExpression;
+    }
+
+    public List<Mutable<ILogicalExpression>> getPartitionExpressions() { // returns the internal list by reference (no copy)
+        return partitionExpressions;
+    }
+
+    public List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> 
getOrderExpressions() { // returns the internal list of (order, expression) pairs by reference (no copy)
+        return orderExpressions;
+    }
+
+    public List<LogicalVariable> getPartitionVariables() { // builds a fresh list on every call, one entry per partition expression, in the same order
+        List<LogicalVariable> partitionVariables = new ArrayList<>();
+        for (Mutable<ILogicalExpression> partitionExpression : 
partitionExpressions) {
+            
partitionVariables.add(VariableUtilities.getVariable(partitionExpression.getValue())); // NOTE(review): presumably each expression must be a variable reference -- confirm in VariableUtilities
+        }
+        return partitionVariables;
+    }
+
+    public List<OrderColumn> getOrderColumns() { // builds a fresh list on every call, one OrderColumn per order pair, in the same order
+        List<OrderColumn> orderColumns = new ArrayList<>();
+        for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> 
orderExpressionPair : orderExpressions) {
+            LogicalVariable variable = 
VariableUtilities.getVariable(orderExpressionPair.getSecond().getValue()); // variable comes from the pair's expression (second element)
+            OrderOperator.IOrder.OrderKind kind = 
orderExpressionPair.first.getKind(); // order kind comes from the pair's IOrder (first element)
+            orderColumns.add(new OrderColumn(variable, kind));
+        }
+        return orderColumns;
+    }
+
+    public IWriteDataSink getWriteDataSink() {
+        return writeDataSink;
     }
 
     @Override
@@ -62,35 +108,37 @@ public class WriteOperator extends AbstractLogicalOperator 
{
 
     @Override
     public boolean 
acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws 
AlgebricksException {
-        boolean modif = false;
-        for (int i = 0; i < expressions.size(); i++) {
-            boolean b = visitor.transform(expressions.get(i));
-            if (b) {
-                modif = true;
-            }
+        boolean changed = visitor.transform(sourceExpression);
+        changed |= visitor.transform(pathExpression);
+
+        for (Mutable<ILogicalExpression> expression : partitionExpressions) {
+            changed |= visitor.transform(expression);
         }
-        return modif;
+
+        for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> 
orderExpressionPair : orderExpressions) {
+            changed |= visitor.transform(orderExpressionPair.second);
+        }
+
+        return changed;
     }
 
     @Override
     public VariablePropagationPolicy getVariablePropagationPolicy() {
-        return VariablePropagationPolicy.ALL;
+        return VariablePropagationPolicy.NONE;
     }
 
     @Override
     public boolean isMap() {
-        return false; // actually depends on the physical op.
+        return true;
     }
 
     @Override
     public void recomputeSchema() {
-        schema = new ArrayList<LogicalVariable>();
-        schema.addAll(inputs.get(0).getValue().getSchema());
+        schema = new ArrayList<>(inputs.get(0).getValue().getSchema());
     }
 
     @Override
     public IVariableTypeEnvironment 
computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
         return createPropagatingAllInputsTypeEnvironment(ctx);
     }
-
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 960e399520..b3828df20e 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -35,6 +35,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
 import 
org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
@@ -289,9 +290,15 @@ public class OperatorDeepCopyVisitor implements 
ILogicalOperatorVisitor<ILogical
 
     @Override
     public ILogicalOperator visitWriteOperator(WriteOperator op, Void arg) 
throws AlgebricksException {
-        ArrayList<Mutable<ILogicalExpression>> newExpressions = new 
ArrayList<>();
-        deepCopyExpressionRefs(newExpressions, op.getExpressions());
-        return new WriteOperator(newExpressions, op.getDataSink());
+        Mutable<ILogicalExpression> newSourceExpression = 
deepCopyExpressionRef(op.getSourceExpression());
+        Mutable<ILogicalExpression> newPathExpression = 
deepCopyExpressionRef(op.getPathExpression());
+        List<Mutable<ILogicalExpression>> newPartitionExpressions =
+                deepCopyExpressionRefs(new ArrayList<>(), 
op.getPartitionExpressions());
+        List<Pair<IOrder, Mutable<ILogicalExpression>>> 
newOrderPairExpressions =
+                deepCopyOrderAndExpression(op.getOrderExpressions());
+        IWriteDataSink writeDataSink = op.getWriteDataSink().createCopy();
+        return new WriteOperator(newSourceExpression, newPathExpression, 
newPartitionExpressions,
+                newOrderPairExpressions, writeDataSink);
     }
 
     @Override
@@ -369,11 +376,12 @@ public class OperatorDeepCopyVisitor implements 
ILogicalOperatorVisitor<ILogical
         return new SinkOperator();
     }
 
-    private void deepCopyExpressionRefs(List<Mutable<ILogicalExpression>> 
newExprs,
+    private List<Mutable<ILogicalExpression>> 
deepCopyExpressionRefs(List<Mutable<ILogicalExpression>> newExprs,
             List<Mutable<ILogicalExpression>> oldExprs) {
         for (Mutable<ILogicalExpression> oldExpr : oldExprs) {
             newExprs.add(new 
MutableObject<>(oldExpr.getValue().cloneExpression()));
         }
+        return newExprs;
     }
 
     private Mutable<ILogicalExpression> 
deepCopyExpressionRef(Mutable<ILogicalExpression> oldExprRef) {
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 74de6f5473..4067b62216 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -254,7 +254,7 @@ public class SchemaVariableVisitor implements 
ILogicalOperatorVisitor<Void, Void
 
     @Override
     public Void visitWriteOperator(WriteOperator op, Void arg) throws 
AlgebricksException {
-        standardLayout(op);
+        // Write is akin to project empty
         return null;
     }
 
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index faf3c1141e..7ceb812774 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -380,7 +380,12 @@ public class SubstituteVariableVisitor
     @Override
     public Void visitWriteOperator(WriteOperator op, Pair<LogicalVariable, 
LogicalVariable> pair)
             throws AlgebricksException {
-        substUsedVariablesInExpr(op.getExpressions(), pair.first, pair.second);
+        substUsedVariablesInExpr(op.getSourceExpression(), pair.first, 
pair.second);
+        substUsedVariablesInExpr(op.getPathExpression(), pair.first, 
pair.second);
+        substUsedVariablesInExpr(op.getPartitionExpressions(), pair.first, 
pair.second);
+        for (Pair<IOrder, Mutable<ILogicalExpression>> orderExpr : 
op.getOrderExpressions()) {
+            substUsedVariablesInExpr(orderExpr.second, pair.first, 
pair.second);
+        }
         return null;
     }
 
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index d7b6228853..d7b2555a83 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -353,9 +353,15 @@ public class UsedVariableVisitor implements 
ILogicalOperatorVisitor<Void, Void>
 
     @Override
     public Void visitWriteOperator(WriteOperator op, Void arg) {
-        for (Mutable<ILogicalExpression> expr : op.getExpressions()) {
+        op.getSourceExpression().getValue().getUsedVariables(usedVariables);
+        op.getPathExpression().getValue().getUsedVariables(usedVariables);
+        for (Mutable<ILogicalExpression> expr : op.getPartitionExpressions()) {
             expr.getValue().getUsedVariables(usedVariables);
         }
+
+        for (Pair<IOrder, Mutable<ILogicalExpression>> orderExpr : 
op.getOrderExpressions()) {
+            orderExpr.second.getValue().getUsedVariables(usedVariables);
+        }
         return null;
     }
 
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
index fcc8c8e9a2..8ff605dd8f 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
@@ -92,25 +92,8 @@ public abstract class AbstractWindowPOperator extends 
AbstractPhysicalOperator {
                 throw new IllegalStateException(op.getExecutionMode().name());
         }
 
-        // require local order property [pc1, ... pcN, oc1, ... ocN]
-        // accounting for cases where there's an overlap between order and 
partition columns
-        // TODO replace with required local grouping on partition columns + 
local order on order columns
-        List<OrderColumn> lopColumns = new ArrayList<>();
-        ListSet<LogicalVariable> pcVars = new ListSet<>();
-        pcVars.addAll(partitionColumns);
-        for (int oIdx = 0, ln = orderColumns.size(); oIdx < ln; oIdx++) {
-            OrderColumn oc = orderColumns.get(oIdx);
-            LogicalVariable ocVar = oc.getColumn();
-            if (!pcVars.remove(ocVar) && containsAny(orderColumns, oIdx + 1, 
pcVars)) {
-                throw 
AlgebricksException.create(ErrorCode.UNSUPPORTED_WINDOW_SPEC, 
op.getSourceLocation(),
-                        String.valueOf(partitionColumns), 
String.valueOf(orderColumns));
-            }
-            lopColumns.add(new OrderColumn(oc.getColumn(), oc.getOrder()));
-        }
-        int pIdx = 0;
-        for (LogicalVariable pColumn : pcVars) {
-            lopColumns.add(pIdx++, new OrderColumn(pColumn, 
OrderOperator.IOrder.OrderKind.ASC));
-        }
+        List<OrderColumn> lopColumns =
+                getOrderRequirement(op, ErrorCode.UNSUPPORTED_WINDOW_SPEC, 
partitionColumns, orderColumns);
         List<ILocalStructuralProperty> localProps =
                 lopColumns.isEmpty() ? null : Collections.singletonList(new 
LocalOrderProperty(lopColumns));
 
@@ -295,4 +278,30 @@ public abstract class AbstractWindowPOperator extends 
AbstractPhysicalOperator {
         }
         return false;
     }
+
+    static List<OrderColumn> getOrderRequirement(ILogicalOperator op, 
ErrorCode errorCode,
+            List<LogicalVariable> partitionColumns, List<OrderColumn> 
orderColumns) throws AlgebricksException { // merges partition and order columns into one local-order column list; errorCode is used for the failure below
+        // require local order property [pc1, ... pcN, oc1, ... ocN]
+        // accounting for cases where there's an overlap between order and 
partition columns
+        // TODO replace with required local grouping on partition columns + 
local order on order columns
+        List<OrderColumn> lopColumns = new ArrayList<>();
+        ListSet<LogicalVariable> pcVars = new ListSet<>();
+        pcVars.addAll(partitionColumns); // working set of partition variables not yet matched by an order column
+        for (int oIdx = 0, ln = orderColumns.size(); oIdx < ln; oIdx++) {
+            OrderColumn oc = orderColumns.get(oIdx);
+            LogicalVariable ocVar = oc.getColumn();
+            if (!pcVars.remove(ocVar) && containsAny(orderColumns, oIdx + 1, 
pcVars)) { // remove() runs first and mutates pcVars; NOTE(review): containsAny presumably checks order columns from oIdx + 1 onward against pcVars -- confirm
+                throw AlgebricksException.create(errorCode, 
op.getSourceLocation(), String.valueOf(partitionColumns),
+                        String.valueOf(orderColumns)); // both column lists are passed as message arguments
+            }
+            lopColumns.add(new OrderColumn(oc.getColumn(), oc.getOrder())); // order columns keep their relative order and requested order kind
+        }
+        int pIdx = 0;
+        for (LogicalVariable pColumn : pcVars) {
+            lopColumns.add(pIdx++, new OrderColumn(pColumn, 
OrderOperator.IOrder.OrderKind.ASC)); // leftover partition variables are inserted at the front, ascending, in pcVars iteration order
+        }
+
+        return lopColumns;
+    }
+
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
index 07c798f2dc..d462cd5b8c 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
@@ -18,10 +18,14 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
-import org.apache.commons.lang3.mutable.Mutable;
+import static 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractWindowPOperator.getOrderRequirement;
+
+import java.util.Collections;
+import java.util.List;
+
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.apache.hyracks.algebricks.common.utils.ListSet;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -30,23 +34,40 @@ import 
org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.data.IPrinterFactory;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 
 public class SinkWritePOperator extends AbstractPhysicalOperator {
+    private final LogicalVariable sourceVariable;
+    private final List<LogicalVariable> partitionVariables;
+    private final List<OrderColumn> orderColumns;
+
+    public SinkWritePOperator(LogicalVariable sourceVariable, 
List<LogicalVariable> partitionVariables,
+            List<OrderColumn> orderColumns) { // stores all three arguments by reference; no defensive copies are made
+        this.sourceVariable = sourceVariable;
+        this.partitionVariables = partitionVariables; // when empty, getRequiredPropertiesForChildren returns emptyUnaryRequirements()
+        this.orderColumns = orderColumns;
+    }
 
     @Override
     public PhysicalOperatorTag getOperatorTag() {
@@ -66,12 +87,34 @@ public class SinkWritePOperator extends 
AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements 
getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext 
context) {
-        WriteOperator write = (WriteOperator) op;
-        IDataSink sink = write.getDataSink();
-        IPartitioningProperty pp = sink.getPartitioningProperty();
-        StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { 
new StructuralPropertiesVector(pp, null) };
-        return new PhysicalRequirements(r, 
IPartitioningRequirementsCoordinator.NO_COORDINATION);
+            IPhysicalPropertiesVector reqByParent, IOptimizationContext 
context) throws AlgebricksException {
+        if (partitionVariables.isEmpty()) {
+            return emptyUnaryRequirements();
+        }
+        IPartitioningProperty pp;
+        switch (op.getExecutionMode()) {
+            case PARTITIONED:
+                pp = UnorderedPartitionedProperty.of(new 
ListSet<>(partitionVariables),
+                        context.getComputationNodeDomain());
+                break;
+            case UNPARTITIONED:
+                pp = IPartitioningProperty.UNPARTITIONED;
+                break;
+            case LOCAL:
+                pp = null;
+                break;
+            default:
+                throw new IllegalStateException(op.getExecutionMode().name());
+        }
+
+        List<OrderColumn> finalOrderColumns =
+                getOrderRequirement(op, ErrorCode.UNSUPPORTED_WRITE_SPEC, 
partitionVariables, orderColumns);
+
+        List<ILocalStructuralProperty> localProps =
+                Collections.singletonList(new 
LocalOrderProperty(finalOrderColumns));
+        return new PhysicalRequirements(
+                new StructuralPropertiesVector[] { new 
StructuralPropertiesVector(pp, localProps) },
+                IPartitioningRequirementsCoordinator.NO_COORDINATION);
     }
 
     @Override
@@ -79,29 +122,39 @@ public class SinkWritePOperator extends 
AbstractPhysicalOperator {
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
         WriteOperator write = (WriteOperator) op;
-        int[] columns = new int[write.getExpressions().size()];
-        int i = 0;
-        for (Mutable<ILogicalExpression> exprRef : write.getExpressions()) {
-            ILogicalExpression expr = exprRef.getValue();
-            if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
-                throw new NotImplementedException("Only writing variable 
expressions is supported.");
-            }
-            VariableReferenceExpression varRef = (VariableReferenceExpression) 
expr;
-            LogicalVariable v = varRef.getVariableReference();
-            columns[i++] = inputSchemas[0].findVariable(v);
+        IExpressionRuntimeProvider runtimeProvider = 
context.getExpressionRuntimeProvider();
+        IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
+        IOperatorSchema schema = inputSchemas[0];
+        IWriteDataSink writeDataSink = write.getWriteDataSink();
+
+        // Source evaluator column
+        int sourceColumn = schema.findVariable(sourceVariable);
+
+        // Path expression
+        IScalarEvaluatorFactory dynamicPathEvalFactory = null;
+        ILogicalExpression staticPathExpr = null;
+        ILogicalExpression pathExpr = write.getPathExpression().getValue();
+        if (pathExpr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+            dynamicPathEvalFactory = 
runtimeProvider.createEvaluatorFactory(pathExpr, typeEnv, inputSchemas, 
context);
+        } else {
+            staticPathExpr = pathExpr;
         }
+
+        // Partition columns
+        int[] partitionColumns = JobGenHelper.projectVariables(schema, 
partitionVariables);
+        IBinaryComparatorFactory[] partitionComparatorFactories =
+                
JobGenHelper.variablesToAscBinaryComparatorFactories(partitionVariables, 
typeEnv, context);
+
         RecordDescriptor recDesc =
                 
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), 
propagatedSchema, context);
         RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
                 context.getTypeEnvironment(op.getInputs().get(0).getValue()), 
inputSchemas[0], context);
 
-        IPrinterFactory[] pf =
-                JobGenHelper.mkPrinterFactories(inputSchemas[0], 
context.getTypeEnvironment(op), context, columns);
-
         IMetadataProvider<?, ?> mp = context.getMetadataProvider();
 
-        Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> 
runtimeAndConstraints =
-                mp.getWriteFileRuntime(write.getDataSink(), columns, pf, 
context.getWriterFactory(), inputDesc);
+        Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> 
runtimeAndConstraints = mp.getWriteFileRuntime(
+                sourceColumn, partitionColumns, partitionComparatorFactories, 
dynamicPathEvalFactory, staticPathExpr,
+                pathExpr.getSourceLocation(), writeDataSink, inputDesc, 
typeEnv.getVarType(sourceVariable));
         IPushRuntimeFactory runtime = runtimeAndConstraints.first;
         runtime.setSourceLocation(write.getSourceLocation());
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index f49b6d486e..e1e8c50405 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -266,8 +266,22 @@ public class LogicalOperatorPrettyPrintVisitor extends AbstractLogicalOperatorPr
 
     @Override
     public Void visitWriteOperator(WriteOperator op, Integer indent) throws AlgebricksException {
-        addIndent(indent).append("write ");
-        pprintExprList(op.getExpressions(), indent);
+        AlgebricksStringBuilderWriter writer = addIndent(indent);
+        writer.append("write (");
+        writer.append(op.getSourceExpression().getValue().accept(exprVisitor, indent));
+        writer.append(") to path [");
+        writer.append(op.getPathExpression().getValue().accept(exprVisitor, indent));
+        writer.append("] ");
+        List<Mutable<ILogicalExpression>> partitionExpressions = op.getPartitionExpressions();
+        if (!partitionExpressions.isEmpty()) {
+            writer.append(" partition ");
+            pprintExprList(op.getPartitionExpressions(), indent);
+            List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions = op.getOrderExpressions();
+            if (!orderExpressions.isEmpty()) {
+                writer.append(" order ");
+                pprintOrderList(orderExpressions, indent);
+            }
+        }
         return null;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index 6f65a9d1d9..a60308adb1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -482,9 +482,20 @@ public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperat
     public Void visitWriteOperator(WriteOperator op, Void indent) throws AlgebricksException {
         try {
             jsonGenerator.writeStringField(OPERATOR_FIELD, "write");
-            List<Mutable<ILogicalExpression>> expressions = op.getExpressions();
-            if (!expressions.isEmpty()) {
-                writeArrayFieldOfExpressions(EXPRESSIONS_FIELD, expressions, indent);
+
+            writeStringFieldExpression("value", op.getSourceExpression(), indent);
+            writeStringFieldExpression("path", op.getPathExpression(), indent);
+
+            List<Mutable<ILogicalExpression>> partitionExpressions = op.getPartitionExpressions();
+            if (!partitionExpressions.isEmpty()) {
+                writeObjectFieldWithExpressions("partition-by", partitionExpressions, indent);
+
+                List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions =
+                        op.getOrderExpressions();
+                if (!orderExpressions.isEmpty()) {
+                    writeArrayFieldOfOrderExprList("order-by", orderExpressions, indent);
+                }
+
             }
             return null;
         } catch (IOException e) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
index 1cea0a9518..1f36aa5cb9 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
@@ -230,8 +230,23 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String
     @Override
     public String visitWriteOperator(WriteOperator op, Boolean showDetails) {
         stringBuilder.setLength(0);
-        stringBuilder.append("write ");
-        printExprList(op.getExpressions());
+        stringBuilder.append("write (");
+        stringBuilder.append(op.getSourceExpression());
+        stringBuilder.append(") to [");
+        stringBuilder.append(op.getPathExpression());
+        stringBuilder.append(']');
+        List<Mutable<ILogicalExpression>> partitionExpressions = op.getPartitionExpressions();
+        if (!partitionExpressions.isEmpty()) {
+            stringBuilder.append(" partition by ");
+            printExprList(partitionExpressions);
+
+            List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions = op.getOrderExpressions();
+            if (!orderExpressions.isEmpty()) {
+                stringBuilder.append(" order ");
+                printOrderExprList(orderExpressions);
+            }
+        }
+
         appendSchema(op, showDetails);
         appendAnnotations(op, showDetails);
         appendPhysicalOperatorInfo(op, showDetails);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 2784a6aa97..cc8c007ea0 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -369,8 +369,14 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
         }
 
         @Override
-        public IPhysicalOperator visitWriteOperator(WriteOperator op, Boolean topLevelOp) {
-            return new SinkWritePOperator();
+        public IPhysicalOperator visitWriteOperator(WriteOperator op, Boolean topLevelOp) throws AlgebricksException {
+            ILogicalExpression sourceExpr = op.getSourceExpression().getValue();
+            if (sourceExpr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                throw AlgebricksException.create(ErrorCode.EXPR_NOT_NORMALIZED, sourceExpr.getSourceLocation());
+            }
+            ensureAllVariables(op.getPartitionExpressions(), v -> v);
+            ensureAllVariables(op.getOrderExpressions(), Pair::getSecond);
+            return new SinkWritePOperator(op.getSourceVariable(), op.getPartitionVariables(), op.getOrderColumns());
         }
 
         @Override

Reply via email to