This is an automated email from the ASF dual-hosted git repository.
mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new d4867eb900 [ASTERIXDB-3392] Change schema format to asterix
d4867eb900 is described below
commit d4867eb9008224c84ddc04d54d3076a807c66047
Author: preetham0202 <[email protected]>
AuthorDate: Wed May 15 12:03:02 2024 +0530
[ASTERIXDB-3392] Change schema format to asterix
Change-Id: I631b396da1e1c14deea7e2574a395ef5530b1564
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18111
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: <[email protected]>
Reviewed-by: Wail Alkowaileet <[email protected]>
---
.../asterix/app/translator/QueryTranslator.java | 16 +
.../parquet-error-checks.02.update.sqlpp | 3 +-
.../parquet-error-checks.03.update.sqlpp | 21 +-
.../parquet-error-checks.04.update.sqlpp | 18 +-
.../parquet-error-checks.05.update.sqlpp | 12 +-
.../parquet-error-checks.06.update.sqlpp | 22 +-
.../parquet-error-checks.07.update.sqlpp | 2 +-
.../parquet-error-checks.08.update.sqlpp | 2 +-
.../parquet-error-checks.09.update.sqlpp | 6 +-
....sqlpp => parquet-error-checks.10.update.sqlpp} | 3 +-
....sqlpp => parquet-error-checks.11.update.sqlpp} | 8 +-
....sqlpp => parquet-error-checks.12.update.sqlpp} | 8 +-
....sqlpp => parquet-error-checks.13.update.sqlpp} | 10 +-
....sqlpp => parquet-error-checks.14.update.sqlpp} | 10 +-
....sqlpp => parquet-error-checks.15.update.sqlpp} | 12 +-
.../parquet-cover-data-types.03.update.sqlpp | 12 +-
.../parquet-simple/parquet-simple.02.update.sqlpp | 6 +-
.../parquet-tweet/parquet-tweet.03.update.sqlpp | 386 ++++++++++-----------
.../parquet-utf8/parquet-utf8.03.update.sqlpp | 8 +-
.../runtimets/testsuite_external_dataset_s3.xml | 16 +-
.../asterix/common/exceptions/ErrorCode.java | 3 +
.../src/main/resources/asx_errormsg/en.properties | 3 +
.../external/util/ExternalDataConstants.java | 7 +
.../external/util/WriterValidationUtil.java | 20 +-
.../writer/printer/ParquetExternalFilePrinter.java | 24 +-
.../printer/ParquetExternalFilePrinterFactory.java | 14 +-
.../printer/parquet/AsterixParquetTypeMap.java | 50 +++
.../printer/parquet/SchemaConverterVisitor.java | 151 ++++++++
.../lang/common/statement/CopyToStatement.java | 29 +-
.../asterix-lang-sqlpp/src/main/javacc/SQLPP.jj | 5 +-
.../metadata/provider/ExternalWriterProvider.java | 26 +-
.../apache/hyracks/api/exceptions/ErrorCode.java | 5 +-
.../src/main/resources/errormsg/en.properties | 5 +-
33 files changed, 595 insertions(+), 328 deletions(-)
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 26020e53a1..f30c80084f 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -109,6 +109,7 @@ import
org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.WriterValidationUtil;
+import
org.apache.asterix.external.writer.printer.parquet.SchemaConverterVisitor;
import org.apache.asterix.lang.common.base.IQueryRewriter;
import org.apache.asterix.lang.common.base.IReturningStatement;
import org.apache.asterix.lang.common.base.IRewriterFactory;
@@ -4093,6 +4094,21 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
ExternalDataConstants.WRITER_SUPPORTED_ADAPTERS,
copyTo.getSourceLocation(), mdTxnCtx,
metadataProvider));
+ if (edd.getProperties().get(ExternalDataConstants.KEY_FORMAT)
+
.equalsIgnoreCase(ExternalDataConstants.FORMAT_PARQUET)) {
+ if (copyTo.getType() == null) {
+ throw new
CompilationException(ErrorCode.COMPILATION_ERROR,
+ "TYPE() Expression is required for parquet
format");
+ }
+
+ DataverseName dataverseName =
+
DataverseName.createFromCanonicalForm(ExternalDataConstants.DUMMY_DATAVERSE_NAME);
+ IAType iaType =
translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dataverseName,
+ ExternalDataConstants.DUMMY_TYPE_NAME,
copyTo.getType(), mdTxnCtx);
+
edd.getProperties().put(ExternalDataConstants.PARQUET_SCHEMA_KEY,
+
SchemaConverterVisitor.convertToParquetSchemaString((ARecordType) iaType));
+ }
+
Map<VarIdentifier, IAObject> externalVars =
createExternalVariables(copyTo, stmtParams);
// Query Rewriting (happens under the same ongoing metadata
transaction)
LangRewritingContext langRewritingContext =
createLangRewritingContext(metadataProvider,
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
index 23d9316934..b6ec75803f 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
@@ -30,8 +30,7 @@ WITH {
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet",
- "schema":"message schema{"
+ "format":"parquet"
}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.03.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.03.update.sqlpp
index 2e811f28ce..5fb0a9b0f3 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.03.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.03.update.sqlpp
@@ -27,22 +27,23 @@ COPY (
) toWriter
TO S3
PATH ("copy-to-result", "parquet-error-checks3")
+TYPE ( {
+ id : string,
+ name : string,
+ nested :
+ {
+ first : string,
+ second : string
+ }
+ }
+)
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet",
- "schema":"message schema {
- optional binary id (UTF8);
- optional binary name (UTF8);
- optional group nested {
- optional binary first (UTF8);
- optional binary second (UTF8);
- }
- }
-"
+ "format":"parquet"
}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.04.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.04.update.sqlpp
index 9933fe29dd..8b1efc846d 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.04.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.04.update.sqlpp
@@ -26,21 +26,21 @@ COPY (
) toWriter
TO S3
PATH ("copy-to-result", "parquet-error-checks4")
+TYPE ( {
+ id : bigint,
+ name : string,
+ nested:
+ {
+ first : string
+ }
+ } )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet",
- "schema":"message schema {
- optional int64 id;
- optional binary name (UTF8);
- optional group nested {
- optional binary first (UTF8);
- }
- }
-"
+ "format":"parquet"
}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.05.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.05.update.sqlpp
index cf3a450488..79b2d1aa08 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.05.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.05.update.sqlpp
@@ -25,18 +25,18 @@ COPY (
) toWriter
TO S3
PATH ("copy-to-result", "parquet-error-checks5")
+TYPE ( {
+ id : bigint,
+ name : string,
+ nested : string
+ } )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet",
- "schema":"message schema {
- optional int64 id;
- optional binary name (UTF8);
- optional binary nested (UTF8);
- }"
+ "format":"parquet"
}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.06.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.06.update.sqlpp
index 2d53754e42..3e6ac480aa 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.06.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.06.update.sqlpp
@@ -25,23 +25,23 @@ COPY (
) toWriter
TO S3
PATH ("copy-to-result", "parquet-error-checks6")
+TYPE ( {
+ id : bigint,
+ name : {
+ first : string
+ },
+ nested:{
+ first : string,
+ second : string
+ }
+ } )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet",
- "schema":"message spark_schema {
- optional int64 id;
- optional group name {
- optional binary first (UTF8);
- }
- optional group nested {
- optional binary first (UTF8);
- optional binary second (UTF8);
- }
- }"
+ "format":"parquet"
}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.07.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.07.update.sqlpp
index dcb0dcdcae..851559a6a3 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.07.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.07.update.sqlpp
@@ -25,6 +25,7 @@ COPY (
) toWriter
TO S3
PATH ("copy-to-result", "parquet-error-checks7")
+TYPE ( {id:int} )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
@@ -32,7 +33,6 @@ WITH {
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
"format":"parquet",
- "schema":"message schema{}",
"row-group-size":"random"
}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.08.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.08.update.sqlpp
index 9856c4ab76..2f356fb9d7 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.08.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.08.update.sqlpp
@@ -25,6 +25,7 @@ COPY (
) toWriter
TO S3
PATH ("copy-to-result", "parquet-error-checks8")
+TYPE ( {id:int} )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
@@ -32,6 +33,5 @@ WITH {
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
"format":"parquet",
- "schema":"message schema{}",
"page-size":"random"
}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.09.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.09.update.sqlpp
index b8579a59ca..f2293e369c 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.09.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.09.update.sqlpp
@@ -25,6 +25,7 @@ COPY (
) toWriter
TO S3
PATH ("copy-to-result", "parquet-error-checks9")
+TYPE ( { name:string } )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
@@ -32,6 +33,5 @@ WITH {
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
"format":"parquet",
- "compression":"rar",
- "schema":""
-}
+ "compression":"rar"
+}
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.10.update.sqlpp
similarity index 94%
copy from
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.10.update.sqlpp
index 23d9316934..4f52164a6f 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.10.update.sqlpp
@@ -23,7 +23,8 @@ COPY (
select c.* from TestCollection c
) toWriter
TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
+PATH ("copy-to-result", "parquet-error-checks10")
+TYPE ( { name: } )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.11.update.sqlpp
similarity index 88%
copy from
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.11.update.sqlpp
index 23d9316934..f22071a36c 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.11.update.sqlpp
@@ -23,15 +23,17 @@ COPY (
select c.* from TestCollection c
) toWriter
TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
+PATH ("copy-to-result", "parquet-error-checks11")
+TYPE ( {
+ id : int , name : binary
+ } )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet",
- "schema":"message schema{"
+ "format":"parquet"
}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.12.update.sqlpp
similarity index 88%
copy from
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.12.update.sqlpp
index 23d9316934..d5d11ebc65 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.12.update.sqlpp
@@ -23,7 +23,11 @@ COPY (
select c.* from TestCollection c
) toWriter
TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
+PATH ("copy-to-result", "parquet-error-checks12")
+TYPE ( {
+ id : int,
+ name : string
+ } )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
@@ -31,7 +35,7 @@ WITH {
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
"format":"parquet",
- "schema":"message schema{"
+ "version" : 3
}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.13.update.sqlpp
similarity index 85%
copy from
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.13.update.sqlpp
index 23d9316934..75245f194e 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.13.update.sqlpp
@@ -23,15 +23,19 @@ COPY (
select c.* from TestCollection c
) toWriter
TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
+PATH ("copy-to-result", "parquet-error-checks13")
+TYPE ( {
+ id : int,
+ name : string,
+ list : [int,string]
+ } )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet",
- "schema":"message schema{"
+ "format":"parquet"
}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.14.update.sqlpp
similarity index 85%
copy from
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.14.update.sqlpp
index 23d9316934..0becb36b9e 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.02.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.14.update.sqlpp
@@ -23,15 +23,19 @@ COPY (
select c.* from TestCollection c
) toWriter
TO S3
-PATH ("copy-to-result", "parquet-error-checks2")
+PATH ("copy-to-result", "parquet-error-checks14")
+TYPE ( {
+ id : int,
+ name : string,
+ list : [int |
+ } )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet",
- "schema":"message schema{"
+ "format":"parquet"
}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.07.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.15.update.sqlpp
similarity index 86%
copy from
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.07.update.sqlpp
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.15.update.sqlpp
index dcb0dcdcae..cc67f79377 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.07.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/negative/parquet-error-checks/parquet-error-checks.15.update.sqlpp
@@ -19,21 +19,23 @@
USE test;
-
COPY (
select c.* from TestCollection c
) toWriter
TO S3
-PATH ("copy-to-result", "parquet-error-checks7")
+PATH ("copy-to-result", "parquet-error-checks15")
+TYPE ( {
+ id : int,
+ name : string,
+ list : [int] )
+
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet",
- "schema":"message schema{}",
- "row-group-size":"random"
+ "format":"parquet"
}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.03.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.03.update.sqlpp
index c2606b7797..0012e221be 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.03.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.03.update.sqlpp
@@ -24,22 +24,14 @@ COPY (
) toWriter
TO S3
PATH ("copy-to-result", "parquet-cover-data-types")
+TYPE ( { name : string, id : int, dateType : date, timeType : time,
boolType : boolean, doubleType : double, datetimeType : datetime } )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet",
- "schema":"message schema {
- optional binary name (UTF8);
- optional int32 id;
- optional int32 dateType (DATE);
- optional int32 timeType (TIME_MILLIS);
- optional boolean boolType ;
- optional double doubleType ;
- optional int64 datetimeType (TIMESTAMP_MILLIS);
- }"
+ "format":"parquet"
};
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-simple/parquet-simple.02.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-simple/parquet-simple.02.update.sqlpp
index 745ac872b1..4bec537334 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-simple/parquet-simple.02.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-simple/parquet-simple.02.update.sqlpp
@@ -25,6 +25,7 @@ COPY (
) toWriter
TO S3
PATH ("copy-to-result", "parquet-simple")
+TYPE ( {id:string} )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
@@ -32,8 +33,5 @@ WITH {
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
"format":"parquet",
- "schema":"message schema {
- optional binary id (UTF8);
- }"
-
+ "version" : "2"
};
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-tweet/parquet-tweet.03.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-tweet/parquet-tweet.03.update.sqlpp
index 179d25cc6d..aed4e09da4 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-tweet/parquet-tweet.03.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-tweet/parquet-tweet.03.update.sqlpp
@@ -24,216 +24,194 @@ COPY (
) toWriter
TO S3
PATH ("copy-to-result", "parquet-tweet")
+TYPE ( {
+ coordinates: {
+ coordinates: [
+ double
+ ],
+ `type` : string
+ },
+ created_at: string,
+ entities: {
+ urls: [
+ {
+ display_url: string,
+ expanded_url: string,
+ indices: [
+ int
+ ],
+ url: string
+ }
+ ],
+ user_mentions: [
+ {
+ id: int,
+ id_str: string,
+ indices: [
+ int
+ ],
+ name: string,
+ screen_name: string
+ }
+ ]
+ },
+ favorite_count: int,
+ favorited: boolean,
+ filter_level: string,
+ geo: {
+ coordinates: [
+ double
+ ],
+ `type`: string
+ },
+ id: string,
+ id_str: string,
+ in_reply_to_screen_name: string,
+ in_reply_to_status_id: int,
+ in_reply_to_status_id_str: string,
+ in_reply_to_user_id: int,
+ in_reply_to_user_id_str: string,
+ is_quote_status: boolean,
+ lang: string,
+ place: {
+ bounding_box: {
+ coordinates: [
+ [
+ [
+ double
+ ]
+ ]
+ ],
+ `type`: string
+ },
+ country: string,
+ country_code: string,
+ full_name: string,
+ id: string,
+ name: string,
+ place_type: string,
+ url: string
+ },
+ possibly_sensitive: boolean,
+ quoted_status: {
+ created_at: string,
+ entities: {
+ user_mentions: [
+ {
+ id: int,
+ id_str: string,
+ indices: [
+ int
+ ],
+ name: string,
+ screen_name: string
+ }
+ ]
+ },
+ favorite_count: int,
+ favorited: boolean,
+ filter_level: string,
+ id: int,
+ id_str: string,
+ in_reply_to_screen_name: string,
+ in_reply_to_status_id: int,
+ in_reply_to_status_id_str: string,
+ in_reply_to_user_id: int,
+ in_reply_to_user_id_str: string,
+ is_quote_status: boolean,
+ lang: string,
+ retweet_count: int,
+ retweeted: boolean,
+ source: string,
+ text: string,
+ truncated: boolean,
+ user: {
+ contributors_enabled: boolean,
+ created_at: string,
+ default_profile: boolean,
+ default_profile_image: boolean,
+ description: string,
+ favourites_count: int,
+ followers_count: int,
+ friends_count: int,
+ geo_enabled: boolean,
+ id: int,
+ id_str: string,
+ is_translator: boolean,
+ lang: string,
+ listed_count: int,
+ name: string,
+ profile_background_color: string,
+ profile_background_image_url: string,
+ profile_background_image_url_https: string,
+ profile_background_tile: boolean,
+ profile_banner_url: string,
+ profile_image_url: string,
+ profile_image_url_https: string,
+ profile_link_color: string,
+ profile_sidebar_border_color: string,
+ profile_sidebar_fill_color: string,
+ profile_text_color: string,
+ profile_use_background_image: boolean,
+ protected: boolean,
+ screen_name: string,
+ statuses_count: int,
+ verified: boolean
+ }
+ },
+ quoted_status_id: int,
+ quoted_status_id_str: string,
+ retweet_count: int,
+ retweeted: boolean,
+ source: string,
+ text: string,
+ timestamp_ms: string,
+ truncated: boolean,
+ user: {
+ contributors_enabled: boolean,
+ created_at: string,
+ default_profile: boolean,
+ default_profile_image: boolean,
+ description: string,
+ favourites_count: int,
+ followers_count: int,
+ friends_count: int,
+ geo_enabled: boolean,
+ id: int,
+ id_str: string,
+ is_translator: boolean,
+ lang: string,
+ listed_count: int,
+ location: string,
+ name: string,
+ profile_background_color: string,
+ profile_background_image_url: string,
+ profile_background_image_url_https: string,
+ profile_background_tile: boolean,
+ profile_banner_url: string,
+ profile_image_url: string,
+ profile_image_url_https: string,
+ profile_link_color: string,
+ profile_sidebar_border_color: string,
+ profile_sidebar_fill_color: string,
+ profile_text_color: string,
+ profile_use_background_image: boolean,
+ protected: boolean,
+ screen_name: string,
+ statuses_count: int,
+ time_zone: string,
+ url: string,
+ utc_offset: int,
+ verified: boolean
+ }
+ } )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet",
- "schema":"message schema {
- optional group coordinates {
- optional group coordinates (LIST) {
- repeated group list {
- optional double element;
- }
- }
- optional binary type (UTF8);
- }
- optional binary created_at (UTF8);
- optional group entities {
- optional group urls (LIST) {
- repeated group list {
- optional group element {
- optional binary display_url (UTF8);
- optional binary expanded_url (UTF8);
- optional group indices (LIST) {
- repeated group list {
- optional int64 element;
- }
- }
- optional binary url (UTF8);
- }
- }
- }
- optional group user_mentions (LIST) {
- repeated group list {
- optional group element {
- optional int64 id;
- optional binary id_str (UTF8);
- optional group indices (LIST) {
- repeated group list {
- optional int64 element;
- }
- }
- optional binary name (UTF8);
- optional binary screen_name (UTF8);
- }
- }
- }
- }
- optional int64 favorite_count;
- optional boolean favorited;
- optional binary filter_level (UTF8);
- optional group geo {
- optional group coordinates (LIST) {
- repeated group list {
- optional double element;
- }
- }
- optional binary type (UTF8);
- }
- optional binary id (UTF8);
- optional binary id_str (UTF8);
- optional binary in_reply_to_screen_name (UTF8);
- optional int64 in_reply_to_status_id;
- optional binary in_reply_to_status_id_str (UTF8);
- optional int64 in_reply_to_user_id;
- optional binary in_reply_to_user_id_str (UTF8);
- optional boolean is_quote_status;
- optional binary lang (UTF8);
- optional group place {
- optional group bounding_box {
- optional group coordinates (LIST) {
- repeated group list {
- optional group element (LIST) {
- repeated group list {
- optional group element (LIST) {
- repeated group list {
- optional double element;
- }
- }
- }
- }
- }
- }
- optional binary type (UTF8);
- }
- optional binary country (UTF8);
- optional binary country_code (UTF8);
- optional binary full_name (UTF8);
- optional binary id (UTF8);
- optional binary name (UTF8);
- optional binary place_type (UTF8);
- optional binary url (UTF8);
- }
- optional boolean possibly_sensitive;
- optional group quoted_status {
- optional binary created_at (UTF8);
- optional group entities {
- optional group user_mentions (LIST) {
- repeated group list {
- optional group element {
- optional int64 id;
- optional binary id_str (UTF8);
- optional group indices (LIST) {
- repeated group list {
- optional int64 element;
- }
- }
- optional binary name (UTF8);
- optional binary screen_name (UTF8);
- }
- }
- }
- }
- optional int64 favorite_count;
- optional boolean favorited;
- optional binary filter_level (UTF8);
- optional int64 id;
- optional binary id_str (UTF8);
- optional binary in_reply_to_screen_name (UTF8);
- optional int64 in_reply_to_status_id;
- optional binary in_reply_to_status_id_str (UTF8);
- optional int64 in_reply_to_user_id;
- optional binary in_reply_to_user_id_str (UTF8);
- optional boolean is_quote_status;
- optional binary lang (UTF8);
- optional int64 retweet_count;
- optional boolean retweeted;
- optional binary source (UTF8);
- optional binary text (UTF8);
- optional boolean truncated;
- optional group user {
- optional boolean contributors_enabled;
- optional binary created_at (UTF8);
- optional boolean default_profile;
- optional boolean default_profile_image;
- optional binary description (UTF8);
- optional int64 favourites_count;
- optional int64 followers_count;
- optional int64 friends_count;
- optional boolean geo_enabled;
- optional int64 id;
- optional binary id_str (UTF8);
- optional boolean is_translator;
- optional binary lang (UTF8);
- optional int64 listed_count;
- optional binary name (UTF8);
- optional binary profile_background_color (UTF8);
- optional binary profile_background_image_url (UTF8);
- optional binary profile_background_image_url_https (UTF8);
- optional boolean profile_background_tile;
- optional binary profile_banner_url (UTF8);
- optional binary profile_image_url (UTF8);
- optional binary profile_image_url_https (UTF8);
- optional binary profile_link_color (UTF8);
- optional binary profile_sidebar_border_color (UTF8);
- optional binary profile_sidebar_fill_color (UTF8);
- optional binary profile_text_color (UTF8);
- optional boolean profile_use_background_image;
- optional boolean protected;
- optional binary screen_name (UTF8);
- optional int64 statuses_count;
- optional boolean verified;
- }
- }
- optional int64 quoted_status_id;
- optional binary quoted_status_id_str (UTF8);
- optional int64 retweet_count;
- optional boolean retweeted;
- optional binary source (UTF8);
- optional binary text (UTF8);
- optional binary timestamp_ms (UTF8);
- optional boolean truncated;
- optional group user {
- optional boolean contributors_enabled;
- optional binary created_at (UTF8);
- optional boolean default_profile;
- optional boolean default_profile_image;
- optional binary description (UTF8);
- optional int64 favourites_count;
- optional int64 followers_count;
- optional int64 friends_count;
- optional boolean geo_enabled;
- optional int64 id;
- optional binary id_str (UTF8);
- optional boolean is_translator;
- optional binary lang (UTF8);
- optional int64 listed_count;
- optional binary location (UTF8);
- optional binary name (UTF8);
- optional binary profile_background_color (UTF8);
- optional binary profile_background_image_url (UTF8);
- optional binary profile_background_image_url_https (UTF8);
- optional boolean profile_background_tile;
- optional binary profile_banner_url (UTF8);
- optional binary profile_image_url (UTF8);
- optional binary profile_image_url_https (UTF8);
- optional binary profile_link_color (UTF8);
- optional binary profile_sidebar_border_color (UTF8);
- optional binary profile_sidebar_fill_color (UTF8);
- optional binary profile_text_color (UTF8);
- optional boolean profile_use_background_image;
- optional boolean protected;
- optional binary screen_name (UTF8);
- optional int64 statuses_count;
- optional binary time_zone (UTF8);
- optional binary url (UTF8);
- optional int64 utc_offset;
- optional boolean verified;
- }
- }"
+ "format":"parquet"
};
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-utf8/parquet-utf8.03.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-utf8/parquet-utf8.03.update.sqlpp
index ef76ad1f77..9a1c9a473b 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-utf8/parquet-utf8.03.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-utf8/parquet-utf8.03.update.sqlpp
@@ -24,18 +24,14 @@ COPY (
) toWriter
TO S3
PATH ("copy-to-result", "parquet-utf8")
+TYPE ( { comment:string, id:bigint, name:string } )
WITH {
"accessKeyId":"dummyAccessKey",
"secretAccessKey":"dummySecretKey",
"region":"us-west-2",
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
- "format":"parquet",
- "schema":"message spark_schema {
- optional binary comment (UTF8);
- optional int64 id;
- optional binary name (UTF8);
- }"
+ "format":"parquet"
};
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 8e31aa424c..1107ddaf8e 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -107,15 +107,21 @@
<test-case FilePath="copy-to/negative">
<compilation-unit name="parquet-error-checks">
<output-dir compare="Text">parquet-error-checks</output-dir>
- <expected-error>HYR0131: Invalid parquet schema
provided</expected-error>
+ <expected-error>ASX1079: Compilation error: TYPE() Expression is
required for parquet format</expected-error>
<expected-error>ASX0037: Type mismatch: expected value of type
integer, but got the value of type BINARY</expected-error>
- <expected-error>HYR0133: Extra field in the result, field 'second'
does not exist at 'nested' in the schema</expected-error>
- <expected-error>HYR0132: Result does not follow the schema, group type
expected but found primitive type at 'nested'</expected-error>
- <expected-error>HYR0132: Result does not follow the schema, primitive
type expected but found group type at 'name'</expected-error>
+ <expected-error>HYR0132: Extra field in the result, field 'second'
does not exist at 'nested' in the schema</expected-error>
+ <expected-error>HYR0131: Result does not follow the schema, group type
expected but found primitive type at 'nested'</expected-error>
+ <expected-error>HYR0131: Result does not follow the schema, primitive
type expected but found group type at 'name'</expected-error>
<expected-error>ASX1201: Storage units expected for the field
'row-group-size' (e.g., 0.1KB, 100kb, 1mb, 3MB, 8.5GB ...). Provided
'random'</expected-error>
<expected-error>ASX1201: Storage units expected for the field
'page-size' (e.g., 0.1KB, 100kb, 1mb, 3MB, 8.5GB ...). Provided
'random'</expected-error>
<expected-error>ASX1202: Unsupported compression scheme rar. Supported
schemes for parquet are [gzip, snappy, zstd]</expected-error>
- </compilation-unit>
+ <expected-error>ASX1001: Syntax error</expected-error>
+ <expected-error>ASX1204: 'binary' type not supported in parquet
format</expected-error>
+ <expected-error>ASX1205: Invalid Parquet Writer Version
provided.Supported values: 1,2</expected-error>
+ <expected-error>ASX1001: Syntax error</expected-error>
+ <expected-error>ASX1001: Syntax error</expected-error>
+ <expected-error>ASX1001: Syntax error</expected-error>
+ </compilation-unit>
</test-case>
<test-case FilePath="copy-to/negative">
<compilation-unit name="empty-over">
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 5e59b97870..832a7cccc0 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -306,6 +306,9 @@ public enum ErrorCode implements IError {
PREFIX_SHOULD_NOT_START_WITH_SLASH(1200),
ILLEGAL_SIZE_PROVIDED(1201),
UNSUPPORTED_WRITER_COMPRESSION_SCHEME(1202),
+ INVALID_PARQUET_SCHEMA(1203),
+ TYPE_UNSUPPORTED_PARQUET_WRITE(1204),
+ INVALID_PARQUET_WRITER_VERSION(1205),
// Feed errors
DATAFLOW_ILLEGAL_STATE(3001),
diff --git
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index a07dddf2bb..716dcf6968 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -307,6 +307,9 @@
1200 = Prefix should not start with "/". Prefix: '%1$s'
1201 = Storage units expected for the field '%1$s' (e.g., 0.1KB, 100kb, 1mb,
3MB, 8.5GB ...). Provided '%2$s'
1202 = Unsupported compression scheme %1$s. Supported schemes for %2$s are %3$s
+1203 = Invalid schema provided: '%1$s'
+1204 = '%1$s' type not supported in parquet format
+1205 = Invalid Parquet Writer Version provided.Supported values: 1,2
# Feed Errors
3001 = Illegal state.
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 6a4b336100..02c20702a2 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -211,6 +211,13 @@ public class ExternalDataConstants {
public static final String FORMAT_CSV = "csv";
public static final String FORMAT_TSV = "tsv";
public static final String FORMAT_PARQUET = "parquet";
+ public static final String PARQUET_SCHEMA_KEY = "parquet-schema";
+ public static final String PARQUET_WRITER_VERSION_KEY = "version";
+ public static final String PARQUET_WRITER_VERSION_VALUE_1 = "1";
+ public static final String PARQUET_WRITER_VERSION_VALUE_2 = "2";
+ public static final String DUMMY_DATABASE_NAME = "dbname";
+ public static final String DUMMY_TYPE_NAME = "typeName";
+ public static final String DUMMY_DATAVERSE_NAME = "a.b.c";
public static final String FORMAT_APACHE_ICEBERG = "apache-iceberg";
public static final Set<String> ALL_FORMATS;
public static final Set<String> TEXTUAL_FORMATS;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
index 0d82dd9724..5c3585b65d 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
@@ -20,11 +20,15 @@ package org.apache.asterix.external.util;
import static
org.apache.asterix.common.exceptions.ErrorCode.INVALID_REQ_PARAM_VAL;
import static
org.apache.asterix.common.exceptions.ErrorCode.MINIMUM_VALUE_ALLOWED_FOR_PARAM;
+import static
org.apache.asterix.common.exceptions.ErrorCode.PARAMETERS_REQUIRED;
import static
org.apache.asterix.external.util.ExternalDataConstants.FORMAT_JSON_LOWER_CASE;
import static
org.apache.asterix.external.util.ExternalDataConstants.FORMAT_PARQUET;
import static
org.apache.asterix.external.util.ExternalDataConstants.KEY_PARQUET_PAGE_SIZE;
import static
org.apache.asterix.external.util.ExternalDataConstants.KEY_PARQUET_ROW_GROUP_SIZE;
import static
org.apache.asterix.external.util.ExternalDataConstants.KEY_WRITER_MAX_RESULT;
+import static
org.apache.asterix.external.util.ExternalDataConstants.PARQUET_WRITER_VERSION_KEY;
+import static
org.apache.asterix.external.util.ExternalDataConstants.PARQUET_WRITER_VERSION_VALUE_1;
+import static
org.apache.asterix.external.util.ExternalDataConstants.PARQUET_WRITER_VERSION_VALUE_2;
import static
org.apache.asterix.external.util.ExternalDataConstants.WRITER_MAX_RESULT_MINIMUM;
import java.util.List;
@@ -75,6 +79,18 @@ public class WriterValidationUtil {
validateParquetCompression(configuration, sourceLocation);
validateParquetRowGroupSize(configuration);
validateParquetPageSize(configuration);
+ validateVersion(configuration);
+ }
+
+ private static void validateVersion(Map<String, String> configuration)
throws CompilationException {
+ String version = configuration.get(PARQUET_WRITER_VERSION_KEY);
+ if (version == null) {
+ return;
+ }
+ if (version.equals(PARQUET_WRITER_VERSION_VALUE_1) ||
version.equals(PARQUET_WRITER_VERSION_VALUE_2)) {
+ return;
+ }
+ throw
CompilationException.create(ErrorCode.INVALID_PARQUET_WRITER_VERSION);
}
private static void validateParquetRowGroupSize(Map<String, String>
configuration) throws CompilationException {
@@ -150,7 +166,7 @@ public class WriterValidationUtil {
}
if (value == null) {
- throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
sourceLocation, paramKey);
+ throw new CompilationException(PARAMETERS_REQUIRED,
sourceLocation, paramKey);
}
String normalizedValue = value.toLowerCase();
@@ -185,7 +201,7 @@ public class WriterValidationUtil {
}
if (value == null) {
- throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
sourceLocation, paramKey);
+ throw new CompilationException(PARAMETERS_REQUIRED,
sourceLocation, paramKey);
}
String normalizedValue = value.toLowerCase();
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
index ca2ad55c03..ba7a1ee71b 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
@@ -19,8 +19,6 @@
package org.apache.asterix.external.writer.printer;
-import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
-
import java.io.IOException;
import java.io.OutputStream;
@@ -30,40 +28,38 @@ import
org.apache.asterix.external.writer.printer.parquet.AsterixParquetWriter;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.writer.IExternalPrinter;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
public class ParquetExternalFilePrinter implements IExternalPrinter {
private final IAType typeInfo;
private final CompressionCodecName compressionCodecName;
- private String schemaString;
private MessageType schema;
private ParquetOutputFile parquetOutputFile;
+ private String parquetSchemaString;
private ParquetWriter<IValueReference> writer;
private final long rowGroupSize;
private final int pageSize;
+ private final ParquetProperties.WriterVersion writerVersion;
- public ParquetExternalFilePrinter(CompressionCodecName
compressionCodecName, String schemaString, IAType typeInfo,
- long rowGroupSize, int pageSize) {
+ public ParquetExternalFilePrinter(CompressionCodecName
compressionCodecName, String parquetSchemaString,
+ IAType typeInfo, long rowGroupSize, int pageSize,
ParquetProperties.WriterVersion writerVersion) {
this.compressionCodecName = compressionCodecName;
- this.schemaString = schemaString.replace('\r', ' ');
+ this.parquetSchemaString = parquetSchemaString;
this.typeInfo = typeInfo;
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
+ this.writerVersion = writerVersion;
}
@Override
public void open() throws HyracksDataException {
- try {
- this.schema = parseMessageType(schemaString);
- } catch (IllegalArgumentException e) {
- throw new HyracksDataException(ErrorCode.ILLGEAL_PARQUET_SCHEMA);
- }
+ schema = MessageTypeParser.parseMessageType(parquetSchemaString);
}
@Override
@@ -78,8 +74,8 @@ public class ParquetExternalFilePrinter implements
IExternalPrinter {
writer =
AsterixParquetWriter.builder(parquetOutputFile).withCompressionCodec(compressionCodecName)
.withType(schema).withTypeInfo(typeInfo).withRowGroupSize(rowGroupSize).withPageSize(pageSize)
.withDictionaryPageSize(ExternalDataConstants.PARQUET_DICTIONARY_PAGE_SIZE)
- .enableDictionaryEncoding().withValidation(false)
-
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0).withConf(conf).build();
+
.enableDictionaryEncoding().withValidation(false).withWriterVersion(writerVersion).withConf(conf)
+ .build();
} catch (IOException e) {
throw HyracksDataException.create(e);
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
index 53975d2c11..5ccd2fed8b 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
@@ -21,27 +21,31 @@ package org.apache.asterix.external.writer.printer;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.writer.IExternalPrinter;
import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
+import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
public class ParquetExternalFilePrinterFactory implements
IExternalPrinterFactory {
private static final long serialVersionUID = 8971234908711234L;
- private final String schema;
+ private final String parquetSchemaString;
private final IAType typeInfo;
private final CompressionCodecName compressionCodecName;
private final long rowGroupSize;
private final int pageSize;
+ private final ParquetProperties.WriterVersion writerVersion;
- public ParquetExternalFilePrinterFactory(CompressionCodecName
compressionCodecName, String schema, IAType typeInfo,
- long rowGroupSize, int pageSize) {
+ public ParquetExternalFilePrinterFactory(CompressionCodecName
compressionCodecName, String parquetSchemaString,
+ IAType typeInfo, long rowGroupSize, int pageSize,
ParquetProperties.WriterVersion writerVersion) {
this.compressionCodecName = compressionCodecName;
- this.schema = schema;
+ this.parquetSchemaString = parquetSchemaString;
this.typeInfo = typeInfo;
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
+ this.writerVersion = writerVersion;
}
@Override
public IExternalPrinter createPrinter() {
- return new ParquetExternalFilePrinter(compressionCodecName, schema,
typeInfo, rowGroupSize, pageSize);
+ return new ParquetExternalFilePrinter(compressionCodecName,
parquetSchemaString, typeInfo, rowGroupSize,
+ pageSize, writerVersion);
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/AsterixParquetTypeMap.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/AsterixParquetTypeMap.java
new file mode 100644
index 0000000000..410c951b64
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/AsterixParquetTypeMap.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer.parquet;
+
+import java.util.Map;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.PrimitiveType;
+
+public class AsterixParquetTypeMap {
+
+ public static final Map<ATypeTag, PrimitiveType.PrimitiveTypeName>
PRIMITIVE_TYPE_NAME_MAP =
+ Map.ofEntries(Map.entry(ATypeTag.BOOLEAN,
PrimitiveType.PrimitiveTypeName.BOOLEAN),
+ Map.entry(ATypeTag.STRING,
PrimitiveType.PrimitiveTypeName.BINARY),
+ Map.entry(ATypeTag.TINYINT,
PrimitiveType.PrimitiveTypeName.INT32),
+ Map.entry(ATypeTag.SMALLINT,
PrimitiveType.PrimitiveTypeName.INT32),
+ Map.entry(ATypeTag.INTEGER,
PrimitiveType.PrimitiveTypeName.INT64),
+ Map.entry(ATypeTag.BIGINT,
PrimitiveType.PrimitiveTypeName.INT64),
+ Map.entry(ATypeTag.FLOAT,
PrimitiveType.PrimitiveTypeName.FLOAT),
+ Map.entry(ATypeTag.DOUBLE,
PrimitiveType.PrimitiveTypeName.DOUBLE),
+ Map.entry(ATypeTag.DATE,
PrimitiveType.PrimitiveTypeName.INT32),
+ Map.entry(ATypeTag.TIME,
PrimitiveType.PrimitiveTypeName.INT32),
+ Map.entry(ATypeTag.DATETIME,
PrimitiveType.PrimitiveTypeName.INT64));
+
+ public static final Map<ATypeTag, LogicalTypeAnnotation>
LOGICAL_TYPE_ANNOTATION_MAP =
+ Map.ofEntries(Map.entry(ATypeTag.STRING,
LogicalTypeAnnotation.stringType()),
+ Map.entry(ATypeTag.DATE, LogicalTypeAnnotation.dateType()),
+ Map.entry(ATypeTag.TIME,
+ LogicalTypeAnnotation.timeType(true,
LogicalTypeAnnotation.TimeUnit.MILLIS)),
+ Map.entry(ATypeTag.DATETIME,
+ LogicalTypeAnnotation.timestampType(true,
LogicalTypeAnnotation.TimeUnit.MILLIS)));
+
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
new file mode 100644
index 0000000000..b25d1f5668
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer.parquet;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.IATypeVisitor;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+
+public class SchemaConverterVisitor implements IATypeVisitor<Void,
Pair<Types.Builder, String>> {
+ public static String MESSAGE_NAME = "asterix_schema";
+ private final ARecordType schemaType;
+ private ATypeTag unsupportedType;
+
+ private SchemaConverterVisitor(ARecordType schemaType) {
+ this.schemaType = schemaType;
+ this.unsupportedType = null;
+ }
+
+ public static String convertToParquetSchemaString(ARecordType schemaType)
throws CompilationException {
+ SchemaConverterVisitor schemaConverterVisitor = new
SchemaConverterVisitor(schemaType);
+ return schemaConverterVisitor.getParquetSchema().toString();
+ }
+
+ private MessageType getParquetSchema() throws CompilationException {
+ Types.MessageTypeBuilder builder = Types.buildMessage();
+
+ for (int i = 0; i < schemaType.getFieldNames().length; i++) {
+ String fieldName = schemaType.getFieldNames()[i];
+ IAType childType = schemaType.getFieldType(fieldName);
+ childType.accept(this, new Pair<>(builder, fieldName));
+ if (unsupportedType != null) {
+ throw new
CompilationException(ErrorCode.TYPE_UNSUPPORTED_PARQUET_WRITE,
unsupportedType.toString());
+ }
+ }
+ return builder.named(MESSAGE_NAME);
+ }
+
+ @Override
+ public Void visit(ARecordType recordType, Pair<Types.Builder, String> arg)
{
+ Types.Builder builder = arg.first;
+ String fieldName = arg.second;
+
+ Types.BaseGroupBuilder childBuilder = getGroupChild(builder);
+ for (int i = 0; i < recordType.getFieldNames().length; i++) {
+ String childFieldName = recordType.getFieldNames()[i];
+ IAType childType = recordType.getFieldType(childFieldName);
+
+ childType.accept(this, new Pair<>(childBuilder, childFieldName));
+
+ }
+ childBuilder.named(fieldName);
+
+ return null;
+ }
+
+ @Override
+ public Void visit(AbstractCollectionType collectionType,
Pair<Types.Builder, String> arg) {
+ Types.Builder builder = arg.first;
+ String fieldName = arg.second;
+
+ Types.BaseListBuilder childBuilder = getListChild(builder);
+ IAType child = collectionType.getItemType();
+ child.accept(this, new Pair<>(childBuilder, fieldName));
+
+ return null;
+ }
+
+ @Override
+ public Void visit(AUnionType unionType, Pair<Types.Builder, String> arg) {
+ // Shouldn't reach here.
+ return null;
+ }
+
+ @Override
+ public Void visitFlat(IAType flatType, Pair<Types.Builder, String> arg) {
+ Types.Builder builder = arg.first;
+ String fieldName = arg.second;
+
+ PrimitiveType.PrimitiveTypeName primitiveTypeName =
+
AsterixParquetTypeMap.PRIMITIVE_TYPE_NAME_MAP.get(flatType.getTypeTag());
+
+ if (primitiveTypeName == null) {
+ unsupportedType = flatType.getTypeTag();
+ }
+
+ LogicalTypeAnnotation logicalTypeAnnotation =
+
AsterixParquetTypeMap.LOGICAL_TYPE_ANNOTATION_MAP.get(flatType.getTypeTag());
+
+ getPrimitiveChild(builder, primitiveTypeName,
logicalTypeAnnotation).named(fieldName);
+
+ return null;
+ }
+
+ private static Types.BaseGroupBuilder getGroupChild(Types.Builder parent) {
+ if (parent instanceof Types.BaseGroupBuilder) {
+ return ((Types.BaseGroupBuilder<?, ?>) parent).optionalGroup();
+ } else if (parent instanceof Types.BaseListBuilder) {
+ return ((Types.BaseListBuilder<?, ?>)
parent).optionalGroupElement();
+ } else {
+ return null;
+ }
+ }
+
+ private static Types.BaseListBuilder getListChild(Types.Builder parent) {
+ if (parent instanceof Types.BaseGroupBuilder) {
+ return ((Types.BaseGroupBuilder<?, ?>) parent).optionalList();
+ } else if (parent instanceof Types.BaseListBuilder) {
+ return ((Types.BaseListBuilder<?, ?>)
parent).optionalListElement();
+ } else {
+ return null;
+ }
+ }
+
+ private static Types.Builder getPrimitiveChild(Types.Builder parent,
PrimitiveType.PrimitiveTypeName type,
+ LogicalTypeAnnotation annotation) {
+ if (parent instanceof Types.BaseGroupBuilder) {
+ return ((Types.BaseGroupBuilder<?, ?>)
parent).optional(type).as(annotation);
+ } else if (parent instanceof Types.BaseListBuilder) {
+ return ((Types.BaseListBuilder<?, ?>)
parent).optionalElement(type).as(annotation);
+ } else {
+ return null;
+ }
+ }
+
+}
diff --git
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java
index 599d5287b5..5bc4a71604 100644
---
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java
+++
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java
@@ -32,6 +32,8 @@ import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.IReturningStatement;
import org.apache.asterix.lang.common.clause.OrderbyClause;
import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.expression.RecordTypeDefinition;
+import org.apache.asterix.lang.common.expression.TypeExpression;
import org.apache.asterix.lang.common.expression.VariableExpr;
import org.apache.asterix.lang.common.literal.StringLiteral;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
@@ -52,13 +54,14 @@ public class CopyToStatement extends AbstractStatement
implements IReturningStat
private List<Expression> partitionExpressions;
private List<Expression> orderByList;
private int varCounter;
+ private RecordTypeDefinition itemType;
public CopyToStatement(Namespace namespace, String datasetName, Query
query, VariableExpr sourceVariable,
ExternalDetailsDecl externalDetailsDecl, int varCounter,
List<Expression> keyExpressions,
boolean autogenerated) {
this(namespace, datasetName, query, sourceVariable,
externalDetailsDecl, new ArrayList<>(), new ArrayList<>(),
new HashMap<>(), new ArrayList<>(), new ArrayList<>(), new
ArrayList<>(), varCounter, keyExpressions,
- autogenerated);
+ autogenerated, null);
}
public CopyToStatement(Namespace namespace, String datasetName, Query
query, VariableExpr sourceVariable,
@@ -68,7 +71,18 @@ public class CopyToStatement extends AbstractStatement
implements IReturningStat
List<OrderbyClause.NullOrderModifier> orderByNullModifierList, int
varCounter) {
this(namespace, datasetName, query, sourceVariable,
externalDetailsDecl, pathExpressions, partitionExpressions,
partitionsVariables, orderbyList, orderByModifiers,
orderByNullModifierList, varCounter,
- new ArrayList<>(), false);
+ new ArrayList<>(), false, null);
+ }
+
+ public CopyToStatement(Namespace namespace, String datasetName, Query
query, VariableExpr sourceVariable,
+ ExternalDetailsDecl externalDetailsDecl, List<Expression>
pathExpressions,
+ List<Expression> partitionExpressions, Map<Integer, VariableExpr>
partitionsVariables,
+ List<Expression> orderbyList, List<OrderbyClause.OrderModifier>
orderByModifiers,
+ List<OrderbyClause.NullOrderModifier> orderByNullModifierList, int
varCounter,
+ RecordTypeDefinition itemType) {
+ this(namespace, datasetName, query, sourceVariable,
externalDetailsDecl, pathExpressions, partitionExpressions,
+ partitionsVariables, orderbyList, orderByModifiers,
orderByNullModifierList, varCounter,
+ new ArrayList<>(), false, itemType);
}
private CopyToStatement(Namespace namespace, String datasetName, Query
query, VariableExpr sourceVariable,
@@ -76,7 +90,7 @@ public class CopyToStatement extends AbstractStatement
implements IReturningStat
List<Expression> partitionExpressions, Map<Integer, VariableExpr>
partitionsVariables,
List<Expression> orderbyList, List<OrderbyClause.OrderModifier>
orderByModifiers,
List<OrderbyClause.NullOrderModifier> orderByNullModifierList, int
varCounter,
- List<Expression> keyExpressions, boolean autogenerated) {
+ List<Expression> keyExpressions, boolean autogenerated,
RecordTypeDefinition itemType) {
this.namespace = namespace;
this.datasetName = datasetName;
this.query = query;
@@ -91,6 +105,7 @@ public class CopyToStatement extends AbstractStatement
implements IReturningStat
this.varCounter = varCounter;
this.keyExpressions = keyExpressions != null ? keyExpressions : new
ArrayList<>();
this.autogenerated = autogenerated;
+ this.itemType = itemType;
if (pathExpressions.isEmpty()) {
// Ensure path expressions to have at least an empty string
@@ -117,6 +132,10 @@ public class CopyToStatement extends AbstractStatement
implements IReturningStat
this.namespace = namespace;
}
+ public RecordTypeDefinition getType() {
+ return itemType;
+ }
+
public Namespace getNamespace() {
return namespace;
}
@@ -192,6 +211,10 @@ public class CopyToStatement extends AbstractStatement
implements IReturningStat
return !orderByList.isEmpty();
}
+ public TypeExpression getItemType() {
+ return itemType;
+ }
+
@Override
public int getVarCounter() {
return varCounter;
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 4276a14981..1cfa892893 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -2936,6 +2936,7 @@ CopyToStatement CopyToStatement(Token startToken,
Pair<Namespace, Identifier> na
Namespace namespace = nameComponents == null ? null : nameComponents.first;
String datasetName = nameComponents == null ? null :
nameComponents.second.getValue();
List<Expression> pathExprs;
+ RecordTypeDefinition typeExpr = null;
List<Expression> partitionExprs = new ArrayList<Expression>();
Map<Integer, VariableExpr> partitionVarExprs = new HashMap<Integer,
VariableExpr>();
@@ -2947,6 +2948,7 @@ CopyToStatement CopyToStatement(Token startToken,
Pair<Namespace, Identifier> na
<TO> adapterName = AdapterName()
<PATH> <LEFTPAREN> pathExprs = ExpressionList() <RIGHTPAREN>
(CopyToOverClause(partitionExprs, partitionVarExprs, orderbyList,
orderbyModifierList, orderbyNullModifierList))?
+ (<TYPE> <LEFTPAREN> typeExpr = RecordTypeDef() <RIGHTPAREN>)?
<WITH> withRecord = RecordConstructor()
{
ExternalDetailsDecl edd = new ExternalDetailsDecl();
@@ -2961,8 +2963,7 @@ CopyToStatement CopyToStatement(Token startToken,
Pair<Namespace, Identifier> na
usedAlias = new
VariableExpr(SqlppVariableUtil.toInternalVariableIdentifier(datasetName));
}
- CopyToStatement stmt = new CopyToStatement(namespace, datasetName,
query, usedAlias, edd, pathExprs,
- partitionExprs, partitionVarExprs, orderbyList,
orderbyModifierList, orderbyNullModifierList, getVarCounter());
+ CopyToStatement stmt = new CopyToStatement(namespace, datasetName,
query, usedAlias, edd, pathExprs, partitionExprs, partitionVarExprs,
orderbyList, orderbyModifierList, orderbyNullModifierList, getVarCounter(),
typeExpr);
return addSourceLocation(stmt, startToken);
}
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index c931a93f56..23b9f93edc 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -25,6 +25,7 @@ import java.util.zip.Deflater;
import org.apache.asterix.cloud.writer.GCSExternalFileWriterFactory;
import org.apache.asterix.cloud.writer.S3ExternalFileWriterFactory;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.writer.LocalFSExternalFileWriterFactory;
@@ -44,6 +45,7 @@ import org.apache.hyracks.algebricks.data.IPrinterFactory;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.util.StorageUtil;
+import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
public class ExternalWriterProvider {
@@ -111,7 +113,7 @@ public class ExternalWriterProvider {
}
     public static IExternalPrinterFactory createPrinter(ICcApplicationContext appCtx, IWriteDataSink sink,
-            Object sourceType) {
+            Object sourceType) throws CompilationException {
Map<String, String> configuration = sink.getConfiguration();
String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
@@ -130,11 +132,8 @@ public class ExternalWriterProvider {
IPrinterFactory printerFactory =
CleanJSONPrinterFactoryProvider.INSTANCE.getPrinterFactory(sourceType);
return new TextualExternalFilePrinterFactory(printerFactory,
compressStreamFactory);
             case ExternalDataConstants.FORMAT_PARQUET:
+                String parquetSchemaString = configuration.get(ExternalDataConstants.PARQUET_SCHEMA_KEY);
-                if (!configuration.containsKey(ExternalDataConstants.KEY_SCHEMA)) {
-                    throw new UnsupportedOperationException("Schema not provided for parquet");
-                }
-                String schema = configuration.get(ExternalDataConstants.KEY_SCHEMA);
CompressionCodecName compressionCodecName;
if (compression == null || compression.equals("") ||
compression.equals("none")) {
compressionCodecName = CompressionCodecName.UNCOMPRESSED;
@@ -148,13 +147,26 @@ public class ExternalWriterProvider {
long rowGroupSize =
StorageUtil.getByteValue(rowGroupSizeString);
int pageSize = (int) StorageUtil.getByteValue(pageSizeString);
-                return new ParquetExternalFilePrinterFactory(compressionCodecName, schema, (IAType) sourceType,
-                        rowGroupSize, pageSize);
+                ParquetProperties.WriterVersion writerVersion = getParquetWriterVersion(configuration);
+
+                return new ParquetExternalFilePrinterFactory(compressionCodecName, parquetSchemaString,
+                        (IAType) sourceType, rowGroupSize, pageSize, writerVersion);
default:
throw new UnsupportedOperationException("Unsupported format "
+ format);
}
}
+    private static ParquetProperties.WriterVersion getParquetWriterVersion(Map<String, String> configuration) {
+
+        if (configuration.get(ExternalDataConstants.PARQUET_WRITER_VERSION_KEY) == null) {
+            return ParquetProperties.WriterVersion.PARQUET_1_0;
+        } else if (configuration.get(ExternalDataConstants.PARQUET_WRITER_VERSION_KEY)
+                .equals(ExternalDataConstants.PARQUET_WRITER_VERSION_VALUE_2)) {
+            return ParquetProperties.WriterVersion.PARQUET_2_0;
+        } else
+            return ParquetProperties.WriterVersion.PARQUET_1_0;
+    }
+
private static String getRowGroupSize(Map<String, String> configuration) {
return
configuration.getOrDefault(ExternalDataConstants.KEY_PARQUET_ROW_GROUP_SIZE,
ExternalDataConstants.PARQUET_DEFAULT_ROW_GROUP_SIZE);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 319d440e06..02327e252f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -158,9 +158,8 @@ public enum ErrorCode implements IError {
UNSUPPORTED_WRITE_SPEC(128),
JOB_REJECTED(129),
FRAME_BIGGER_THAN_SORT_MEMORY(130),
- ILLGEAL_PARQUET_SCHEMA(131),
- RESULT_DOES_NOT_FOLLOW_SCHEMA(132),
- EXTRA_FIELD_IN_RESULT_NOT_FOUND_IN_SCHEMA(133),
+ RESULT_DOES_NOT_FOLLOW_SCHEMA(131),
+ EXTRA_FIELD_IN_RESULT_NOT_FOUND_IN_SCHEMA(132),
// Compilation error codes.
RULECOLLECTION_NOT_INSTANCE_OF_LIST(10000),
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 51acdc024a..226234f12c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -148,9 +148,8 @@
128 = Unsupported copy to specification: PARTITION BY %1$s, ORDER BY %2$s
129 = Job %1$s failed to run. Cluster is not accepting jobs.
 130 = Frame data=%1$s (requiring %2$s) is bigger than the sort budget. Used=%3$s, max=%4$s. Please increase the sort memory budget.
-131 = Invalid parquet schema provided
-132 = Result does not follow the schema, %1$s type expected but found %2$s type at '%3$s'
-133 = Extra field in the result, field '%1$s' does not exist at '%2$s' in the schema
+131 = Result does not follow the schema, %1$s type expected but found %2$s type at '%3$s'
+132 = Extra field in the result, field '%1$s' does not exist at '%2$s' in the schema
10000 = The given rule collection %1$s is not an instance of the List class.
10001 = Cannot compose partition constraint %1$s with %2$s