This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 4229b4e519d90aa24c370acf388a796b4891781d Author: preetham0202 <[email protected]> AuthorDate: Tue Feb 11 17:44:06 2025 +0530 [ASTERIXDB-3392] Support empty spaces, "=" in field names in COPY TO parquet - user model changes: no - storage format changes: no - interface changes: no Details: The parquet-java SDK's MessageTypeParser.parseMessageType() does not support spaces or "=" in field names, so the serialized schema string cannot be passed on to the ParquetExternalFilePrinterFactory. Instead, the schema is built twice: the first time during compilation to catch errors, and a second time when the printer is created to build the schema that is actually used. Ext-ref: MB-65167 Change-Id: I9dd788909512bf18cb8de26a78a0787e15b11492 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19408 Integration-Tests: Jenkins <[email protected]> Integration-Tests: Hussain Towaileb <[email protected]> Reviewed-by: Hussain Towaileb <[email protected]> Reviewed-by: <[email protected]> Tested-by: Hussain Towaileb <[email protected]> (cherry picked from commit 7f2c407e7024a2ea638c0e841b04f4028899b79a) Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19564 Tested-by: Jenkins <[email protected]> --- .../asterix/translator/CompiledStatements.java | 6 ++++ .../translator/LangExpressionToPlanTranslator.java | 2 +- .../asterix/app/translator/QueryTranslator.java | 4 +-- .../parquet-field-names.01.ddl.sqlpp} | 15 ++++----- .../parquet-field-names.02.update.sqlpp} | 21 ++++++------ .../parquet-field-names.03.update.sqlpp} | 21 +++++++----- .../parquet-field-names.04.ddl.sqlpp} | 22 ++++++++----- .../parquet-field-names.05.query.sqlpp} | 13 +++----- .../parquet-field-names.06.update.sqlpp} | 20 +++++++----- .../parquet-field-names.07.ddl.sqlpp} | 19 +++++------ .../parquet-field-names.08.query.sqlpp} | 13 +++----- .../parquet-field-names/parquet-field-names.05.adm | 1 + .../parquet-field-names/parquet-field-names.08.adm | 1 + .../runtimets/testsuite_external_dataset_s3.xml | 10 ++++++ .../parquet/ParquetExternalWriterFactory.java | 2 +- 
.../parquet/ParquetSchemaInferPoolWriter.java | 2 ++ .../parquet/ParquetSinkExternalWriterRuntime.java | 1 + .../writer/printer/ParquetExternalFilePrinter.java | 10 +++--- .../printer/ParquetExternalFilePrinterFactory.java | 37 +++++++++++++++++----- .../printer/parquet/FieldNamesDictionary.java | 2 ++ .../writer/printer/parquet/ISchemaChecker.java | 4 +++ .../printer/parquet/ParquetRecordLazyVisitor.java | 2 ++ .../printer/parquet/ParquetSchemaLazyVisitor.java | 1 + .../writer/printer/parquet/ParquetValueWriter.java | 1 + .../printer/parquet/SchemaCheckerLazyVisitor.java | 2 ++ .../printer/parquet/SchemaConverterVisitor.java | 5 +-- .../lang/common/statement/ExternalDetailsDecl.java | 9 ++++++ .../metadata/declared/IExternalWriteDataSink.java | 2 ++ .../asterix/metadata/declared/WriteDataSink.java | 14 ++++++-- .../metadata/provider/ExternalWriterProvider.java | 10 +++--- 30 files changed, 179 insertions(+), 93 deletions(-) diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java index a39bc06d05..6813e84e3b 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java @@ -607,6 +607,7 @@ public class CompiledStatements { private final boolean autogenerated; private final ARecordType itemType; + private final ARecordType parquetSchema; public CompiledCopyToStatement(CopyToStatement copyToStatement) { this.query = copyToStatement.getQuery(); @@ -623,6 +624,7 @@ public class CompiledStatements { this.keyExpressions = copyToStatement.getKeyExpressions(); this.autogenerated = copyToStatement.isAutogenerated(); this.itemType = eddDecl.getItemType(); + this.parquetSchema = eddDecl.getParquetSchema(); } @Override @@ -650,6 +652,10 @@ public class CompiledStatements { return itemType; } 
+ public ARecordType getParquetSchema() { + return parquetSchema; + } + public List<Expression> getPathExpressions() { return pathExpressions; } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java index 1bfb90826c..3bb871b0c8 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java @@ -467,7 +467,7 @@ abstract class LangExpressionToPlanTranslator // Write adapter configuration WriteDataSink writeDataSink = new WriteDataSink(copyTo.getAdapter(), copyTo.getProperties(), - copyTo.getItemType(), expr.getSourceLocation()); + copyTo.getItemType(), copyTo.getParquetSchema(), expr.getSourceLocation()); // writeOperator WriteOperator writeOperator = new WriteOperator(sourceExprRef, new MutableObject<>(fullPathExpr), diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index f3b8dc98d5..069e99cba1 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -4219,8 +4219,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen DataverseName.createFromCanonicalForm(ExternalDataConstants.DUMMY_DATAVERSE_NAME); IAType iaType = translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dummyDataverse, ExternalDataConstants.DUMMY_TYPE_NAME, copyTo.getType(), mdTxnCtx); - edd.getProperties().put(ExternalDataConstants.PARQUET_SCHEMA_KEY, - SchemaConverterVisitor.convertToParquetSchemaString((ARecordType) iaType)); + 
edd.setParquetSchema((ARecordType) iaType); + SchemaConverterVisitor.convertToParquetSchema((ARecordType) iaType); } break; case ExternalDataConstants.FORMAT_CSV: diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.01.ddl.sqlpp similarity index 69% copy from asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.01.ddl.sqlpp index 64f8d6d751..8f9bb53772 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.01.ddl.sqlpp @@ -17,14 +17,13 @@ * under the License. 
*/ -package org.apache.asterix.metadata.declared; +DROP DATAVERSE test if exists; +CREATE DATAVERSE test; +USE test; -import org.apache.asterix.om.types.ARecordType; -import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink; -import org.apache.hyracks.api.exceptions.SourceLocation; -public interface IExternalWriteDataSink extends IWriteDataSink { - ARecordType getItemType(); +CREATE TYPE ColumnType1 AS { + id: integer + }; - SourceLocation getSourceLoc(); -} +CREATE COLLECTION TestCollection(ColumnType1) PRIMARY KEY id; \ No newline at end of file diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.02.update.sqlpp similarity index 62% copy from asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.02.update.sqlpp index 64f8d6d751..9022632699 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.02.update.sqlpp @@ -16,15 +16,18 @@ * specific language governing permissions and limitations * under the License. 
*/ +/* + * Description : create a dataset using year-month-duration as the primary key + * Expected Res : Success + * Date : 7 May 2013 + * Issue : 363 + */ -package org.apache.asterix.metadata.declared; - -import org.apache.asterix.om.types.ARecordType; -import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink; -import org.apache.hyracks.api.exceptions.SourceLocation; +use test; +/* +insert into TestCollection({"id":`year-month-duration`("P16Y"), "name": "John"}); +insert into TestCollection({"id":`day-time-duration`("-P3829H849.392S"), "name": "Alex"}); +*/ -public interface IExternalWriteDataSink extends IWriteDataSink { - ARecordType getItemType(); +insert into TestCollection({"id":18, "Director=name": "SS Rajamouli", "Director.Age" : 51 ,"Films Made" : ["RRR", "Eega", "Baahubali"] }); - SourceLocation getSourceLoc(); -} diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.03.update.sqlpp similarity index 69% copy from asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.03.update.sqlpp index 64f8d6d751..3da960b3c5 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.03.update.sqlpp @@ -17,14 +17,19 @@ * under the License. 
*/ -package org.apache.asterix.metadata.declared; +USE test; + +COPY ( + select c.* from TestCollection c +) toWriter +TO %adapter% +PATH (%pathprefix% "copy-to-result", "parquet-field-names1") +TYPE ( { id:int, `Director=name` : string, `Director.Age` : int ,`Films Made` : [string] } ) +WITH { + %template_colons%, + %additionalProperties% + "format":"parquet" +}; -import org.apache.asterix.om.types.ARecordType; -import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink; -import org.apache.hyracks.api.exceptions.SourceLocation; -public interface IExternalWriteDataSink extends IWriteDataSink { - ARecordType getItemType(); - SourceLocation getSourceLoc(); -} diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.04.ddl.sqlpp similarity index 69% copy from asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.04.ddl.sqlpp index 64f8d6d751..38acb88723 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.04.ddl.sqlpp @@ -17,14 +17,20 @@ * under the License. 
*/ -package org.apache.asterix.metadata.declared; +USE test; -import org.apache.asterix.om.types.ARecordType; -import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink; -import org.apache.hyracks.api.exceptions.SourceLocation; -public interface IExternalWriteDataSink extends IWriteDataSink { - ARecordType getItemType(); +CREATE TYPE ColumnType2 AS { +}; - SourceLocation getSourceLoc(); -} + + +CREATE EXTERNAL DATASET TestDataset1(ColumnType2) USING %adapter% +( + %template%, + %additional_Properties%, + ("definition"="%path_prefix%copy-to-result/parquet-field-names1/"), + ("include"="*.parquet"), + ("requireVersionChangeDetection"="false"), + ("format" = "parquet") +); \ No newline at end of file diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.05.query.sqlpp similarity index 69% copy from asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.05.query.sqlpp index 64f8d6d751..86d344ca4f 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.05.query.sqlpp @@ -17,14 +17,11 @@ * under the License. 
*/ -package org.apache.asterix.metadata.declared; +USE test; -import org.apache.asterix.om.types.ARecordType; -import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink; -import org.apache.hyracks.api.exceptions.SourceLocation; -public interface IExternalWriteDataSink extends IWriteDataSink { - ARecordType getItemType(); +SELECT c.* +FROM TestDataset1 c +ORDER BY c.id; + - SourceLocation getSourceLoc(); -} diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.06.update.sqlpp similarity index 69% copy from asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.06.update.sqlpp index 64f8d6d751..f72a1f5488 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.06.update.sqlpp @@ -17,14 +17,18 @@ * under the License. 
*/ -package org.apache.asterix.metadata.declared; +USE test; + +COPY ( +select c.* from TestCollection c + ) toWriter +TO %adapter% +PATH (%pathprefix% "copy-to-result", "parquet-field-names2") +WITH { + %template_colons%, + %additionalProperties% + "format":"parquet" + }; -import org.apache.asterix.om.types.ARecordType; -import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink; -import org.apache.hyracks.api.exceptions.SourceLocation; -public interface IExternalWriteDataSink extends IWriteDataSink { - ARecordType getItemType(); - SourceLocation getSourceLoc(); -} diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.07.ddl.sqlpp similarity index 69% copy from asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.07.ddl.sqlpp index 64f8d6d751..17003c5b36 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.07.ddl.sqlpp @@ -17,14 +17,15 @@ * under the License. 
*/ -package org.apache.asterix.metadata.declared; +USE test; -import org.apache.asterix.om.types.ARecordType; -import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink; -import org.apache.hyracks.api.exceptions.SourceLocation; -public interface IExternalWriteDataSink extends IWriteDataSink { - ARecordType getItemType(); - - SourceLocation getSourceLoc(); -} +CREATE EXTERNAL DATASET TestDataset2(ColumnType2) USING %adapter% +( + %template%, + %additional_Properties%, + ("definition"="%path_prefix%copy-to-result/parquet-field-names2/"), + ("include"="*.parquet"), + ("requireVersionChangeDetection"="false"), + ("format" = "parquet") +); \ No newline at end of file diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.08.query.sqlpp similarity index 69% copy from asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.08.query.sqlpp index 64f8d6d751..ce09a4a5ef 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.08.query.sqlpp @@ -17,14 +17,11 @@ * under the License. 
*/ -package org.apache.asterix.metadata.declared; +USE test; -import org.apache.asterix.om.types.ARecordType; -import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink; -import org.apache.hyracks.api.exceptions.SourceLocation; -public interface IExternalWriteDataSink extends IWriteDataSink { - ARecordType getItemType(); +SELECT c.* +FROM TestDataset2 c +ORDER BY c.id; + - SourceLocation getSourceLoc(); -} diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.05.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.05.adm new file mode 100644 index 0000000000..4566d53862 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.05.adm @@ -0,0 +1 @@ +{ "id": 18, "Director=name": "SS Rajamouli", "Director.Age": 51, "Films Made": [ "RRR", "Eega", "Baahubali" ] } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.08.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.08.adm new file mode 100644 index 0000000000..fa88a74d78 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.08.adm @@ -0,0 +1 @@ +{ "Director.Age": 51, "Films Made": [ "RRR", "Eega", "Baahubali" ], "Director=name": "SS Rajamouli", "id": 18 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml index a76f9de437..db72e10aba 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml @@ 
-114,6 +114,16 @@ <output-dir compare="Text">parquet-cover-data-types</output-dir> </compilation-unit> </test-case> + <test-case FilePath="copy-to"> + <compilation-unit name="parquet-field-names"> + <placeholder name="adapter" value="S3" /> + <placeholder name="pathprefix" value="" /> + <placeholder name="path_prefix" value="" /> + <placeholder name="additionalProperties" value='"container":"playground",' /> + <placeholder name="additional_Properties" value='("container"="playground")' /> + <output-dir compare="Text">parquet-field-names</output-dir> + </compilation-unit> + </test-case> <test-case FilePath="copy-to"> <compilation-unit name="parquet-empty-array"> <placeholder name="adapter" value="S3" /> diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java index d754068f00..5d9ab7f04a 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java @@ -53,7 +53,7 @@ public class ParquetExternalWriterFactory implements Serializable { public IExternalWriter createWriter(ParquetSchemaTree.SchemaNode schemaNode) throws HyracksDataException { MessageType schema = generateSchema(schemaNode); - printerFactory.setParquetSchemaString(schema.toString()); + printerFactory.setParquetSchema(schema); IExternalFileWriter writer = writerFactory.createWriter(ctx, printerFactory); return new ExternalFileWriter(resolver, writer, maxResult); } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java index b500cbe86a..25fbdc836c 100644 --- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java @@ -30,6 +30,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IValueReference; import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; +// Maintains a pool of Parquet writers holding a file, each with its own schema , and writes values to the appropriate writer based on schema. public class ParquetSchemaInferPoolWriter { private final ParquetExternalWriterFactory writerFactory; @@ -57,6 +58,7 @@ public class ParquetSchemaInferPoolWriter { if (schemaComparisonType.equals(ISchemaChecker.SchemaComparisonType.EQUIVALENT)) { return; } else if (schemaComparisonType.equals(ISchemaChecker.SchemaComparisonType.GROWING)) { + // If the schema is growing, close the existing writer and create a new one with the new schema. schemaNodes.set(i, schemaLazyVisitor.inferSchema(value)); closeWriter(i); return; diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java index 3dbd4d36a5..7c1c03bdda 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java @@ -76,6 +76,7 @@ public class ParquetSinkExternalWriterRuntime extends AbstractOneInputSinkPushRu } + // Schema Inference is done frame wise, i.e., we infer the schema for all the records in frame and write the values with schema inferred until now. 
@Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { tupleAccessor.reset(buffer); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java index ba7a1ee71b..046c03f707 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java @@ -34,23 +34,22 @@ import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.MessageTypeParser; public class ParquetExternalFilePrinter implements IExternalPrinter { private final IAType typeInfo; private final CompressionCodecName compressionCodecName; - private MessageType schema; + private final MessageType schema; private ParquetOutputFile parquetOutputFile; - private String parquetSchemaString; + // private String parquetSchemaString; private ParquetWriter<IValueReference> writer; private final long rowGroupSize; private final int pageSize; private final ParquetProperties.WriterVersion writerVersion; - public ParquetExternalFilePrinter(CompressionCodecName compressionCodecName, String parquetSchemaString, + public ParquetExternalFilePrinter(CompressionCodecName compressionCodecName, MessageType parquetSchemaString, IAType typeInfo, long rowGroupSize, int pageSize, ParquetProperties.WriterVersion writerVersion) { this.compressionCodecName = compressionCodecName; - this.parquetSchemaString = parquetSchemaString; + this.schema = parquetSchemaString; this.typeInfo = typeInfo; this.rowGroupSize = rowGroupSize; this.pageSize = pageSize; 
@@ -59,7 +58,6 @@ public class ParquetExternalFilePrinter implements IExternalPrinter { @Override public void open() throws HyracksDataException { - schema = MessageTypeParser.parseMessageType(parquetSchemaString); } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java index 22e56d5f40..797c1adaa5 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java @@ -18,26 +18,35 @@ */ package org.apache.asterix.external.writer.printer; +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.external.writer.printer.parquet.SchemaConverterVisitor; +import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.IAType; import org.apache.asterix.runtime.writer.IExternalPrinter; import org.apache.asterix.runtime.writer.IExternalPrinterFactory; import org.apache.hyracks.api.context.IEvaluatorContext; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; public class ParquetExternalFilePrinterFactory implements IExternalPrinterFactory { private static final long serialVersionUID = 8971234908711235L; - private String parquetSchemaString; + // parquetInferSchema is for the case when the schema is inferred from the data, not provided by the user + // set During the runtime + private transient MessageType parquetInferSchema; + // parquetProvidedSchema is for the case when the schema is provided by the user + private ARecordType parquetProvidedSchema; private final IAType typeInfo; 
private final CompressionCodecName compressionCodecName; private final long rowGroupSize; private final int pageSize; private final ParquetProperties.WriterVersion writerVersion; - public ParquetExternalFilePrinterFactory(CompressionCodecName compressionCodecName, String parquetSchemaString, - IAType typeInfo, long rowGroupSize, int pageSize, ParquetProperties.WriterVersion writerVersion) { + public ParquetExternalFilePrinterFactory(CompressionCodecName compressionCodecName, + ARecordType parquetprovidedSchema, IAType typeInfo, long rowGroupSize, int pageSize, + ParquetProperties.WriterVersion writerVersion) { this.compressionCodecName = compressionCodecName; - this.parquetSchemaString = parquetSchemaString; + this.parquetProvidedSchema = parquetprovidedSchema; this.typeInfo = typeInfo; this.rowGroupSize = rowGroupSize; this.pageSize = pageSize; @@ -53,13 +62,25 @@ public class ParquetExternalFilePrinterFactory implements IExternalPrinterFactor this.writerVersion = writerVersion; } - public void setParquetSchemaString(String parquetSchemaString) { - this.parquetSchemaString = parquetSchemaString; + public void setParquetSchema(MessageType parquetInferSchema) { + this.parquetInferSchema = parquetInferSchema; } @Override public IExternalPrinter createPrinter(IEvaluatorContext context) { - return new ParquetExternalFilePrinter(compressionCodecName, parquetSchemaString, typeInfo, rowGroupSize, - pageSize, writerVersion); + if (parquetInferSchema != null) { + return new ParquetExternalFilePrinter(compressionCodecName, parquetInferSchema, typeInfo, rowGroupSize, + pageSize, writerVersion); + } + + MessageType schema; + try { + schema = SchemaConverterVisitor.convertToParquetSchema(parquetProvidedSchema); + } catch (CompilationException e) { + // This should not happen, Compilation Exception should be caught at the query-compile time + throw new RuntimeException(e); + } + return new ParquetExternalFilePrinter(compressionCodecName, schema, typeInfo, rowGroupSize, pageSize, 
+ writerVersion); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/FieldNamesDictionary.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/FieldNamesDictionary.java index 7058bf6287..cdf24c6de0 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/FieldNamesDictionary.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/FieldNamesDictionary.java @@ -26,6 +26,8 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IValueReference; import org.apache.hyracks.util.string.UTF8StringUtil; +// The Field Names Dictionary will cache the mapping between field name bytes and their corresponding string representations, +// minimizing the creation of new string objects during field name deserialization while writing to parquet files. 
public class FieldNamesDictionary { private final FieldNamesTrieDictionary trie; private final List<String> fieldNames; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java index 99b97369df..dfa6e4f3de 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java @@ -22,6 +22,10 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IValueReference; public interface ISchemaChecker { + + // EQUIVALENT: identical types. Example: { name: string, age: int } -> { name: string, age: int } + // GROWING: equivalent types but with extra fields. Example: { name: string, age: int } -> { name: string, age: int, address: string } + // CONFLICTING: conflict in types. Example: { name: string, age: int } -> { name: {first:string, last:string}, age: int } enum SchemaComparisonType { EQUIVALENT, GROWING, diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java index 373bfe4626..2a03bfd024 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java @@ -44,6 +44,8 @@ public class ParquetRecordLazyVisitor implements ILazyVisitablePointableVisitor< private final MessageType schema; private final RecordLazyVisitablePointable rec; + // The Record Consumer 
is responsible for traversing the record tree, + // using recordConsumer.startField() to navigate into a child node and endField() to move back to the parent node. private RecordConsumer recordConsumer; private final FieldNamesDictionary fieldNamesDictionary; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java index b59117571b..70872bb2ec 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java @@ -39,6 +39,7 @@ import org.apache.hyracks.data.std.api.IValueReference; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Types; +// This class is used to infer the schema of a record into SchemaNode, which is an internal tree representation of the schema. 
public class ParquetSchemaLazyVisitor implements ILazyVisitablePointableVisitor<Void, ParquetSchemaTree.SchemaNode> { private final RecordLazyVisitablePointable rec; private final FieldNamesDictionary fieldNamesDictionary; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java index 04e11f7e21..38d12f910a 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java @@ -44,6 +44,7 @@ import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.PrimitiveType; +// This class reduces the number of Java objects created each time a column is written to a Parquet file by reusing the same VoidPointable for all columns within the file. 
public class ParquetValueWriter { public static final String LIST_FIELD = "list"; public static final String ELEMENT_FIELD = "element"; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java index 44cd5b256d..fc43c89afc 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java @@ -30,6 +30,8 @@ import org.apache.asterix.om.types.IAType; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IValueReference; +// This class is used to check the schema of a record against a schema that has been inferred so far. +// By checking, we can determine if the record is equivalent to the schema, if the record is growing, or if there is a conflict. 
public class SchemaCheckerLazyVisitor implements ISchemaChecker, ILazyVisitablePointableVisitor<ISchemaChecker.SchemaComparisonType, ParquetSchemaTree.SchemaNode> { private final FieldNamesDictionary fieldNamesDictionary; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java index a6ea1159a6..9f5d02f016 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java @@ -36,6 +36,7 @@ import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; +// Traverses the RecordType tree and converts it to a Parquet schema. 
public class SchemaConverterVisitor implements IATypeVisitor<Void, Pair<Types.Builder, String>> { public static String MESSAGE_NAME = "asterix_schema"; private final ARecordType schemaType; @@ -46,9 +47,9 @@ public class SchemaConverterVisitor implements IATypeVisitor<Void, Pair<Types.Bu this.unsupportedType = null; } - public static String convertToParquetSchemaString(ARecordType schemaType) throws CompilationException { + public static MessageType convertToParquetSchema(ARecordType schemaType) throws CompilationException { SchemaConverterVisitor schemaConverterVisitor = new SchemaConverterVisitor(schemaType); - return schemaConverterVisitor.getParquetSchema().toString(); + return schemaConverterVisitor.getParquetSchema(); } private MessageType getParquetSchema() throws CompilationException { diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java index e1c978a97c..126f3ba365 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java @@ -26,6 +26,7 @@ public class ExternalDetailsDecl implements IDatasetDetailsDecl { private Map<String, String> properties; private String adapter; private ARecordType itemType; + private ARecordType parquetSchema; public void setAdapter(String adapter) { this.adapter = adapter; @@ -43,6 +44,14 @@ public class ExternalDetailsDecl implements IDatasetDetailsDecl { return itemType; } + public void setParquetSchema(ARecordType parquetSchema) { + this.parquetSchema = parquetSchema; + } + + public ARecordType getParquetSchema() { + return parquetSchema; + } + public String getAdapter() { return adapter; } diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java index 64f8d6d751..1168ba1de7 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java @@ -26,5 +26,7 @@ import org.apache.hyracks.api.exceptions.SourceLocation; public interface IExternalWriteDataSink extends IWriteDataSink { ARecordType getItemType(); + ARecordType getParquetSchema(); + SourceLocation getSourceLoc(); } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java index d1667bf116..4a10f7fcaf 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java @@ -28,14 +28,16 @@ import org.apache.hyracks.api.exceptions.SourceLocation; public class WriteDataSink implements IExternalWriteDataSink { private final String adapterName; private final Map<String, String> configuration; - private ARecordType itemType; - private SourceLocation sourceLoc; + private final ARecordType itemType; + private final ARecordType parquetSchema; + private final SourceLocation sourceLoc; public WriteDataSink(String adapterName, Map<String, String> configuration, ARecordType itemType, - SourceLocation sourceLoc) { + ARecordType parquetSchema, SourceLocation sourceLoc) { this.adapterName = adapterName; this.configuration = configuration; this.itemType = itemType; + this.parquetSchema = parquetSchema; this.sourceLoc = sourceLoc; } @@ -43,6 +45,7 @@ public class WriteDataSink implements 
IExternalWriteDataSink { this.adapterName = writeDataSink.getAdapterName(); this.configuration = new HashMap<>(writeDataSink.configuration); this.itemType = writeDataSink.itemType; + this.parquetSchema = writeDataSink.parquetSchema; this.sourceLoc = writeDataSink.sourceLoc; } @@ -51,6 +54,11 @@ public class WriteDataSink implements IExternalWriteDataSink { return itemType; } + @Override + public ARecordType getParquetSchema() { + return parquetSchema; + } + @Override public SourceLocation getSourceLoc() { return sourceLoc; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java index e3f8221cdd..23235c5328 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java @@ -203,7 +203,7 @@ public class ExternalWriterProvider { case ExternalDataConstants.FORMAT_PARQUET: CompressionCodecName compressionCodecName; - if (compression == null || compression.equals("") || compression.equals("none")) { + if (compression == null || compression.isEmpty() || compression.equals("none")) { compressionCodecName = CompressionCodecName.UNCOMPRESSED; } else { compressionCodecName = CompressionCodecName.valueOf(compression.toUpperCase()); @@ -216,10 +216,11 @@ public class ExternalWriterProvider { int pageSize = (int) StorageUtil.getByteValue(pageSizeString); ParquetProperties.WriterVersion writerVersion = getParquetWriterVersion(configuration); - if (configuration.get(ExternalDataConstants.PARQUET_SCHEMA_KEY) != null) { - String parquetSchemaString = configuration.get(ExternalDataConstants.PARQUET_SCHEMA_KEY); + ARecordType parquetSchema = ((IExternalWriteDataSink) sink).getParquetSchema(); + + if (parquetSchema != null) { 
ParquetExternalFilePrinterFactory parquetPrinterFactory = - new ParquetExternalFilePrinterFactory(compressionCodecName, parquetSchemaString, + new ParquetExternalFilePrinterFactory(compressionCodecName, parquetSchema, (IAType) sourceType, rowGroupSize, pageSize, writerVersion); ExternalFileWriterFactory parquetWriterFactory = new ExternalFileWriterFactory(fileWriterFactory, @@ -228,6 +229,7 @@ public class ExternalWriterProvider { partitionComparatorFactories, inputDesc, parquetWriterFactory); } + // Parquet Writing with Schema Inference int maxSchemas = ExternalWriterProvider.getMaxParquetSchema(configuration); ParquetExternalFilePrinterFactoryProvider printerFactoryProvider = new ParquetExternalFilePrinterFactoryProvider(compressionCodecName, (IAType) sourceType,
