This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7250ba6730 [ASTERIXDB-3357][COMP][RT] Compiler and runtime support for COPY TO statement
7250ba6730 is described below

commit 7250ba6730a6565d86ddf9bb794cb8ac32ef4386
Author: Hussain Towaileb <[email protected]>
AuthorDate: Tue Jan 30 06:21:39 2024 +0300

    [ASTERIXDB-3357][COMP][RT] Compiler and runtime support for COPY TO statement
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    This change adds compiler and runtime support for the
    COPY TO statement so that it can write to different destinations.
    
    Change-Id: Icea1ff9f32fe49ee83d0739f5c5a305c3345faa7
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18169
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Hussain Towaileb <[email protected]>
    Reviewed-by: Wail Alkowaileet <[email protected]>
---
 .../translator/LangExpressionToPlanTranslator.java | 27 ++++++++--
 .../writer/AbstractCloudExternalFileWriter.java    | 10 ++--
 .../asterix/cloud/writer/S3ExternalFileWriter.java |  4 +-
 .../cloud/writer/S3ExternalFileWriterFactory.java  | 35 ++++++------
 .../external/writer/LocalFSExternalFileWriter.java |  6 +--
 .../writer/LocalFSExternalFileWriterFactory.java   | 31 ++++++-----
 .../writer/printer/TextualExternalFilePrinter.java |  4 +-
 .../printer/TextualExternalFilePrinterFactory.java | 12 ++---
 ...ilePrinter.java => TextualExternalPrinter.java} | 17 ++----
 ...ory.java => TextualExternalPrinterFactory.java} | 18 +++----
 .../lang/common/statement/CopyToStatement.java     | 15 +++---
 .../common/visitor/AbstractInlineUdfsVisitor.java  |  4 ++
 .../lang/common/visitor/FormatPrintVisitor.java    |  6 +--
 .../AbstractSqlppExpressionScopingVisitor.java     |  3 ++
 .../base/AbstractSqlppSimpleExpressionVisitor.java |  1 +
 .../metadata/declared/MetadataProvider.java        | 15 ++++--
 .../metadata/provider/ExternalWriterProvider.java  | 24 ++++-----
 .../runtime/writer/DynamicPathResolver.java        |  2 +-
 ...ExternalWriter.java => ExternalFileWriter.java} |  4 +-
 ...Factory.java => ExternalFileWriterFactory.java} |  8 +--
 ...ation.java => ExternalWriterConfiguration.java} |  4 +-
 .../runtime/writer/IExternalFileWriterFactory.java | 12 ++---
 ...ava => IExternalFileWriterFactoryProvider.java} |  4 +-
 ...ernalFilePrinter.java => IExternalPrinter.java} |  2 +-
 ...erFactory.java => IExternalPrinterFactory.java} |  6 +--
 ...y.java => IExternalWriterFactoryValidator.java} | 11 ++--
 .../core/algebra/metadata/IMetadataProvider.java   |  4 ++
 .../algebra/operators/logical/WriteOperator.java   | 30 ++++++++++-
 .../logical/visitors/OperatorDeepCopyVisitor.java  |  4 +-
 .../visitors/SubstituteVariableVisitor.java        |  1 +
 .../logical/visitors/UsedVariableVisitor.java      |  4 ++
 .../operators/physical/SinkWritePOperator.java     | 31 +++++++++--
 .../rules/SetAlgebricksPhysicalOperatorsRule.java  |  4 +-
 .../operators/writer/IWriterPartitioner.java       | 18 +++----
 .../operators/writer/KeyWriterPartitioner.java     | 18 +++++--
 .../operators/writer/NoOpWriterPartitioner.java    | 23 ++++++--
 .../writer/SinkExternalWriterRuntime.java          | 32 ++---------
 .../writer/SinkExternalWriterRuntimeFactory.java   | 52 ++++++++++++++----
 .../operators/writer/WriterPartitioner.java        | 62 ++++++++++++++++++++++
 39 files changed, 366 insertions(+), 202 deletions(-)

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 7dd0217192..0949478da8 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -361,7 +361,7 @@ abstract class LangExpressionToPlanTranslator
         return translate(expr, outputDatasetName, (ICompiledDmlStatement) 
stmt, null, resultMetadata);
     }
 
-    private ILogicalPlan translateCopyTo(Query expr, 
CompiledStatements.ICompiledStatement stmt,
+    public ILogicalPlan translateCopyTo(Query expr, 
CompiledStatements.ICompiledStatement stmt,
             IResultMetadata resultMetadata) throws AlgebricksException {
         CompiledStatements.CompiledCopyToStatement copyTo = 
(CompiledStatements.CompiledCopyToStatement) stmt;
         MutableObject<ILogicalOperator> base = new MutableObject<>(new 
EmptyTupleSourceOperator());
@@ -423,8 +423,7 @@ abstract class LangExpressionToPlanTranslator
         // astPathExpressions has at least one expression see CopyToStatement 
constructor
         List<Expression> astPathExpressions = copyTo.getPathExpressions();
         ILogicalExpression fullPathExpr = null;
-        WriteDataSink writeDataSink;
-        String separator = 
String.valueOf(ExternalWriterProvider.getSeparator(copyTo.getAdapter()));
+        String separator = getExternalWriterSeparator(copyTo.getAdapter());
         List<Mutable<ILogicalExpression>> pathExprs = new 
ArrayList<>(astPathExpressions.size());
         Pair<ILogicalExpression, Mutable<ILogicalOperator>> pathExprPair;
         for (int i = 0; i < astPathExpressions.size(); i++) {
@@ -453,11 +452,25 @@ abstract class LangExpressionToPlanTranslator
             fullPathExpr = concat;
         }
 
+        // Handle key
+        boolean autogenerated = copyTo.isAutogenerated();
+        List<Expression> astKeyExpressions = copyTo.getKeyExpressions();
+        List<Mutable<ILogicalExpression>> keyExpressionRefs = new 
ArrayList<>(astKeyExpressions.size());
+        for (int i = 0; i < copyTo.getKeyExpressions().size(); i++) {
+            Expression expression = astKeyExpressions.get(i);
+            Pair<ILogicalExpression, Mutable<ILogicalOperator>> expPair = 
langExprToAlgExpression(expression, topOpRef);
+            keyExpressionRefs.add(new MutableObject<>(expPair.first));
+            Pair<Mutable<ILogicalExpression>, Mutable<ILogicalOperator>> 
wrappedPair =
+                    wrapInAssign(context.newVar(), expPair.first, 
expPair.second);
+            topOpRef = wrappedPair.second;
+        }
+
         // Write adapter configuration
-        writeDataSink = new WriteDataSink(copyTo.getAdapter(), 
copyTo.getProperties());
+        WriteDataSink writeDataSink = new WriteDataSink(copyTo.getAdapter(), 
copyTo.getProperties());
+
         // writeOperator
         WriteOperator writeOperator = new WriteOperator(sourceExprRef, new 
MutableObject<>(fullPathExpr),
-                partitionExpressionRefs, orderExprListOut, writeDataSink);
+                partitionExpressionRefs, orderExprListOut, keyExpressionRefs, 
autogenerated, writeDataSink);
         writeOperator.getInputs().add(topOpRef);
 
         // We need DistributeResultOperator to ensure all warnings to be 
delivered to the user
@@ -470,6 +483,10 @@ abstract class LangExpressionToPlanTranslator
         return new ALogicalPlanImpl(globalPlanRoots);
     }
 
+    protected String getExternalWriterSeparator(String adapter) {
+        return String.valueOf(ExternalWriterProvider.getSeparator(adapter));
+    }
+
     public ILogicalPlan translate(Query expr, String outputDatasetName, 
ICompiledDmlStatement stmt,
             ILogicalOperator baseOp, IResultMetadata resultMetadata) throws 
