This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 1299537bba59666e25c9a329b452591e43fd4548 Author: Hussain Towaileb <[email protected]> AuthorDate: Wed Oct 29 12:58:59 2025 +0300 [NO ISSUE][EXT]: ensure CSV header is written in all files Details: - if header is enabled in COPY TO CSV, ensure header is written to all files, not just the first file. Ext-ref: MB-69147 Change-Id: I7716408908ddb40ec9f26e892da4f7d888751a8b Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20531 Tested-by: Jenkins <[email protected]> Reviewed-by: Hussain Towaileb <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- .../writer/AbstractCloudExternalFileWriter.java | 1 + .../external/writer/HDFSExternalFileWriter.java | 1 + .../external/writer/LocalFSExternalFileWriter.java | 1 + .../printer/AbstractTextualExternalPrinter.java | 5 ++++ .../printers/csv/ARecordPrinterFactory.java | 5 ++++ .../om/pointables/printer/ARecordPrinter.java | 5 ++++ .../pointables/printer/AbstractPrintVisitor.java | 2 +- .../pointables/printer/csv/ACSVRecordPrinter.java | 27 +++++++++------------- .../om/pointables/printer/csv/APrintVisitor.java | 21 +++++++++++++++++ .../asterix/runtime/writer/IExternalPrinter.java | 6 +++++ .../apache/hyracks/algebricks/data/IPrinter.java | 3 +++ 11 files changed, 60 insertions(+), 17 deletions(-) diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java index f93ae9246d..e389c039e6 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java @@ -79,6 +79,7 @@ abstract class AbstractCloudExternalFileWriter implements IExternalFileWriter { @Override public final boolean newFile(String directory, String fileName) throws HyracksDataException { + printer.newFile(); String fullPath = directory + fileName; if (checkAndWarnExceedingMaxLength(fullPath)) { return false; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java index f06b8e961f..510c8e4e4e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java @@ -88,6 +88,7 @@ public class HDFSExternalFileWriter implements IExternalFileWriter { @Override public boolean newFile(String directory, String fileName) throws HyracksDataException { + printer.newFile(); directory = HDFSUtils.updateRootPath(directory, true); Path path = new Path(directory, "." + fileName); try { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java index 4166ddee69..906e2724cf 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java @@ -56,6 +56,7 @@ final class LocalFSExternalFileWriter implements IExternalFileWriter { @Override public boolean newFile(String directory, String fileName) throws HyracksDataException { + printer.newFile(); try { File parentDirectory = new File(directory); File currentFile = new File(parentDirectory, fileName); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/AbstractTextualExternalPrinter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/AbstractTextualExternalPrinter.java index 8d7d60f3a4..38d52f2385 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/AbstractTextualExternalPrinter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/AbstractTextualExternalPrinter.java @@ -48,6 +48,11 @@ public abstract class AbstractTextualExternalPrinter implements IExternalPrinter printer.init(); } + @Override + public void newFile() { + printer.initNewBatch(); + } + @Override public void newStream(OutputStream outputStream) throws HyracksDataException { if (printStream != null) { diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ARecordPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ARecordPrinterFactory.java index cef0e6e3f0..3d8b4371c9 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ARecordPrinterFactory.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ARecordPrinterFactory.java @@ -66,6 +66,11 @@ public class ARecordPrinterFactory implements IPrinterFactory { arg.second = inputType.getTypeTag(); } + @Override + public void initNewBatch() { + printVisitor.setFirstRecord(true); + } + @Override public void print(byte[] b, int start, int l, PrintStream ps) throws HyracksDataException { recAccessor.set(b, start, l); diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/ARecordPrinter.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/ARecordPrinter.java index 9aaf889574..bcc8c33229 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/ARecordPrinter.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/ARecordPrinter.java @@ -82,6 +82,11 @@ public class ARecordPrinter { ps.print(endRecord); } + public void printRecord(ARecordVisitablePointable recordAccessor, PrintStream ps, IPrintVisitor visitor, + boolean firstRecord) throws HyracksDataException { + printRecord(recordAccessor, ps, visitor); + } + protected void printField(PrintStream ps, IPrintVisitor visitor, IVisitablePointable fieldName, IVisitablePointable fieldValue, ATypeTag fieldTypeTag) throws HyracksDataException { itemVisitorArg.second = fieldTypeTag; diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/AbstractPrintVisitor.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/AbstractPrintVisitor.java index 54bf85b1f4..da1bf0bbbe 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/AbstractPrintVisitor.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/AbstractPrintVisitor.java @@ -32,7 +32,7 @@ import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.exceptions.HyracksDataException; public abstract class AbstractPrintVisitor implements IPrintVisitor { - private final Map<IVisitablePointable, ARecordPrinter> raccessorToPrinter = new HashMap<>(); + protected final Map<IVisitablePointable, ARecordPrinter> raccessorToPrinter = new HashMap<>(); private final Map<IVisitablePointable, AListPrinter> laccessorToPrinter = new HashMap<>(); @Override diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/ACSVRecordPrinter.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/ACSVRecordPrinter.java index b031c64ffa..f84e2062fe 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/ACSVRecordPrinter.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/ACSVRecordPrinter.java @@ -52,7 +52,6 @@ public class ACSVRecordPrinter extends ARecordPrinter { private final boolean header; private final String recordDelimiter; private final Map<String, ATypeTag> recordSchemaDetails = new HashMap<>(); - private boolean firstRecord; private List<String> expectedFieldNames; private List<IAType> expectedFieldTypes; @@ -62,7 +61,6 @@ public class ACSVRecordPrinter extends ARecordPrinter { this.warningCollector = warningCollector; this.header = header; this.schema = schema; - this.firstRecord = true; this.recordDelimiter = recordDelimiter; if (schema != null) { this.expectedFieldNames = Arrays.asList(schema.getFieldNames()); @@ -71,35 +69,32 @@ public class ACSVRecordPrinter extends ARecordPrinter { } @Override - public void printRecord(ARecordVisitablePointable recordAccessor, PrintStream ps, IPrintVisitor visitor) - throws HyracksDataException { + public void printRecord(ARecordVisitablePointable recordAccessor, PrintStream ps, IPrintVisitor visitor, + boolean firstRecord) throws HyracksDataException { // backward compatibility - no schema provided, print it as is from recordAccessor if (schema == null) { super.printRecord(recordAccessor, ps, visitor); } else { - printSchemaFullRecord(recordAccessor, ps, visitor); + printSchemaFullRecord(recordAccessor, ps, visitor, firstRecord); } } - private void printSchemaFullRecord(ARecordVisitablePointable recordAccessor, PrintStream ps, IPrintVisitor visitor) - throws HyracksDataException { + private void printSchemaFullRecord(ARecordVisitablePointable recordAccessor, PrintStream ps, IPrintVisitor visitor, + boolean firstRecord) throws HyracksDataException { // check the schema for the record // try producing the record into the record of expected schema if (isValidSchema(recordAccessor)) { nameVisitorArg.first = ps; itemVisitorArg.first = ps; - if (header && firstRecord) { - printHeader(recordAccessor, ps, visitor); - firstRecord = false; - } - // add record delimiter - // by default the separator between the header and the records is "\n" - if (firstRecord) { - firstRecord = false; - } else { + if (!firstRecord) { + ps.print(recordDelimiter); + } else if (header) { + // it's first record and header is true + printHeader(recordAccessor, ps, visitor); ps.print(recordDelimiter); } + final List<IVisitablePointable> fieldNames = recordAccessor.getFieldNames(); final List<IVisitablePointable> fieldValues = recordAccessor.getFieldValues(); diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/APrintVisitor.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/APrintVisitor.java index 7d5a52df86..b8b7a0f419 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/APrintVisitor.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/APrintVisitor.java @@ -31,6 +31,7 @@ import org.apache.asterix.om.pointables.printer.ARecordPrinter; import org.apache.asterix.om.pointables.printer.AbstractPrintVisitor; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.context.IEvaluatorContext; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -45,6 +46,7 @@ public class APrintVisitor extends AbstractPrintVisitor { private final Map<String, String> formatConfigs; private final Map<String, String> configuration; private AObjectPrinterFactory objectPrinterFactory; + private boolean firstRecord = true; public APrintVisitor(IEvaluatorContext context, ARecordType itemType, Map<String, String> formatConfigs, Map<String, String> configuration) { @@ -76,4 +78,23 @@ public class APrintVisitor extends AbstractPrintVisitor { } return objectPrinterFactory.printFlatValue(typeTag, b, s, l, ps); } + + @Override + public Void visit(ARecordVisitablePointable accessor, Pair<PrintStream, ATypeTag> arg) throws HyracksDataException { + ARecordPrinter printer = raccessorToPrinter.get(accessor); + if (printer == null) { + printer = createRecordPrinter(accessor); + raccessorToPrinter.put(accessor, printer); + } + boolean first = firstRecord; + if (firstRecord) { + firstRecord = false; + } + printer.printRecord(accessor, arg.getFirst(), this, first); + return null; + } + + public void setFirstRecord(boolean firstRecord) { + this.firstRecord = firstRecord; + } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java index 54fd152556..38ede08d52 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java @@ -33,6 +33,12 @@ public interface IExternalPrinter { */ void open() throws HyracksDataException; + /** + * Indicates the start of a new file + */ + default void newFile() { + } + /** * Initialize the printer with a new stream * diff --git a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IPrinter.java b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IPrinter.java index 2ccb450138..79503b3766 100644 --- a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IPrinter.java +++ b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IPrinter.java @@ -27,5 +27,8 @@ public interface IPrinter { default void init() throws HyracksDataException { } + default void initNewBatch() { + } + void print(byte[] b, int s, int l, PrintStream ps) throws HyracksDataException; }
