This is an automated email from the ASF dual-hosted git repository.
htowaileb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7f2c407e70 [ASTERIXDB-3392] Support empty spaces, "=" in field names
in COPY TO parquet
7f2c407e70 is described below
commit 7f2c407e7024a2ea638c0e841b04f4028899b79a
Author: preetham0202 <[email protected]>
AuthorDate: Tue Feb 11 17:44:06 2025 +0530
[ASTERIXDB-3392] Support empty spaces, "=" in field names in COPY TO parquet
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
The parquet-java SDK's MessageTypeParser.parseMessageType() does not support
field names containing spaces or "=". Because of this, a serialised schema
string can't be passed to ParquetExternalFilePrinterFactory. Instead, the
schema is built twice: first at compile time to catch errors, and again at
runtime to build the actual Parquet schema.
Ext-ref: MB-65167
Change-Id: I9dd788909512bf18cb8de26a78a0787e15b11492
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19408
Integration-Tests: Jenkins <[email protected]>
Integration-Tests: Hussain Towaileb <[email protected]>
Reviewed-by: Hussain Towaileb <[email protected]>
Reviewed-by: <[email protected]>
Tested-by: Hussain Towaileb <[email protected]>
---
.../asterix/translator/CompiledStatements.java | 6 ++++
.../translator/LangExpressionToPlanTranslator.java | 2 +-
.../asterix/app/translator/QueryTranslator.java | 4 +--
.../parquet-field-names.01.ddl.sqlpp} | 15 ++++-----
.../parquet-field-names.02.update.sqlpp} | 21 ++++++------
.../parquet-field-names.03.update.sqlpp} | 21 +++++++-----
.../parquet-field-names.04.ddl.sqlpp} | 22 ++++++++-----
.../parquet-field-names.05.query.sqlpp} | 13 +++-----
.../parquet-field-names.06.update.sqlpp} | 20 +++++++-----
.../parquet-field-names.07.ddl.sqlpp} | 19 +++++------
.../parquet-field-names.08.query.sqlpp} | 13 +++-----
.../parquet-field-names/parquet-field-names.05.adm | 1 +
.../parquet-field-names/parquet-field-names.08.adm | 1 +
.../runtimets/testsuite_external_dataset_s3.xml | 10 ++++++
.../parquet/ParquetExternalWriterFactory.java | 2 +-
.../parquet/ParquetSchemaInferPoolWriter.java | 2 ++
.../parquet/ParquetSinkExternalWriterRuntime.java | 1 +
.../writer/printer/ParquetExternalFilePrinter.java | 10 +++---
.../printer/ParquetExternalFilePrinterFactory.java | 37 +++++++++++++++++-----
.../printer/parquet/FieldNamesDictionary.java | 2 ++
.../writer/printer/parquet/ISchemaChecker.java | 4 +++
.../printer/parquet/ParquetRecordLazyVisitor.java | 2 ++
.../printer/parquet/ParquetSchemaLazyVisitor.java | 1 +
.../writer/printer/parquet/ParquetValueWriter.java | 1 +
.../printer/parquet/SchemaCheckerLazyVisitor.java | 2 ++
.../printer/parquet/SchemaConverterVisitor.java | 5 +--
.../lang/common/statement/ExternalDetailsDecl.java | 9 ++++++
.../metadata/declared/IExternalWriteDataSink.java | 2 ++
.../asterix/metadata/declared/WriteDataSink.java | 14 ++++++--
.../metadata/provider/ExternalWriterProvider.java | 10 +++---
30 files changed, 179 insertions(+), 93 deletions(-)
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
index a39bc06d05..6813e84e3b 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
@@ -607,6 +607,7 @@ public class CompiledStatements {
private final boolean autogenerated;
private final ARecordType itemType;
+ private final ARecordType parquetSchema;
public CompiledCopyToStatement(CopyToStatement copyToStatement) {
this.query = copyToStatement.getQuery();
@@ -623,6 +624,7 @@ public class CompiledStatements {
this.keyExpressions = copyToStatement.getKeyExpressions();
this.autogenerated = copyToStatement.isAutogenerated();
this.itemType = eddDecl.getItemType();
+ this.parquetSchema = eddDecl.getParquetSchema();
}
@Override
@@ -650,6 +652,10 @@ public class CompiledStatements {
return itemType;
}
+ public ARecordType getParquetSchema() {
+ return parquetSchema;
+ }
+
public List<Expression> getPathExpressions() {
return pathExpressions;
}
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 978997c223..f8d0ff69da 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -467,7 +467,7 @@ abstract class LangExpressionToPlanTranslator
// Write adapter configuration
WriteDataSink writeDataSink = new WriteDataSink(copyTo.getAdapter(),
copyTo.getProperties(),
- copyTo.getItemType(), expr.getSourceLocation());
+ copyTo.getItemType(), copyTo.getParquetSchema(),
expr.getSourceLocation());
// writeOperator
WriteOperator writeOperator = new WriteOperator(sourceExprRef, new
MutableObject<>(fullPathExpr),
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 334c07412b..e1b4bb0570 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4212,8 +4212,8 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
DataverseName.createFromCanonicalForm(ExternalDataConstants.DUMMY_DATAVERSE_NAME);
IAType iaType =
translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dummyDataverse,
ExternalDataConstants.DUMMY_TYPE_NAME,
copyTo.getType(), mdTxnCtx);
-
edd.getProperties().put(ExternalDataConstants.PARQUET_SCHEMA_KEY,
-
SchemaConverterVisitor.convertToParquetSchemaString((ARecordType) iaType));
+ edd.setParquetSchema((ARecordType) iaType);
+
SchemaConverterVisitor.convertToParquetSchema((ARecordType) iaType);
}
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.01.ddl.sqlpp
similarity index 69%
copy from
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.01.ddl.sqlpp
index 64f8d6d751..8f9bb53772 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.01.ddl.sqlpp
@@ -17,14 +17,13 @@
* under the License.
*/
-package org.apache.asterix.metadata.declared;
+DROP DATAVERSE test if exists;
+CREATE DATAVERSE test;
+USE test;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
-import org.apache.hyracks.api.exceptions.SourceLocation;
-public interface IExternalWriteDataSink extends IWriteDataSink {
- ARecordType getItemType();
+CREATE TYPE ColumnType1 AS {
+ id: integer
+ };
- SourceLocation getSourceLoc();
-}
+CREATE COLLECTION TestCollection(ColumnType1) PRIMARY KEY id;
\ No newline at end of file
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.02.update.sqlpp
similarity index 62%
copy from
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.02.update.sqlpp
index 64f8d6d751..9022632699 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.02.update.sqlpp
@@ -16,15 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
+/*
+ * Description : insert a record whose field names contain "=", "." and spaces
+ * Expected Res : Success
+ * Date : 11 Feb 2025
+ * Issue : ASTERIXDB-3392
+ */
-package org.apache.asterix.metadata.declared;
-
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
-import org.apache.hyracks.api.exceptions.SourceLocation;
+use test;
+/*
+insert into TestCollection({"id":`year-month-duration`("P16Y"), "name":
"John"});
+insert into TestCollection({"id":`day-time-duration`("-P3829H849.392S"),
"name": "Alex"});
+*/
-public interface IExternalWriteDataSink extends IWriteDataSink {
- ARecordType getItemType();
+insert into TestCollection({"id":18, "Director=name": "SS Rajamouli",
"Director.Age" : 51 ,"Films Made" : ["RRR", "Eega", "Baahubali"] });
- SourceLocation getSourceLoc();
-}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.03.update.sqlpp
similarity index 69%
copy from
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.03.update.sqlpp
index 64f8d6d751..3da960b3c5 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.03.update.sqlpp
@@ -17,14 +17,19 @@
* under the License.
*/
-package org.apache.asterix.metadata.declared;
+USE test;
+
+COPY (
+ select c.* from TestCollection c
+) toWriter
+TO %adapter%
+PATH (%pathprefix% "copy-to-result", "parquet-field-names1")
+TYPE ( { id:int, `Director=name` : string, `Director.Age` : int ,`Films Made`
: [string] } )
+WITH {
+ %template_colons%,
+ %additionalProperties%
+ "format":"parquet"
+};
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
-import org.apache.hyracks.api.exceptions.SourceLocation;
-public interface IExternalWriteDataSink extends IWriteDataSink {
- ARecordType getItemType();
- SourceLocation getSourceLoc();
-}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.04.ddl.sqlpp
similarity index 69%
copy from
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.04.ddl.sqlpp
index 64f8d6d751..38acb88723 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.04.ddl.sqlpp
@@ -17,14 +17,20 @@
* under the License.
*/
-package org.apache.asterix.metadata.declared;
+USE test;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
-import org.apache.hyracks.api.exceptions.SourceLocation;
-public interface IExternalWriteDataSink extends IWriteDataSink {
- ARecordType getItemType();
+CREATE TYPE ColumnType2 AS {
+};
- SourceLocation getSourceLoc();
-}
+
+
+CREATE EXTERNAL DATASET TestDataset1(ColumnType2) USING %adapter%
+(
+ %template%,
+ %additional_Properties%,
+ ("definition"="%path_prefix%copy-to-result/parquet-field-names1/"),
+ ("include"="*.parquet"),
+ ("requireVersionChangeDetection"="false"),
+ ("format" = "parquet")
+);
\ No newline at end of file
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.05.query.sqlpp
similarity index 69%
copy from
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.05.query.sqlpp
index 64f8d6d751..86d344ca4f 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.05.query.sqlpp
@@ -17,14 +17,11 @@
* under the License.
*/
-package org.apache.asterix.metadata.declared;
+USE test;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
-import org.apache.hyracks.api.exceptions.SourceLocation;
-public interface IExternalWriteDataSink extends IWriteDataSink {
- ARecordType getItemType();
+SELECT c.*
+FROM TestDataset1 c
+ORDER BY c.id;
+
- SourceLocation getSourceLoc();
-}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.06.update.sqlpp
similarity index 69%
copy from
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.06.update.sqlpp
index 64f8d6d751..f72a1f5488 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.06.update.sqlpp
@@ -17,14 +17,18 @@
* under the License.
*/
-package org.apache.asterix.metadata.declared;
+USE test;
+
+COPY (
+select c.* from TestCollection c
+ ) toWriter
+TO %adapter%
+PATH (%pathprefix% "copy-to-result", "parquet-field-names2")
+WITH {
+ %template_colons%,
+ %additionalProperties%
+ "format":"parquet"
+ };
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
-import org.apache.hyracks.api.exceptions.SourceLocation;
-public interface IExternalWriteDataSink extends IWriteDataSink {
- ARecordType getItemType();
- SourceLocation getSourceLoc();
-}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.07.ddl.sqlpp
similarity index 69%
copy from
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.07.ddl.sqlpp
index 64f8d6d751..17003c5b36 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.07.ddl.sqlpp
@@ -17,14 +17,15 @@
* under the License.
*/
-package org.apache.asterix.metadata.declared;
+USE test;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
-import org.apache.hyracks.api.exceptions.SourceLocation;
-public interface IExternalWriteDataSink extends IWriteDataSink {
- ARecordType getItemType();
-
- SourceLocation getSourceLoc();
-}
+CREATE EXTERNAL DATASET TestDataset2(ColumnType2) USING %adapter%
+(
+ %template%,
+ %additional_Properties%,
+ ("definition"="%path_prefix%copy-to-result/parquet-field-names2/"),
+ ("include"="*.parquet"),
+ ("requireVersionChangeDetection"="false"),
+ ("format" = "parquet")
+);
\ No newline at end of file
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.08.query.sqlpp
similarity index 69%
copy from
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.08.query.sqlpp
index 64f8d6d751..ce09a4a5ef 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.08.query.sqlpp
@@ -17,14 +17,11 @@
* under the License.
*/
-package org.apache.asterix.metadata.declared;
+USE test;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
-import org.apache.hyracks.api.exceptions.SourceLocation;
-public interface IExternalWriteDataSink extends IWriteDataSink {
- ARecordType getItemType();
+SELECT c.*
+FROM TestDataset2 c
+ORDER BY c.id;
+
- SourceLocation getSourceLoc();
-}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.05.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.05.adm
new file mode 100644
index 0000000000..4566d53862
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.05.adm
@@ -0,0 +1 @@
+{ "id": 18, "Director=name": "SS Rajamouli", "Director.Age": 51, "Films Made":
[ "RRR", "Eega", "Baahubali" ] }
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.08.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.08.adm
new file mode 100644
index 0000000000..fa88a74d78
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.08.adm
@@ -0,0 +1 @@
+{ "Director.Age": 51, "Films Made": [ "RRR", "Eega", "Baahubali" ],
"Director=name": "SS Rajamouli", "id": 18 }
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 1a05334bbc..04e8d1e891 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -114,6 +114,16 @@
<output-dir compare="Text">parquet-cover-data-types</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="copy-to">
+ <compilation-unit name="parquet-field-names">
+ <placeholder name="adapter" value="S3" />
+ <placeholder name="pathprefix" value="" />
+ <placeholder name="path_prefix" value="" />
+ <placeholder name="additionalProperties"
value='"container":"playground",' />
+ <placeholder name="additional_Properties"
value='("container"="playground")' />
+ <output-dir compare="Text">parquet-field-names</output-dir>
+ </compilation-unit>
+ </test-case>
<test-case FilePath="copy-to">
<compilation-unit name="parquet-empty-array">
<placeholder name="adapter" value="S3" />
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java
index d754068f00..5d9ab7f04a 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java
@@ -53,7 +53,7 @@ public class ParquetExternalWriterFactory implements
Serializable {
public IExternalWriter createWriter(ParquetSchemaTree.SchemaNode
schemaNode) throws HyracksDataException {
MessageType schema = generateSchema(schemaNode);
- printerFactory.setParquetSchemaString(schema.toString());
+ printerFactory.setParquetSchema(schema);
IExternalFileWriter writer = writerFactory.createWriter(ctx,
printerFactory);
return new ExternalFileWriter(resolver, writer, maxResult);
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java
index b500cbe86a..25fbdc836c 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java
@@ -30,6 +30,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+// Maintains a pool of Parquet writers, each holding a file with its own
schema, and writes values to the appropriate writer based on the schema.
public class ParquetSchemaInferPoolWriter {
private final ParquetExternalWriterFactory writerFactory;
@@ -57,6 +58,7 @@ public class ParquetSchemaInferPoolWriter {
if
(schemaComparisonType.equals(ISchemaChecker.SchemaComparisonType.EQUIVALENT)) {
return;
} else if
(schemaComparisonType.equals(ISchemaChecker.SchemaComparisonType.GROWING)) {
+ // If the schema is growing, close the existing writer and
create a new one with the new schema.
schemaNodes.set(i, schemaLazyVisitor.inferSchema(value));
closeWriter(i);
return;
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java
index 3dbd4d36a5..7c1c03bdda 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java
@@ -76,6 +76,7 @@ public class ParquetSinkExternalWriterRuntime extends
AbstractOneInputSinkPushRu
}
+ // Schema inference is done frame-wise: we infer the schema for all the
records in the frame, then write the values using the schema inferred so far.
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
tupleAccessor.reset(buffer);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
index ba7a1ee71b..046c03f707 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
@@ -34,23 +34,22 @@ import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.MessageTypeParser;
public class ParquetExternalFilePrinter implements IExternalPrinter {
private final IAType typeInfo;
private final CompressionCodecName compressionCodecName;
- private MessageType schema;
+ private final MessageType schema;
private ParquetOutputFile parquetOutputFile;
- private String parquetSchemaString;
+ // private String parquetSchemaString;
private ParquetWriter<IValueReference> writer;
private final long rowGroupSize;
private final int pageSize;
private final ParquetProperties.WriterVersion writerVersion;
- public ParquetExternalFilePrinter(CompressionCodecName
compressionCodecName, String parquetSchemaString,
+ public ParquetExternalFilePrinter(CompressionCodecName
compressionCodecName, MessageType parquetSchemaString,
IAType typeInfo, long rowGroupSize, int pageSize,
ParquetProperties.WriterVersion writerVersion) {
this.compressionCodecName = compressionCodecName;
- this.parquetSchemaString = parquetSchemaString;
+ this.schema = parquetSchemaString;
this.typeInfo = typeInfo;
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
@@ -59,7 +58,6 @@ public class ParquetExternalFilePrinter implements
IExternalPrinter {
@Override
public void open() throws HyracksDataException {
- schema = MessageTypeParser.parseMessageType(parquetSchemaString);
}
@Override
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
index b6ad34e773..035e49ac7c 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
@@ -18,25 +18,34 @@
*/
package org.apache.asterix.external.writer.printer;
+import org.apache.asterix.common.exceptions.CompilationException;
+import
org.apache.asterix.external.writer.printer.parquet.SchemaConverterVisitor;
+import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.writer.IExternalPrinter;
import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
public class ParquetExternalFilePrinterFactory implements
IExternalPrinterFactory {
private static final long serialVersionUID = 8971234908711235L;
- private String parquetSchemaString;
+ // parquetInferSchema is for the case when the schema is inferred from the
data, not provided by the user
+ // set During the runtime
+ private transient MessageType parquetInferSchema;
+ // parquetProvidedSchema is for the case when the schema is provided by
the user
+ private ARecordType parquetProvidedSchema;
private final IAType typeInfo;
private final CompressionCodecName compressionCodecName;
private final long rowGroupSize;
private final int pageSize;
private final ParquetProperties.WriterVersion writerVersion;
- public ParquetExternalFilePrinterFactory(CompressionCodecName
compressionCodecName, String parquetSchemaString,
- IAType typeInfo, long rowGroupSize, int pageSize,
ParquetProperties.WriterVersion writerVersion) {
+ public ParquetExternalFilePrinterFactory(CompressionCodecName
compressionCodecName,
+ ARecordType parquetprovidedSchema, IAType typeInfo, long
rowGroupSize, int pageSize,
+ ParquetProperties.WriterVersion writerVersion) {
this.compressionCodecName = compressionCodecName;
- this.parquetSchemaString = parquetSchemaString;
+ this.parquetProvidedSchema = parquetprovidedSchema;
this.typeInfo = typeInfo;
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
@@ -52,13 +61,25 @@ public class ParquetExternalFilePrinterFactory implements
IExternalPrinterFactor
this.writerVersion = writerVersion;
}
- public void setParquetSchemaString(String parquetSchemaString) {
- this.parquetSchemaString = parquetSchemaString;
+ public void setParquetSchema(MessageType parquetInferSchema) {
+ this.parquetInferSchema = parquetInferSchema;
}
@Override
public IExternalPrinter createPrinter() {
- return new ParquetExternalFilePrinter(compressionCodecName,
parquetSchemaString, typeInfo, rowGroupSize,
- pageSize, writerVersion);
+ if (parquetInferSchema != null) {
+ return new ParquetExternalFilePrinter(compressionCodecName,
parquetInferSchema, typeInfo, rowGroupSize,
+ pageSize, writerVersion);
+ }
+
+ MessageType schema;
+ try {
+ schema =
SchemaConverterVisitor.convertToParquetSchema(parquetProvidedSchema);
+ } catch (CompilationException e) {
+ // This should not happen, Compilation Exception should be caught
at the query-compile time
+ throw new RuntimeException(e);
+ }
+ return new ParquetExternalFilePrinter(compressionCodecName, schema,
typeInfo, rowGroupSize, pageSize,
+ writerVersion);
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/FieldNamesDictionary.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/FieldNamesDictionary.java
index 7058bf6287..cdf24c6de0 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/FieldNamesDictionary.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/FieldNamesDictionary.java
@@ -26,6 +26,8 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.util.string.UTF8StringUtil;
+// The Field Names Dictionary will cache the mapping between field name
bytes and their corresponding string representations,
+// minimizing the creation of new string objects during field name
deserialization while writing to parquet files.
public class FieldNamesDictionary {
private final FieldNamesTrieDictionary trie;
private final List<String> fieldNames;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java
index 99b97369df..dfa6e4f3de 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java
@@ -22,6 +22,10 @@ import
org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
public interface ISchemaChecker {
+
+ // EQUIVALENT: Example: { name: string, age: int } -> { name: string, age:
int }
+ // GROWING: equivalent types but having extra fields, Example: { name:
string, age: int } -> { name: string, age: int , address: string }
+ // CONFLICTING: conflict in types, Example: { name: string, age: int } ->
{ name: {first:string, last:string}, age: int }
enum SchemaComparisonType {
EQUIVALENT,
GROWING,
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java
index 373bfe4626..2a03bfd024 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java
@@ -44,6 +44,8 @@ public class ParquetRecordLazyVisitor implements
ILazyVisitablePointableVisitor<
private final MessageType schema;
private final RecordLazyVisitablePointable rec;
+ // The RecordConsumer receives the record as this visitor traverses the record tree:
+ // recordConsumer.startField() descends into a child node and endField() moves back to the parent node.
private RecordConsumer recordConsumer;
private final FieldNamesDictionary fieldNamesDictionary;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java
index b59117571b..70872bb2ec 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java
@@ -39,6 +39,7 @@ import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
+// Infers the schema of a record as a ParquetSchemaTree.SchemaNode, an internal tree representation of the schema.
public class ParquetSchemaLazyVisitor implements
ILazyVisitablePointableVisitor<Void, ParquetSchemaTree.SchemaNode> {
private final RecordLazyVisitablePointable rec;
private final FieldNamesDictionary fieldNamesDictionary;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java
index 04e11f7e21..38d12f910a 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java
@@ -44,6 +44,7 @@ import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.PrimitiveType;
+// This class reduces the number of Java objects created each time a column is written to a Parquet file by reusing the same VoidPointable for all columns within the file.
public class ParquetValueWriter {
public static final String LIST_FIELD = "list";
public static final String ELEMENT_FIELD = "element";
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java
index 44cd5b256d..fc43c89afc 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java
@@ -30,6 +30,8 @@ import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
+// Checks a record against the schema inferred so far.
+// The result tells whether the record is equivalent to that schema, grows it with new fields, or conflicts with it.
public class SchemaCheckerLazyVisitor implements ISchemaChecker,
ILazyVisitablePointableVisitor<ISchemaChecker.SchemaComparisonType,
ParquetSchemaTree.SchemaNode> {
private final FieldNamesDictionary fieldNamesDictionary;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
index a6ea1159a6..9f5d02f016 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
@@ -36,6 +36,7 @@ import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
+// Traverses the RecordType tree and converts it to a Parquet schema.
public class SchemaConverterVisitor implements IATypeVisitor<Void,
Pair<Types.Builder, String>> {
public static String MESSAGE_NAME = "asterix_schema";
private final ARecordType schemaType;
@@ -46,9 +47,9 @@ public class SchemaConverterVisitor implements
IATypeVisitor<Void, Pair<Types.Bu
this.unsupportedType = null;
}
- public static String convertToParquetSchemaString(ARecordType schemaType)
throws CompilationException {
+ public static MessageType convertToParquetSchema(ARecordType schemaType)
throws CompilationException {
SchemaConverterVisitor schemaConverterVisitor = new
SchemaConverterVisitor(schemaType);
- return schemaConverterVisitor.getParquetSchema().toString();
+ return schemaConverterVisitor.getParquetSchema();
}
private MessageType getParquetSchema() throws CompilationException {
diff --git
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
index e1c978a97c..126f3ba365 100644
---
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
+++
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
@@ -26,6 +26,7 @@ public class ExternalDetailsDecl implements
IDatasetDetailsDecl {
private Map<String, String> properties;
private String adapter;
private ARecordType itemType;
+ private ARecordType parquetSchema;
public void setAdapter(String adapter) {
this.adapter = adapter;
@@ -43,6 +44,14 @@ public class ExternalDetailsDecl implements
IDatasetDetailsDecl {
return itemType;
}
+ public void setParquetSchema(ARecordType parquetSchema) {
+ this.parquetSchema = parquetSchema;
+ }
+
+ public ARecordType getParquetSchema() {
+ return parquetSchema;
+ }
+
public String getAdapter() {
return adapter;
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
index 64f8d6d751..1168ba1de7 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
@@ -26,5 +26,7 @@ import org.apache.hyracks.api.exceptions.SourceLocation;
public interface IExternalWriteDataSink extends IWriteDataSink {
ARecordType getItemType();
+ ARecordType getParquetSchema();
+
SourceLocation getSourceLoc();
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
index d1667bf116..4a10f7fcaf 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
@@ -28,14 +28,16 @@ import org.apache.hyracks.api.exceptions.SourceLocation;
public class WriteDataSink implements IExternalWriteDataSink {
private final String adapterName;
private final Map<String, String> configuration;
- private ARecordType itemType;
- private SourceLocation sourceLoc;
+ private final ARecordType itemType;
+ private final ARecordType parquetSchema;
+ private final SourceLocation sourceLoc;
public WriteDataSink(String adapterName, Map<String, String>
configuration, ARecordType itemType,
- SourceLocation sourceLoc) {
+ ARecordType parquetSchema, SourceLocation sourceLoc) {
this.adapterName = adapterName;
this.configuration = configuration;
this.itemType = itemType;
+ this.parquetSchema = parquetSchema;
this.sourceLoc = sourceLoc;
}
@@ -43,6 +45,7 @@ public class WriteDataSink implements IExternalWriteDataSink {
this.adapterName = writeDataSink.getAdapterName();
this.configuration = new HashMap<>(writeDataSink.configuration);
this.itemType = writeDataSink.itemType;
+ this.parquetSchema = writeDataSink.parquetSchema;
this.sourceLoc = writeDataSink.sourceLoc;
}
@@ -51,6 +54,11 @@ public class WriteDataSink implements IExternalWriteDataSink
{
return itemType;
}
+ @Override
+ public ARecordType getParquetSchema() {
+ return parquetSchema;
+ }
+
@Override
public SourceLocation getSourceLoc() {
return sourceLoc;
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index e6716dff29..e88b1deb1e 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -205,7 +205,7 @@ public class ExternalWriterProvider {
case ExternalDataConstants.FORMAT_PARQUET:
CompressionCodecName compressionCodecName;
- if (compression == null || compression.equals("") ||
compression.equals("none")) {
+ if (compression == null || compression.isEmpty() ||
compression.equals("none")) {
compressionCodecName = CompressionCodecName.UNCOMPRESSED;
} else {
compressionCodecName =
CompressionCodecName.valueOf(compression.toUpperCase());
@@ -218,10 +218,11 @@ public class ExternalWriterProvider {
int pageSize = (int) StorageUtil.getByteValue(pageSizeString);
ParquetProperties.WriterVersion writerVersion =
getParquetWriterVersion(configuration);
- if
(configuration.get(ExternalDataConstants.PARQUET_SCHEMA_KEY) != null) {
- String parquetSchemaString =
configuration.get(ExternalDataConstants.PARQUET_SCHEMA_KEY);
+ ARecordType parquetSchema = ((IExternalWriteDataSink)
sink).getParquetSchema();
+
+ if (parquetSchema != null) {
ParquetExternalFilePrinterFactory parquetPrinterFactory =
- new
ParquetExternalFilePrinterFactory(compressionCodecName, parquetSchemaString,
+ new
ParquetExternalFilePrinterFactory(compressionCodecName, parquetSchema,
(IAType) sourceType, rowGroupSize,
pageSize, writerVersion);
ExternalFileWriterFactory parquetWriterFactory = new
ExternalFileWriterFactory(fileWriterFactory,
@@ -230,6 +231,7 @@ public class ExternalWriterProvider {
partitionComparatorFactories, inputDesc,
parquetWriterFactory);
}
+ // Parquet Writing with Schema Inference
int maxSchemas =
ExternalWriterProvider.getMaxParquetSchema(configuration);
ParquetExternalFilePrinterFactoryProvider
printerFactoryProvider =
new
ParquetExternalFilePrinterFactoryProvider(compressionCodecName, (IAType)
sourceType,