AlgebricksException {
         MutableObject<ILogicalOperator> base = new MutableObject<>(new 
EmptyTupleSourceOperator());
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
index 73a0aeb3f2..e2c09eb355 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
@@ -28,8 +28,8 @@ import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.runtime.writer.IExternalFilePrinter;
 import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
@@ -39,7 +39,7 @@ import org.apache.hyracks.data.std.api.IValueReference;
 import com.google.common.base.Utf8;
 
 abstract class AbstractCloudExternalFileWriter implements IExternalFileWriter {
-    private final IExternalFilePrinter printer;
+    private final IExternalPrinter printer;
     private final ICloudClient cloudClient;
     private final String bucket;
     private final boolean partitionedPath;
@@ -48,7 +48,7 @@ abstract class AbstractCloudExternalFileWriter implements 
IExternalFileWriter {
     private final IWriteBufferProvider bufferProvider;
     private ICloudBufferedWriter bufferedWriter;
 
-    AbstractCloudExternalFileWriter(IExternalFilePrinter printer, ICloudClient 
cloudClient, String bucket,
+    AbstractCloudExternalFileWriter(IExternalPrinter printer, ICloudClient 
cloudClient, String bucket,
             boolean partitionedPath, IWarningCollector warningCollector, 
SourceLocation pathSourceLocation) {
         this.printer = printer;
         this.cloudClient = cloudClient;
@@ -118,7 +118,7 @@ abstract class AbstractCloudExternalFileWriter implements 
IExternalFileWriter {
             if (isSdkException(e)) {
                 throw 
RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, 
getMessageOrToString(e));
             }
-            throw e;
+            throw HyracksDataException.create(e);
         }
     }
 
@@ -132,7 +132,7 @@ abstract class AbstractCloudExternalFileWriter implements 
IExternalFileWriter {
             if (isSdkException(e)) {
                 throw 
RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, 
getMessageOrToString(e));
             }
-            throw e;
+            throw HyracksDataException.create(e);
         }
     }
 
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriter.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriter.java
index e896c05d41..30ae0fae42 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriter.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriter.java
@@ -20,7 +20,7 @@ package org.apache.asterix.cloud.writer;
 
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.runtime.writer.IExternalFilePrinter;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 
@@ -29,7 +29,7 @@ import software.amazon.awssdk.core.exception.SdkException;
 final class S3ExternalFileWriter extends AbstractCloudExternalFileWriter {
     static int MAX_LENGTH_IN_BYTES = 1024;
 
-    S3ExternalFileWriter(IExternalFilePrinter printer, ICloudClient 
cloudClient, String bucket, boolean partitionedPath,
+    S3ExternalFileWriter(IExternalPrinter printer, ICloudClient cloudClient, 
String bucket, boolean partitionedPath,
             IWarningCollector warningCollector, SourceLocation 
pathSourceLocation) {
         super(printer, cloudClient, bucket, partitionedPath, warningCollector, 
pathSourceLocation);
     }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
index 5036fc8c07..4477b1f998 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
@@ -36,12 +36,12 @@ import 
org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.aws.s3.S3Utils;
-import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
-import 
org.apache.asterix.runtime.writer.IExternalFileFilterWriterFactoryProvider;
-import org.apache.asterix.runtime.writer.IExternalFilePrinter;
-import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.ExternalWriterConfiguration;
 import org.apache.asterix.runtime.writer.IExternalFileWriter;
 import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactoryProvider;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -59,24 +59,23 @@ public final class S3ExternalFileWriterFactory implements 
IExternalFileWriterFac
     private static final long serialVersionUID = 4551318140901866805L;
     private static final Logger LOGGER = LogManager.getLogger();
     static final char SEPARATOR = '/';
-    public static final IExternalFileFilterWriterFactoryProvider PROVIDER =
-            new IExternalFileFilterWriterFactoryProvider() {
-                @Override
-                public IExternalFileWriterFactory 
create(ExternalFileWriterConfiguration configuration) {
-                    return new S3ExternalFileWriterFactory(configuration);
-                }
+    public static final IExternalFileWriterFactoryProvider PROVIDER = new 
IExternalFileWriterFactoryProvider() {
+        @Override
+        public IExternalFileWriterFactory create(ExternalWriterConfiguration 
configuration) {
+            return new S3ExternalFileWriterFactory(configuration);
+        }
 
-                @Override
-                public char getSeparator() {
-                    return SEPARATOR;
-                }
-            };
+        @Override
+        public char getSeparator() {
+            return SEPARATOR;
+        }
+    };
     private final Map<String, String> configuration;
     private final SourceLocation pathSourceLocation;
     private final String staticPath;
     private transient S3CloudClient cloudClient;
 
-    private S3ExternalFileWriterFactory(ExternalFileWriterConfiguration 
externalConfig) {
+    private S3ExternalFileWriterFactory(ExternalWriterConfiguration 
externalConfig) {
         configuration = externalConfig.getConfiguration();
         pathSourceLocation = externalConfig.getPathSourceLocation();
         staticPath = externalConfig.getStaticPath();
@@ -84,11 +83,11 @@ public final class S3ExternalFileWriterFactory implements 
IExternalFileWriterFac
     }
 
     @Override
-    public IExternalFileWriter createWriter(IHyracksTaskContext context, 
IExternalFilePrinterFactory printerFactory)
+    public IExternalFileWriter createWriter(IHyracksTaskContext context, 
IExternalPrinterFactory printerFactory)
             throws HyracksDataException {
         buildClient();
         String bucket = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-        IExternalFilePrinter printer = printerFactory.createPrinter();
+        IExternalPrinter printer = printerFactory.createPrinter();
         IWarningCollector warningCollector = context.getWarningCollector();
         return new S3ExternalFileWriter(printer, cloudClient, bucket, 
staticPath == null, warningCollector,
                 pathSourceLocation);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
index a3a2f709da..4166ddee69 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
@@ -25,19 +25,19 @@ import java.io.IOException;
 
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.runtime.writer.IExternalFilePrinter;
 import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.data.std.api.IValueReference;
 
 final class LocalFSExternalFileWriter implements IExternalFileWriter {
-    private final IExternalFilePrinter printer;
+    private final IExternalPrinter printer;
     private final ILocalFSValidator validator;
     private final SourceLocation pathSourceLocation;
 
-    LocalFSExternalFileWriter(IExternalFilePrinter printer, ILocalFSValidator 
validator,
+    LocalFSExternalFileWriter(IExternalPrinter printer, ILocalFSValidator 
validator,
             SourceLocation pathSourceLocation) {
         this.printer = printer;
         this.validator = validator;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
index 313757a8a2..73f34f1e52 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
@@ -23,11 +23,11 @@ import java.io.File;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
-import 
org.apache.asterix.runtime.writer.IExternalFileFilterWriterFactoryProvider;
-import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.ExternalWriterConfiguration;
 import org.apache.asterix.runtime.writer.IExternalFileWriter;
 import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactoryProvider;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -36,18 +36,17 @@ import org.apache.hyracks.api.exceptions.SourceLocation;
 public final class LocalFSExternalFileWriterFactory implements 
IExternalFileWriterFactory {
     private static final long serialVersionUID = 871685327574547749L;
     private static final char SEPARATOR = File.separatorChar;
-    public static final IExternalFileFilterWriterFactoryProvider PROVIDER =
-            new IExternalFileFilterWriterFactoryProvider() {
-                @Override
-                public IExternalFileWriterFactory 
create(ExternalFileWriterConfiguration configuration) {
-                    return new LocalFSExternalFileWriterFactory(configuration);
-                }
+    public static final IExternalFileWriterFactoryProvider PROVIDER = new 
IExternalFileWriterFactoryProvider() {
+        @Override
+        public IExternalFileWriterFactory create(ExternalWriterConfiguration 
configuration) {
+            return new LocalFSExternalFileWriterFactory(configuration);
+        }
 
-                @Override
-                public char getSeparator() {
-                    return SEPARATOR;
-                }
-            };
+        @Override
+        public char getSeparator() {
+            return SEPARATOR;
+        }
+    };
     private static final ILocalFSValidator NO_OP_VALIDATOR = 
LocalFSExternalFileWriterFactory::noOpValidation;
     private static final ILocalFSValidator VALIDATOR = 
LocalFSExternalFileWriterFactory::validate;
     private final SourceLocation pathSourceLocation;
@@ -55,7 +54,7 @@ public final class LocalFSExternalFileWriterFactory 
implements IExternalFileWrit
     private final String staticPath;
     private boolean validated;
 
-    private LocalFSExternalFileWriterFactory(ExternalFileWriterConfiguration 
externalConfig) {
+    private LocalFSExternalFileWriterFactory(ExternalWriterConfiguration 
externalConfig) {
         pathSourceLocation = externalConfig.getPathSourceLocation();
         singleNodeCluster = externalConfig.isSingleNodeCluster();
         staticPath = externalConfig.getStaticPath();
@@ -63,7 +62,7 @@ public final class LocalFSExternalFileWriterFactory 
implements IExternalFileWrit
     }
 
     @Override
-    public IExternalFileWriter createWriter(IHyracksTaskContext context, 
IExternalFilePrinterFactory printerFactory)
+    public IExternalFileWriter createWriter(IHyracksTaskContext context, 
IExternalPrinterFactory printerFactory)
             throws HyracksDataException {
         ILocalFSValidator validator = VALIDATOR;
         if (staticPath != null) {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
index 8f8c63a533..57e7b58e7c 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
@@ -22,12 +22,12 @@ import java.io.OutputStream;
 import java.io.PrintStream;
 
 import 
org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
-import org.apache.asterix.runtime.writer.IExternalFilePrinter;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
 import org.apache.hyracks.algebricks.data.IPrinter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
 
-final class TextualExternalFilePrinter implements IExternalFilePrinter {
+final class TextualExternalFilePrinter implements IExternalPrinter {
     private final IPrinter printer;
     private final IExternalFileCompressStreamFactory compressStreamFactory;
     private TextualOutputStreamDelegate delegate;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
index 6778532526..e3d0a66d33 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
@@ -19,23 +19,21 @@
 package org.apache.asterix.external.writer.printer;
 
 import 
org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
-import org.apache.asterix.runtime.writer.IExternalFilePrinter;
-import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
 import org.apache.hyracks.algebricks.data.IPrinterFactory;
 
-public class TextualExternalFilePrinterFactory implements 
IExternalFilePrinterFactory {
-    private static final long serialVersionUID = 9155959967258587588L;
-    private final IPrinterFactory printerFactory;
+public class TextualExternalFilePrinterFactory extends 
TextualExternalPrinterFactory {
+    private static final long serialVersionUID = 8971234908711234L;
     private final IExternalFileCompressStreamFactory compressStreamFactory;
 
     public TextualExternalFilePrinterFactory(IPrinterFactory printerFactory,
             IExternalFileCompressStreamFactory compressStreamFactory) {
-        this.printerFactory = printerFactory;
+        super(printerFactory);
         this.compressStreamFactory = compressStreamFactory;
     }
 
     @Override
-    public IExternalFilePrinter createPrinter() {
+    public IExternalPrinter createPrinter() {
         return new TextualExternalFilePrinter(printerFactory.createPrinter(), 
compressStreamFactory);
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinter.java
similarity index 71%
copy from 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
copy to 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinter.java
index 8f8c63a533..537af2e8a8 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinter.java
@@ -21,21 +21,18 @@ package org.apache.asterix.external.writer.printer;
 import java.io.OutputStream;
 import java.io.PrintStream;
 
-import 
org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
-import org.apache.asterix.runtime.writer.IExternalFilePrinter;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
 import org.apache.hyracks.algebricks.data.IPrinter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
 
-final class TextualExternalFilePrinter implements IExternalFilePrinter {
+final class TextualExternalPrinter implements IExternalPrinter {
     private final IPrinter printer;
-    private final IExternalFileCompressStreamFactory compressStreamFactory;
     private TextualOutputStreamDelegate delegate;
     private PrintStream printStream;
 
-    TextualExternalFilePrinter(IPrinter printer, 
IExternalFileCompressStreamFactory compressStreamFactory) {
+    TextualExternalPrinter(IPrinter printer) {
         this.printer = printer;
-        this.compressStreamFactory = compressStreamFactory;
     }
 
     @Override
@@ -44,18 +41,14 @@ final class TextualExternalFilePrinter implements 
IExternalFilePrinter {
     }
 
     @Override
-    public void newStream(OutputStream outputStream) throws 
HyracksDataException {
-        if (printStream != null) {
-            close();
-        }
-        delegate = new 
TextualOutputStreamDelegate(compressStreamFactory.createStream(outputStream));
+    public void newStream(OutputStream outputStream) {
+        delegate = new TextualOutputStreamDelegate(outputStream);
         printStream = new PrintStream(delegate);
     }
 
     @Override
     public void print(IValueReference value) throws HyracksDataException {
         printer.print(value.getByteArray(), value.getStartOffset(), 
value.getLength(), printStream);
-        printStream.println();
         delegate.checkError();
     }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinterFactory.java
similarity index 57%
copy from 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
copy to 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinterFactory.java
index 6778532526..d779793c20 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinterFactory.java
@@ -18,24 +18,20 @@
  */
 package org.apache.asterix.external.writer.printer;
 
-import 
org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
-import org.apache.asterix.runtime.writer.IExternalFilePrinter;
-import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
 import org.apache.hyracks.algebricks.data.IPrinterFactory;
 
-public class TextualExternalFilePrinterFactory implements 
IExternalFilePrinterFactory {
+public class TextualExternalPrinterFactory implements IExternalPrinterFactory {
     private static final long serialVersionUID = 9155959967258587588L;
-    private final IPrinterFactory printerFactory;
-    private final IExternalFileCompressStreamFactory compressStreamFactory;
+    protected final IPrinterFactory printerFactory;
 
-    public TextualExternalFilePrinterFactory(IPrinterFactory printerFactory,
-            IExternalFileCompressStreamFactory compressStreamFactory) {
+    public TextualExternalPrinterFactory(IPrinterFactory printerFactory) {
         this.printerFactory = printerFactory;
-        this.compressStreamFactory = compressStreamFactory;
     }
 
     @Override
-    public IExternalFilePrinter createPrinter() {
-        return new TextualExternalFilePrinter(printerFactory.createPrinter(), 
compressStreamFactory);
+    public IExternalPrinter createPrinter() {
+        return new TextualExternalPrinter(printerFactory.createPrinter());
     }
 }
diff --git 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java
 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java
index 2520755dbb..599d5287b5 100644
--- 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java
+++ 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java
@@ -43,7 +43,7 @@ public class CopyToStatement extends AbstractStatement 
implements IReturningStat
     private final Map<Integer, VariableExpr> partitionsVariables;
     private final List<OrderbyClause.OrderModifier> orderByModifiers;
     private final List<OrderbyClause.NullOrderModifier> 
orderByNullModifierList;
-    private final List<Expression> keyExpressions;
+    private List<Expression> keyExpressions;
     private final boolean autogenerated;
     private Namespace namespace;
     private Query query;
@@ -89,7 +89,7 @@ public class CopyToStatement extends AbstractStatement implements IReturningStat
         this.orderByModifiers = orderByModifiers;
         this.orderByNullModifierList = orderByNullModifierList;
         this.varCounter = varCounter;
-        this.keyExpressions = keyExpressions;
+        this.keyExpressions = keyExpressions != null ? keyExpressions : new ArrayList<>();
         this.autogenerated = autogenerated;
 
         if (pathExpressions.isEmpty()) {
@@ -214,6 +214,7 @@ public class CopyToStatement extends AbstractStatement implements IReturningStat
         topLevelExpressions.addAll(pathExpressions);
         topLevelExpressions.addAll(partitionExpressions);
         topLevelExpressions.addAll(orderByList);
+        topLevelExpressions.addAll(keyExpressions);
         return topLevelExpressions;
     }
 
@@ -231,15 +232,15 @@ public class CopyToStatement extends AbstractStatement implements IReturningStat
         return keyExpressions;
     }
 
+    public void setKeyExpressions(List<Expression> keyExpressions) {
+        this.keyExpressions = keyExpressions;
+    }
+
     public boolean isAutogenerated() {
         return autogenerated;
     }
 
-    public boolean isSinkFileStore() {
+    public boolean isFileStoreSink() {
         return keyExpressions.isEmpty() && !autogenerated;
     }
-
-    public boolean isSinkDatabaseWithKey() {
-        return !keyExpressions.isEmpty() || autogenerated;
-    }
 }
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/AbstractInlineUdfsVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/AbstractInlineUdfsVisitor.java
index 8c7b9151f3..39754251de 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/AbstractInlineUdfsVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/AbstractInlineUdfsVisitor.java
@@ -302,6 +302,10 @@ public abstract class AbstractInlineUdfsVisitor extends AbstractQueryExpressionV
         changed |= order.first;
         stmtCopy.setOrderByList(order.second);
 
+        Pair<Boolean, List<Expression>> key = inlineUdfsInExprList(stmtCopy.getKeyExpressions());
+        changed |= key.first;
+        stmtCopy.setKeyExpressions(key.second);
+
         return changed;
     }
 
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
index 52e267851b..6151b024d7 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
@@ -573,12 +573,10 @@ public abstract class FormatPrintVisitor implements ILangVisitor<Void, Integer>
         cto.getSourceVariable().accept(this, step);
         out.println();
 
-        if (cto.isSinkFileStore()) {
+        if (cto.isFileStoreSink()) {
             formatPrintCopyToFileStore(cto, step);
-        } else if (cto.isSinkDatabaseWithKey()) {
-            formatPrintCopyToDatabaseWithKey(cto, step);
         } else {
-            throw new IllegalStateException("NYI: This should never happen");
+            formatPrintCopyToDatabaseWithKey(cto, step);
         }
 
         out.println("with ");
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppExpressionScopingVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppExpressionScopingVisitor.java
index e3e248480d..a7e1f90d58 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppExpressionScopingVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppExpressionScopingVisitor.java
@@ -437,6 +437,9 @@ public class AbstractSqlppExpressionScopingVisitor extends AbstractSqlppSimpleEx
         // Visit path exprs
         stmtCopy.setPathExpressions(visit(stmtCopy.getPathExpressions(), stmtCopy));
 
+        // Visit key exprs
+        stmtCopy.setKeyExpressions(visit(stmtCopy.getKeyExpressions(), stmtCopy));
+
         return null;
     }
 
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java
index f3d26750da..847393c7bf 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java
@@ -364,6 +364,7 @@ public class AbstractSqlppSimpleExpressionVisitor
         stmtCopy.setPathExpressions(visit(stmtCopy.getPathExpressions(), arg));
         stmtCopy.setPartitionExpressions(visit(stmtCopy.getPartitionExpressions(), arg));
         stmtCopy.setOrderByList(visit(stmtCopy.getOrderByList(), arg));
+        stmtCopy.setKeyExpressions(visit(stmtCopy.getKeyExpressions(), arg));
         return null;
     }
 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 8d8ca80810..340eb0a04e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -106,9 +106,9 @@ import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorDescriptor;
 import org.apache.asterix.runtime.operators.LSMSecondaryInsertDeleteWithNestedPlanOperatorDescriptor;
 import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
 import org.apache.asterix.runtime.operators.LSMSecondaryUpsertWithNestedPlanOperatorDescriptor;
-import org.apache.asterix.runtime.writer.ExternalWriterFactory;
-import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.ExternalFileWriterFactory;
 import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -761,14 +761,21 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         fileWriterFactory.validate();
         String fileExtension = ExternalWriterProvider.getFileExtension(sink);
         int maxResult = ExternalWriterProvider.getMaxResult(sink);
-        IExternalFilePrinterFactory printerFactory = ExternalWriterProvider.createPrinter(sink, sourceType);
-        ExternalWriterFactory writerFactory = new ExternalWriterFactory(fileWriterFactory, printerFactory,
+        IExternalPrinterFactory printerFactory = ExternalWriterProvider.createPrinter(sink, sourceType);
+        ExternalFileWriterFactory writerFactory = new ExternalFileWriterFactory(fileWriterFactory, printerFactory,
                 fileExtension, maxResult, dynamicPathEvalFactory, staticPath, pathSourceLocation);
         SinkExternalWriterRuntimeFactory runtime = new SinkExternalWriterRuntimeFactory(sourceColumn, partitionColumns,
                 partitionComparatorFactories, inputDesc, writerFactory);
         return new Pair<>(runtime, null);
     }
 
+    @Override
+    public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteDatabaseWithKeyRuntime(int sourceColumn,
+            IScalarEvaluatorFactory[] keyEvaluatorFactories, boolean autogenerated, IWriteDataSink sink,
+            RecordDescriptor inputDesc, Object sourceType) throws AlgebricksException {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
             int[] printColumns, IPrinterFactory[] printerFactories, IAWriterFactory writerFactory,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index 9142556d5e..2d77a94a31 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -30,17 +30,17 @@ import org.apache.asterix.external.writer.compressor.IExternalFileCompressStream
 import org.apache.asterix.external.writer.compressor.NoOpExternalFileCompressStreamFactory;
 import org.apache.asterix.external.writer.printer.TextualExternalFilePrinterFactory;
 import org.apache.asterix.formats.nontagged.CleanJSONPrinterFactoryProvider;
-import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
-import org.apache.asterix.runtime.writer.IExternalFileFilterWriterFactoryProvider;
-import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.ExternalWriterConfiguration;
 import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactoryProvider;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
 import org.apache.hyracks.algebricks.data.IPrinterFactory;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 
 public class ExternalWriterProvider {
-    private static final Map<String, IExternalFileFilterWriterFactoryProvider> CREATOR_MAP;
+    private static final Map<String, IExternalFileWriterFactoryProvider> CREATOR_MAP;
     private static final Map<String, IExternalFileCompressStreamFactory> STREAM_COMPRESSORS;
 
     private ExternalWriterProvider() {
@@ -59,7 +59,7 @@ public class ExternalWriterProvider {
     public static IExternalFileWriterFactory createWriterFactory(ICcApplicationContext appCtx, IWriteDataSink sink,
             String staticPath, SourceLocation pathExpressionLocation) {
         String adapterName = sink.getAdapterName().toLowerCase();
-        IExternalFileFilterWriterFactoryProvider creator = CREATOR_MAP.get(adapterName);
+        IExternalFileWriterFactoryProvider creator = CREATOR_MAP.get(adapterName);
 
         if (creator == null) {
             throw new UnsupportedOperationException("Unsupported adapter " + adapterName);
@@ -83,12 +83,12 @@ public class ExternalWriterProvider {
         return Integer.parseInt(maxResultString);
     }
 
-    private static ExternalFileWriterConfiguration createConfiguration(ICcApplicationContext appCtx,
-            IWriteDataSink sink, String staticPath, SourceLocation pathExpressionLocation) {
+    private static ExternalWriterConfiguration createConfiguration(ICcApplicationContext appCtx, IWriteDataSink sink,
+            String staticPath, SourceLocation pathExpressionLocation) {
         Map<String, String> params = sink.getConfiguration();
         boolean singleNodeCluster = isSingleNodeCluster(appCtx);
 
-        return new ExternalFileWriterConfiguration(params, pathExpressionLocation, staticPath, singleNodeCluster);
+        return new ExternalWriterConfiguration(params, pathExpressionLocation, staticPath, singleNodeCluster);
     }
 
     private static boolean isSingleNodeCluster(ICcApplicationContext appCtx) {
@@ -96,8 +96,8 @@ public class ExternalWriterProvider {
         return ccs.getNodeManager().getIpAddressNodeNameMap().size() == 1;
     }
 
-    private static void addCreator(String adapterName, IExternalFileFilterWriterFactoryProvider creator) {
-        IExternalFileFilterWriterFactoryProvider registeredCreator = CREATOR_MAP.get(adapterName.toLowerCase());
+    private static void addCreator(String adapterName, IExternalFileWriterFactoryProvider creator) {
+        IExternalFileWriterFactoryProvider registeredCreator = CREATOR_MAP.get(adapterName.toLowerCase());
         if (registeredCreator != null) {
             throw new IllegalStateException(
                     "Adapter " + adapterName + " is registered to " + registeredCreator.getClass().getName());
@@ -105,7 +105,7 @@ public class ExternalWriterProvider {
         CREATOR_MAP.put(adapterName.toLowerCase(), creator);
     }
 
-    public static IExternalFilePrinterFactory createPrinter(IWriteDataSink sink, Object sourceType) {
+    public static IExternalPrinterFactory createPrinter(IWriteDataSink sink, Object sourceType) {
         Map<String, String> configuration = sink.getConfiguration();
         String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
 
@@ -131,7 +131,7 @@ public class ExternalWriterProvider {
     }
 
     public static char getSeparator(String adapterName) {
-        IExternalFileFilterWriterFactoryProvider creator = CREATOR_MAP.get(adapterName.toLowerCase());
+        IExternalFileWriterFactoryProvider creator = CREATOR_MAP.get(adapterName.toLowerCase());
 
         if (creator == null) {
             throw new UnsupportedOperationException("Unsupported adapter " + adapterName);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
index 7105efacdd..c6d1dbe88f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
@@ -54,7 +54,7 @@ final class DynamicPathResolver extends AbstractPathResolver {
     @Override
     public String getPartitionDirectory(IFrameTupleReference tuple) throws HyracksDataException {
         if (!appendPrefix(tuple)) {
-            return ExternalWriter.UNRESOLVABLE_PATH;
+            return ExternalFileWriter.UNRESOLVABLE_PATH;
         }
 
         if (dirStringBuilder.length() > 0 && dirStringBuilder.charAt(dirStringBuilder.length() - 1) != fileSeparator) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
similarity index 94%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
index 5fc07affb4..f9f98dae29 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
@@ -23,7 +23,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-final class ExternalWriter implements IExternalWriter {
+final class ExternalFileWriter implements IExternalWriter {
     static final String UNRESOLVABLE_PATH = "UNRESOLVABLE_PATH";
     private final IPathResolver pathResolver;
     private final IExternalFileWriter writer;
@@ -31,7 +31,7 @@ final class ExternalWriter implements IExternalWriter {
     private String partitionPath;
     private int tupleCounter;
 
-    public ExternalWriter(IPathResolver pathResolver, IExternalFileWriter writer, int maxResultPerFile) {
+    public ExternalFileWriter(IPathResolver pathResolver, IExternalFileWriter writer, int maxResultPerFile) {
         this.pathResolver = pathResolver;
         this.writer = writer;
         this.maxResultPerFile = maxResultPerFile;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
similarity index 91%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
index e7c0db047a..59815845d5 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
@@ -28,17 +28,17 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 
-public class ExternalWriterFactory implements IExternalWriterFactory {
+public class ExternalFileWriterFactory implements IExternalWriterFactory {
     private static final long serialVersionUID = 1412969574113419638L;
     private final IExternalFileWriterFactory writerFactory;
-    private final IExternalFilePrinterFactory printerFactory;
+    private final IExternalPrinterFactory printerFactory;
     private final String fileExtension;
     private final int maxResult;
     private final IScalarEvaluatorFactory pathEvalFactory;
     private final String staticPath;
     private final SourceLocation pathSourceLocation;
 
-    public ExternalWriterFactory(IExternalFileWriterFactory writerFactory, IExternalFilePrinterFactory printerFactory,
+    public ExternalFileWriterFactory(IExternalFileWriterFactory writerFactory, IExternalPrinterFactory printerFactory,
             String fileExtension, int maxResult, IScalarEvaluatorFactory pathEvalFactory, String staticPath,
             SourceLocation pathSourceLocation) {
         this.writerFactory = writerFactory;
@@ -65,6 +65,6 @@ public class ExternalWriterFactory implements IExternalWriterFactory {
             resolver = new StaticPathResolver(fileExtension, fileSeparator, partition, staticPath);
         }
         IExternalFileWriter writer = writerFactory.createWriter(context, printerFactory);
-        return new ExternalWriter(resolver, writer, maxResult);
+        return new ExternalFileWriter(resolver, writer, maxResult);
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterConfiguration.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterConfiguration.java
similarity index 91%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterConfiguration.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterConfiguration.java
index b62a07aef8..fbb05eeec1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterConfiguration.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterConfiguration.java
@@ -22,13 +22,13 @@ import java.util.Map;
 
 import org.apache.hyracks.api.exceptions.SourceLocation;
 
-public final class ExternalFileWriterConfiguration {
+public final class ExternalWriterConfiguration {
     private final Map<String, String> configuration;
     private final SourceLocation pathSourceLocation;
     private final String staticPath;
     private final boolean singleNodeCluster;
 
-    public ExternalFileWriterConfiguration(Map<String, String> configuration, SourceLocation pathSourceLocation,
+    public ExternalWriterConfiguration(Map<String, String> configuration, SourceLocation pathSourceLocation,
             String staticPath, boolean singleNodeCluster) {
         this.configuration = configuration;
         this.pathSourceLocation = pathSourceLocation;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
index d8f1f84365..31dba791b7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
@@ -20,15 +20,14 @@ package org.apache.asterix.runtime.writer;
 
 import java.io.Serializable;
 
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * An interface for writing to a storage device
- * Implementer should also provide a singleton to {@link IExternalFileFilterWriterFactoryProvider}
+ * Implementer should also provide a singleton to {@link IExternalFileWriterFactoryProvider}
  */
-public interface IExternalFileWriterFactory extends Serializable {
+public interface IExternalFileWriterFactory extends IExternalWriterFactoryValidator, Serializable {
     /**
      * Create a writer
      *
@@ -36,16 +35,11 @@ public interface IExternalFileWriterFactory extends Serializable {
      * @param printerFactory printer factory for writing the final result
      * @return a new file writer
      */
-    IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalFilePrinterFactory printerFactory)
+    IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalPrinterFactory printerFactory)
             throws HyracksDataException;
 
     /**
      * @return file (or path) separator
      */
     char getSeparator();
-
-    /**
-     * Validate the writer by running a test write routine to ensure the writer has the appropriate permissions
-     */
-    void validate() throws AlgebricksException;
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactoryProvider.java
similarity index 85%
copy from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
copy to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactoryProvider.java
index 7a863f79c2..eabd2cb5db 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactoryProvider.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.runtime.writer;
 
-public interface IExternalFileFilterWriterFactoryProvider {
-    IExternalFileWriterFactory create(ExternalFileWriterConfiguration configuration);
+public interface IExternalFileWriterFactoryProvider {
+    IExternalFileWriterFactory create(ExternalWriterConfiguration configuration);
 
     char getSeparator();
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java
similarity index 97%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java
index ba5fa1db16..54fd152556 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java
@@ -26,7 +26,7 @@ import org.apache.hyracks.data.std.api.IValueReference;
 /**
  * An {@link IExternalFileWriter} printer
  */
-public interface IExternalFilePrinter {
+public interface IExternalPrinter {
 
     /**
      * Open the printer
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinterFactory.java
similarity index 86%
copy from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
copy to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinterFactory.java
index a4fa97bc65..4d9352ab0a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinterFactory.java
@@ -23,9 +23,9 @@ import java.io.Serializable;
 /**
  * {@link IExternalFileWriter} printer factory
  */
-public interface IExternalFilePrinterFactory extends Serializable {
+public interface IExternalPrinterFactory extends Serializable {
     /**
-     * @return a new external file printer
+     * @return a new external printer
      */
-    IExternalFilePrinter createPrinter();
+    IExternalPrinter createPrinter();
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
similarity index 76%
copy from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
copy to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
index a4fa97bc65..4a75db6906 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
@@ -18,14 +18,11 @@
  */
 package org.apache.asterix.runtime.writer;
 
-import java.io.Serializable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 
-/**
- * {@link IExternalFileWriter} printer factory
- */
-public interface IExternalFilePrinterFactory extends Serializable {
+public interface IExternalWriterFactoryValidator {
     /**
-     * @return a new external file printer
+     * Perform the necessary validation to ensure the writer has the proper permissions
      */
-    IExternalFilePrinter createPrinter();
+    void validate() throws AlgebricksException;
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 2072deedd5..5240e0c375 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -65,6 +65,10 @@ public interface IMetadataProvider<S, I> {
             SourceLocation pathSourceLocation, IWriteDataSink sink, RecordDescriptor inputDesc, Object sourceType)
             throws AlgebricksException;
 
+    Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteDatabaseWithKeyRuntime(int sourceColumn,
+            IScalarEvaluatorFactory[] keyEvaluatorFactories, boolean autogenerated, IWriteDataSink sink,
+            RecordDescriptor inputDesc, Object sourceType) throws AlgebricksException;
+
     Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink, int[] printColumns,
             IPrinterFactory[] printerFactories, IAWriterFactory writerFactory,
             IResultSerializerFactoryProvider resultSerializerFactoryProvider, RecordDescriptor inputDesc,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
index 7eef90e509..89a51486a7 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
@@ -41,17 +41,21 @@ public class WriteOperator extends AbstractLogicalOperator {
     private final Mutable<ILogicalExpression> pathExpression;
     private final List<Mutable<ILogicalExpression>> partitionExpressions;
     private final List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions;
+    private final List<Mutable<ILogicalExpression>> keyExpressions;
+    private final boolean autogenerated;
     private final IWriteDataSink writeDataSink;
 
     public WriteOperator(Mutable<ILogicalExpression> sourceExpression, Mutable<ILogicalExpression> pathExpression,
             List<Mutable<ILogicalExpression>> partitionExpressions,
             List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions,
-            IWriteDataSink writeDataSink) {
+            List<Mutable<ILogicalExpression>> keyExpressions, boolean autogenerated, IWriteDataSink writeDataSink) {
         this.sourceExpression = sourceExpression;
         this.pathExpression = pathExpression;
         this.partitionExpressions = partitionExpressions;
         this.orderExpressions = orderExpressions;
         this.writeDataSink = writeDataSink;
+        this.keyExpressions = keyExpressions;
+        this.autogenerated = autogenerated;
     }
 
     public Mutable<ILogicalExpression> getSourceExpression() {
@@ -74,6 +78,18 @@ public class WriteOperator extends AbstractLogicalOperator {
         return orderExpressions;
     }
 
+    public List<Mutable<ILogicalExpression>> getKeyExpressions() {
+        return keyExpressions;
+    }
+
+    public List<LogicalVariable> getKeyVariables() {
+        List<LogicalVariable> keyVariables = new ArrayList<>();
+        for (Mutable<ILogicalExpression> keyExpression : keyExpressions) {
+            keyVariables.add(VariableUtilities.getVariable(keyExpression.getValue()));
+        }
+        return keyVariables;
+    }
+
     public List<LogicalVariable> getPartitionVariables() {
         List<LogicalVariable> partitionVariables = new ArrayList<>();
         for (Mutable<ILogicalExpression> partitionExpression : partitionExpressions) {
@@ -92,10 +108,18 @@ public class WriteOperator extends AbstractLogicalOperator {
         return orderColumns;
     }
 
+    public boolean getAutogenerated() {
+        return autogenerated;
+    }
+
     public IWriteDataSink getWriteDataSink() {
         return writeDataSink;
     }
 
+    public boolean isFileStoreSink() {
+        return keyExpressions.isEmpty() && !autogenerated;
+    }
+
     @Override
     public LogicalOperatorTag getOperatorTag() {
         return LogicalOperatorTag.WRITE;
@@ -119,6 +143,10 @@ public class WriteOperator extends AbstractLogicalOperator 
{
             changed |= visitor.transform(orderExpressionPair.second);
         }
 
+        for (Mutable<ILogicalExpression> expression : keyExpressions) {
+            changed |= visitor.transform(expression);
+        }
+
         return changed;
     }
 
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index fa47ae502e..8664acf75b 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -297,9 +297,11 @@ public class OperatorDeepCopyVisitor implements 
ILogicalOperatorVisitor<ILogical
                 deepCopyExpressionRefs(new ArrayList<>(), 
op.getPartitionExpressions());
         List<Pair<IOrder, Mutable<ILogicalExpression>>> 
newOrderPairExpressions =
                 deepCopyOrderAndExpression(op.getOrderExpressions());
+        List<Mutable<ILogicalExpression>> newKeyPairExpressions =
+                deepCopyExpressionRefs(new ArrayList<>(), 
op.getKeyExpressions());
         IWriteDataSink writeDataSink = op.getWriteDataSink().createCopy();
         return new WriteOperator(newSourceExpression, newPathExpression, 
newPartitionExpressions,
-                newOrderPairExpressions, writeDataSink);
+                newOrderPairExpressions, newKeyPairExpressions, 
op.getAutogenerated(), writeDataSink);
     }
 
     @Override
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index d0b0608e71..50401c3fba 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -389,6 +389,7 @@ public class SubstituteVariableVisitor
         for (Pair<IOrder, Mutable<ILogicalExpression>> orderExpr : 
op.getOrderExpressions()) {
             substUsedVariablesInExpr(orderExpr.second, pair.first, 
pair.second);
         }
+        substUsedVariablesInExpr(op.getKeyExpressions(), pair.first, 
pair.second);
         return null;
     }
 
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index d7b2555a83..7186f7e728 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -362,6 +362,10 @@ public class UsedVariableVisitor implements 
ILogicalOperatorVisitor<Void, Void>
         for (Pair<IOrder, Mutable<ILogicalExpression>> orderExpr : 
op.getOrderExpressions()) {
             orderExpr.second.getValue().getUsedVariables(usedVariables);
         }
+
+        for (Mutable<ILogicalExpression> expr : op.getKeyExpressions()) {
+            expr.getValue().getUsedVariables(usedVariables);
+        }
         return null;
     }
 
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
index d462cd5b8c..71f5876857 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
@@ -23,6 +23,7 @@ import static 
org.apache.hyracks.algebricks.core.algebra.operators.physical.Abst
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.commons.lang3.mutable.Mutable;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.ListSet;
@@ -61,12 +62,16 @@ public class SinkWritePOperator extends 
AbstractPhysicalOperator {
     private final LogicalVariable sourceVariable;
     private final List<LogicalVariable> partitionVariables;
     private final List<OrderColumn> orderColumns;
+    private final List<LogicalVariable> keyVariables;
+    private final boolean autogenerated;
 
     public SinkWritePOperator(LogicalVariable sourceVariable, 
List<LogicalVariable> partitionVariables,
-            List<OrderColumn> orderColumns) {
+            List<OrderColumn> orderColumns, List<LogicalVariable> 
keyVariables, boolean autogenerated) {
         this.sourceVariable = sourceVariable;
         this.partitionVariables = partitionVariables;
         this.orderColumns = orderColumns;
+        this.keyVariables = keyVariables;
+        this.autogenerated = autogenerated;
     }
 
     @Override
@@ -145,6 +150,16 @@ public class SinkWritePOperator extends 
AbstractPhysicalOperator {
         IBinaryComparatorFactory[] partitionComparatorFactories =
                 
JobGenHelper.variablesToAscBinaryComparatorFactories(partitionVariables, 
typeEnv, context);
 
+        // Key expressions
+        IScalarEvaluatorFactory[] keyEvalFactories = new 
IScalarEvaluatorFactory[write.getKeyExpressions().size()];
+        List<Mutable<ILogicalExpression>> keyExpressions = 
write.getKeyExpressions();
+        if (!keyExpressions.isEmpty()) {
+            for (int i = 0; i < keyExpressions.size(); i++) {
+                ILogicalExpression keyExpr = keyExpressions.get(i).getValue();
+                keyEvalFactories[i] = 
runtimeProvider.createEvaluatorFactory(keyExpr, typeEnv, inputSchemas, context);
+            }
+        }
+
         RecordDescriptor recDesc =
                 
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), 
propagatedSchema, context);
         RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
@@ -152,9 +167,17 @@ public class SinkWritePOperator extends 
AbstractPhysicalOperator {
 
         IMetadataProvider<?, ?> mp = context.getMetadataProvider();
 
-        Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> 
runtimeAndConstraints = mp.getWriteFileRuntime(
-                sourceColumn, partitionColumns, partitionComparatorFactories, 
dynamicPathEvalFactory, staticPathExpr,
-                pathExpr.getSourceLocation(), writeDataSink, inputDesc, 
typeEnv.getVarType(sourceVariable));
+        Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> 
runtimeAndConstraints;
+        if (write.isFileStoreSink()) {
+            runtimeAndConstraints = mp.getWriteFileRuntime(sourceColumn, 
partitionColumns, partitionComparatorFactories,
+                    dynamicPathEvalFactory, staticPathExpr, 
pathExpr.getSourceLocation(), writeDataSink, inputDesc,
+                    typeEnv.getVarType(sourceVariable));
+
+        } else {
+            runtimeAndConstraints = 
mp.getWriteDatabaseWithKeyRuntime(sourceColumn, keyEvalFactories, autogenerated,
+                    writeDataSink, inputDesc, 
typeEnv.getVarType(sourceVariable));
+        }
+
         IPushRuntimeFactory runtime = runtimeAndConstraints.first;
         runtime.setSourceLocation(write.getSourceLocation());
 
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index f945994684..3ea2631274 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -408,7 +408,9 @@ public class SetAlgebricksPhysicalOperatorsRule implements 
IAlgebraicRewriteRule
             }
             ensureAllVariables(op.getPartitionExpressions(), v -> v);
             ensureAllVariables(op.getOrderExpressions(), Pair::getSecond);
-            return new SinkWritePOperator(op.getSourceVariable(), 
op.getPartitionVariables(), op.getOrderColumns());
+            ensureAllVariables(op.getKeyExpressions(), v -> v);
+            return new SinkWritePOperator(op.getSourceVariable(), 
op.getPartitionVariables(), op.getOrderColumns(),
+                    op.getKeyVariables(), op.getAutogenerated());
         }
 
         @Override
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/IWriterPartitioner.java
similarity index 70%
rename from 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
rename to 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/IWriterPartitioner.java
index a4fa97bc65..cb72aec258 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/IWriterPartitioner.java
@@ -16,16 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.runtime.writer;
+package org.apache.hyracks.algebricks.runtime.operators.writer;
 
-import java.io.Serializable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
-/**
- * {@link IExternalFileWriter} printer factory
- */
-public interface IExternalFilePrinterFactory extends Serializable {
-    /**
-     * @return a new external file printer
-     */
-    IExternalFilePrinter createPrinter();
-}
+/**
+ * Decides, one tuple at a time, whether a tuple starts a new partition for the writer.
+ * Implementations may keep state between calls.
+ */
+interface IWriterPartitioner {
+    /**
+     * @param tupleAccessor accessor used to reach the tuple
+     * @param index         position of the tuple within {@code tupleAccessor}
+     * @return {@code true} if the tuple at {@code index} starts a new partition
+     * @throws HyracksDataException if the implementation fails while inspecting the tuple
+     */
+    boolean isNewPartition(FrameTupleAccessor tupleAccessor, int index) throws 
HyracksDataException;
+
+}
\ No newline at end of file
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/KeyWriterPartitioner.java
similarity index 58%
copy from 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
copy to 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/KeyWriterPartitioner.java
index 7a863f79c2..3cb44ffa0e 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/KeyWriterPartitioner.java
@@ -16,10 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.runtime.writer;
+package org.apache.hyracks.algebricks.runtime.operators.writer;
 
-public interface IExternalFileFilterWriterFactoryProvider {
-    IExternalFileWriterFactory create(ExternalFileWriterConfiguration 
configuration);
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
-    char getSeparator();
+/**
+ * {@link IWriterPartitioner} that reports every tuple as the start of a new partition.
+ * The class has no instance fields and is only reachable through {@link #INSTANCE}.
+ */
+class KeyWriterPartitioner implements IWriterPartitioner {
+    // Single shared instance; the constructor below is private
+    public static final IWriterPartitioner INSTANCE = new 
KeyWriterPartitioner();
+
+    // Private so that callers go through INSTANCE
+    private KeyWriterPartitioner() {
+    }
+
+    /**
+     * @param tupleAccessor not read by this implementation
+     * @param index         not read by this implementation
+     * @return always {@code true}
+     */
+    @Override
+    public boolean isNewPartition(FrameTupleAccessor tupleAccessor, int index) 
throws HyracksDataException {
+        // Every key is a partition
+        return true;
+    }
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/NoOpWriterPartitioner.java
similarity index 58%
rename from 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
rename to 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/NoOpWriterPartitioner.java
index 7a863f79c2..3221d735d4 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/NoOpWriterPartitioner.java
@@ -16,10 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.runtime.writer;
+package org.apache.hyracks.algebricks.runtime.operators.writer;
 
-public interface IExternalFileFilterWriterFactoryProvider {
-    IExternalFileWriterFactory create(ExternalFileWriterConfiguration 
configuration);
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
-    char getSeparator();
-}
+/**
+ * {@link IWriterPartitioner} that reports a new partition exactly once: on the first call to
+ * {@link #isNewPartition}. Every later call answers {@code false}.
+ */
+class NoOpWriterPartitioner implements IWriterPartitioner {
+    private boolean awaitingFirstTuple = true;
+
+    public NoOpWriterPartitioner() {
+    }
+
+    /**
+     * @param tupleAccessor not read by this implementation
+     * @param index         not read by this implementation
+     * @return {@code true} on the first invocation only
+     */
+    @Override
+    public boolean isNewPartition(FrameTupleAccessor tupleAccessor, int index) throws HyracksDataException {
+        // Answer with the flag as it was on entry, then clear it for all later calls
+        boolean startsPartition = awaitingFirstTuple;
+        awaitingFirstTuple = false;
+        return startsPartition;
+    }
+}
\ No newline at end of file
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
index 01e137b30a..9407c08b9f 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
@@ -23,44 +23,30 @@ import java.nio.ByteBuffer;
 import 
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputSinkPushRuntime;
 import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
 import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import 
org.apache.hyracks.dataflow.common.data.accessors.PermutingFrameTupleReference;
-import 
org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
-import 
org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
 
 final class SinkExternalWriterRuntime extends AbstractOneInputSinkPushRuntime {
     private final int sourceColumn;
-    private final int[] partitionColumns;
+    private final IWriterPartitioner partitioner;
     private final IPointable sourceValue;
-    private final PointableTupleReference partitionColumnsPrevCopy;
-    private final PermutingFrameTupleReference partitionColumnsRef;
-    private final IBinaryComparator[] partitionComparators;
     private final IExternalWriter writer;
     private FrameTupleAccessor tupleAccessor;
     private FrameTupleReference tupleRef;
-    private boolean first;
     private IFrameWriter frameWriter;
 
-    SinkExternalWriterRuntime(int sourceColumn, int[] partitionColumns, 
IBinaryComparator[] partitionComparators,
-            RecordDescriptor inputRecordDesc, IExternalWriter writer) {
+    SinkExternalWriterRuntime(int sourceColumn, IWriterPartitioner 
partitioner, RecordDescriptor inputRecordDesc,
+            IExternalWriter writer) {
         this.sourceColumn = sourceColumn;
-        this.partitionColumns = partitionColumns;
+        this.partitioner = partitioner;
         this.sourceValue = new VoidPointable();
-        partitionColumnsRef = new 
PermutingFrameTupleReference(partitionColumns);
-        partitionColumnsPrevCopy =
-                PointableTupleReference.create(partitionColumns.length, 
ArrayBackedValueStorage::new);
-        this.partitionComparators = partitionComparators;
         this.inputRecordDesc = inputRecordDesc;
         this.writer = writer;
-        first = true;
     }
 
     @Override
@@ -83,8 +69,6 @@ final class SinkExternalWriterRuntime extends 
AbstractOneInputSinkPushRuntime {
             }
             setValue(tupleRef, sourceColumn, sourceValue);
             writer.write(sourceValue);
-            partitionColumnsRef.reset(tupleAccessor, i);
-            partitionColumnsPrevCopy.set(partitionColumnsRef);
         }
     }
 
@@ -106,13 +90,7 @@ final class SinkExternalWriterRuntime extends 
AbstractOneInputSinkPushRuntime {
     }
 
     private boolean isNewPartition(int index) throws HyracksDataException {
-        if (first) {
-            first = false;
-            return true;
-        }
-
-        return !PreclusteredGroupWriter.sameGroup(partitionColumnsPrevCopy, 
tupleAccessor, index, partitionColumns,
-                partitionComparators);
+        return partitioner.isNewPartition(tupleAccessor, index);
     }
 
     private void setValue(IFrameTupleReference tuple, int column, IPointable 
value) {
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
index 6220dec444..321828f76d 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
@@ -31,17 +31,30 @@ import 
org.apache.hyracks.api.exceptions.HyracksDataException;
 public final class SinkExternalWriterRuntimeFactory extends 
AbstractPushRuntimeFactory {
     private static final long serialVersionUID = -2215789207336628581L;
     private final int sourceColumn;
-    private final int[] partitionColumn;
+    private final int[] partitionColumns;
     private final IBinaryComparatorFactory[] partitionComparatorFactories;
+    private final boolean partitionByKey;
     private final RecordDescriptor inputRecordDescriptor;
     private final IExternalWriterFactory writerFactory;
 
-    public SinkExternalWriterRuntimeFactory(int sourceColumn, int[] 
partitionColumn,
+    public SinkExternalWriterRuntimeFactory(int sourceColumn, int[] 
partitionColumns,
             IBinaryComparatorFactory[] partitionComparatorFactories, 
RecordDescriptor inputRecordDescriptor,
             IExternalWriterFactory writerFactory) {
+        this(sourceColumn, partitionColumns, partitionComparatorFactories, 
false, inputRecordDescriptor, writerFactory);
+    }
+
+    public SinkExternalWriterRuntimeFactory(int sourceColumn, RecordDescriptor 
inputRecordDescriptor,
+            IExternalWriterFactory writerFactory) {
+        this(sourceColumn, null, null, true, inputRecordDescriptor, 
writerFactory);
+    }
+
+    private SinkExternalWriterRuntimeFactory(int sourceColumn, int[] 
partitionColumns,
+            IBinaryComparatorFactory[] partitionComparatorFactories, boolean 
partitionByKey,
+            RecordDescriptor inputRecordDescriptor, IExternalWriterFactory 
writerFactory) {
         this.sourceColumn = sourceColumn;
-        this.partitionColumn = partitionColumn;
+        this.partitionColumns = partitionColumns;
         this.partitionComparatorFactories = partitionComparatorFactories;
+        this.partitionByKey = partitionByKey;
         this.inputRecordDescriptor = inputRecordDescriptor;
         this.writerFactory = writerFactory;
     }
@@ -49,12 +62,33 @@ public final class SinkExternalWriterRuntimeFactory extends 
AbstractPushRuntimeF
     @Override
     public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
         IExternalWriter writer = writerFactory.createWriter(ctx);
-        IBinaryComparator[] partitionComparators = new 
IBinaryComparator[partitionComparatorFactories.length];
-        for (int i = 0; i < partitionComparatorFactories.length; i++) {
-            partitionComparators[i] = 
partitionComparatorFactories[i].createBinaryComparator();
-        }
-        SinkExternalWriterRuntime runtime = new 
SinkExternalWriterRuntime(sourceColumn, partitionColumn,
-                partitionComparators, inputRecordDescriptor, writer);
+        SinkExternalWriterRuntime runtime =
+                new SinkExternalWriterRuntime(sourceColumn, 
createPartitioner(), inputRecordDescriptor, writer);
         return new IPushRuntime[] { runtime };
     }
+
+    /**
+     * Picks the partitioner matching this factory's configuration:
+     * the shared {@link KeyWriterPartitioner} when partitioning by key, a {@link NoOpWriterPartitioner}
+     * when there are no partition columns, and a {@link WriterPartitioner} with freshly created
+     * comparators otherwise.
+     *
+     * @return writer partitioner
+     */
+    private IWriterPartitioner createPartitioner() {
+        if (partitionByKey) {
+            return KeyWriterPartitioner.INSTANCE;
+        }
+
+        if (partitionColumns.length == 0) {
+            return new NoOpWriterPartitioner();
+        }
+
+        // One comparator per factory, kept in factory order
+        IBinaryComparator[] partitionComparators = new IBinaryComparator[partitionComparatorFactories.length];
+        int pos = 0;
+        for (IBinaryComparatorFactory factory : partitionComparatorFactories) {
+            partitionComparators[pos++] = factory.createBinaryComparator();
+        }
+        return new WriterPartitioner(partitionColumns, partitionComparators);
+    }
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/WriterPartitioner.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/WriterPartitioner.java
new file mode 100644
index 0000000000..468ca4a2fb
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/WriterPartitioner.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.writer;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import 
org.apache.hyracks.dataflow.common.data.accessors.PermutingFrameTupleReference;
+import 
org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
+import 
org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
+
+/**
+ * {@link IWriterPartitioner} driven by partition columns. The first tuple always starts a
+ * partition; every later tuple is checked against a copy of the previous tuple's partition
+ * columns using {@link PreclusteredGroupWriter#sameGroup}.
+ */
+class WriterPartitioner implements IWriterPartitioner {
+    private final int[] partitionColumns;
+    private final IBinaryComparator[] partitionComparators;
+    private final PointableTupleReference previousColumns;
+    private final PermutingFrameTupleReference currentColumnsRef;
+    private boolean first = true;
+
+    public WriterPartitioner(int[] partitionColumns, IBinaryComparator[] partitionComparators) {
+        this.partitionColumns = partitionColumns;
+        this.partitionComparators = partitionComparators;
+        currentColumnsRef = new PermutingFrameTupleReference(partitionColumns);
+        previousColumns =
+                PointableTupleReference.create(partitionColumns.length, ArrayBackedValueStorage::new);
+    }
+
+    /**
+     * @param tupleAccessor accessor used to reach the tuple
+     * @param index         position of the tuple within {@code tupleAccessor}
+     * @return {@code true} on the first call, or when the tuple is not in the same group as the previous one
+     * @throws HyracksDataException if the group comparison fails
+     */
+    @Override
+    public boolean isNewPartition(FrameTupleAccessor tupleAccessor, int index) throws HyracksDataException {
+        // Short-circuit: nothing has been remembered yet on the first call, so skip the comparison
+        boolean newPartition = first || !PreclusteredGroupWriter.sameGroup(previousColumns, tupleAccessor, index,
+                partitionColumns, partitionComparators);
+        first = false;
+        rememberAsPrevious(tupleAccessor, index);
+        return newPartition;
+    }
+
+    /** Copies the partition columns of the tuple at {@code index} so the next call can compare against them. */
+    private void rememberAsPrevious(FrameTupleAccessor tupleAccessor, int index) {
+        currentColumnsRef.reset(tupleAccessor, index);
+        previousColumns.set(currentColumnsRef);
+    }
+}
\ No newline at end of file


Reply via email to