This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit f1cd4170f3e5ea0adf47bd37ef101a51b706819b Author: Ali Alsuliman <[email protected]> AuthorDate: Fri Apr 17 17:45:35 2020 -0700 [ASTERIXDB-2713][EXT] CSV & TSV support for external dataset p2 - user model changes: no - storage format changes: no - interface changes: no Details: - validate adapter configuration - ignore empty lines in CSV/TSV files - require "header" parameter for CSV/TSV formats - make some parameters case-insensitive - few fixes and clean-ups Change-Id: I2f523de0d482a358ada0c27236ff24616ad0d7da Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5848 Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- asterixdb/asterix-app/data/csv/empty.csv | 5 + asterixdb/asterix-app/data/csv/sample_12.csv | 1 + asterixdb/asterix-app/data/csv/sample_13.csv | 11 ++ .../asterix/app/translator/QueryTranslator.java | 24 +++- .../csv-parser-001/csv-parser-001.1.ddl.sqlpp | 11 +- ...01.6.ddl.sqlpp => csv-parser-001.6.query.sqlpp} | 4 +- ...01.6.ddl.sqlpp => csv-parser-001.7.query.sqlpp} | 4 +- ...-001.6.ddl.sqlpp => csv-parser-001.8.ddl.sqlpp} | 0 .../tsv-parser-001/tsv-parser-002.1.ddl.sqlpp | 2 +- .../aws/s3/001/query-dataset.000.ddl.sqlpp | 3 +- .../aws/s3/002/query-dataset.000.ddl.sqlpp | 3 +- .../negative.000.ddl.sqlpp} | 4 +- .../csv-parser-001/csv-parser-001.6.adm | 3 + .../csv-parser-001/csv-parser-001.7.adm | 0 .../runtimets/testsuite_external_dataset.xml | 8 +- .../asterix/common/exceptions/ErrorCode.java | 4 +- .../src/main/resources/asx_errormsg/en.properties | 4 +- .../external/input/HDFSDataSourceFactory.java | 2 +- .../external/input/record/CharArrayRecord.java | 6 + .../record/reader/aws/AwsS3ReaderFactory.java | 18 --- .../stream/EmptyLineSeparatedRecordReader.java | 3 +- .../record/reader/stream/LineRecordReader.java | 23 ++- .../reader/stream/QuotedLineRecordReader.java | 66 ++++----- .../reader/stream/SemiStructuredRecordReader.java | 4 +- .../record/reader/stream/StreamRecordReader.java | 3 +- .../reader/stream/StreamRecordReaderFactory.java | 2 +- .../external/parser/DelimitedDataParser.java | 36 +++-- .../parser/factory/DelimitedDataParserFactory.java | 12 +- .../factory/RecordWithMetadataParserFactory.java | 2 +- .../external/provider/ParserFactoryProvider.java | 7 +- .../external/util/ExternalDataConstants.java | 26 +++- .../asterix/external/util/ExternalDataUtils.java | 156 +++++++++++++++------ .../external/classad/test/ClassAdToADMTest.java | 11 +- .../parser/test/ByteBufUTF8DecodeTest.java | 5 +- .../external/parser/test/RecordWithMetaTest.java | 5 +- .../asterix/metadata/feeds/FeedMetadataUtil.java | 10 +- .../org/apache/hyracks/test/support/TestUtils.java | 5 + 37 files changed, 330 insertions(+), 163 deletions(-) diff --git a/asterixdb/asterix-app/data/csv/empty.csv b/asterixdb/asterix-app/data/csv/empty.csv new file mode 100644 index 0000000..3f2ff2d --- /dev/null +++ b/asterixdb/asterix-app/data/csv/empty.csv @@ -0,0 +1,5 @@ + + + + + diff --git a/asterixdb/asterix-app/data/csv/sample_12.csv b/asterixdb/asterix-app/data/csv/sample_12.csv index 2ab7c6d..0c9baf5 100644 --- a/asterixdb/asterix-app/data/csv/sample_12.csv +++ b/asterixdb/asterix-app/data/csv/sample_12.csv @@ -1,3 +1,4 @@ +f1,f2,f3 1,true,"text" 2,false,"text" 3,true,"text" diff --git a/asterixdb/asterix-app/data/csv/sample_13.csv b/asterixdb/asterix-app/data/csv/sample_13.csv new file mode 100644 index 0000000..9f53f56 --- /dev/null +++ b/asterixdb/asterix-app/data/csv/sample_13.csv @@ -0,0 +1,11 @@ + + +f1,f2,f3,f4 + +1,,"good","recommend" + +2,,"bad","not recommend" +3,,"good", + + + diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 9d427ec..3906bd5 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -723,7 +723,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen case EXTERNAL: ExternalDetailsDecl externalDetails = (ExternalDetailsDecl) dd.getDatasetDetailsDecl(); Map<String, String> properties = createExternalDatasetProperties(dd, metadataProvider, mdTxnCtx); - ExternalDataUtils.defaultConfiguration(properties); + ExternalDataUtils.normalize(properties); + ExternalDataUtils.validate(properties); + validateExternalDatasetDetails(externalDetails, properties); datasetDetails = new ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(), TransactionState.COMMIT); break; @@ -1970,9 +1972,12 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName, dataverseName + "." + datasetName); try { + Map<String, String> properties = loadStmt.getProperties(); + ExternalDataUtils.normalize(properties); + ExternalDataUtils.validate(properties); CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName().getValue(), - loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted()); + loadStmt.getAdapter(), properties, loadStmt.dataIsAlreadySorted()); cls.setSourceLocation(stmt.getSourceLocation()); JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls, null, responsePrinter, warningCollector); @@ -2170,7 +2175,10 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen "A feed with this name " + feedName + " already exists."); } } - feed = new Feed(dataverseName, feedName, cfs.getConfiguration()); + Map<String, String> configuration = cfs.getConfiguration(); + ExternalDataUtils.normalize(configuration); + ExternalDataUtils.validate(configuration); + feed = new Feed(dataverseName, feedName, configuration); FeedMetadataUtil.validateFeed(feed, mdTxnCtx, appCtx); MetadataManager.INSTANCE.addFeed(metadataProvider.getMetadataTxnContext(), feed); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); @@ -3177,4 +3185,14 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, clientRequest.getId()); } } + + protected void validateExternalDatasetDetails(ExternalDetailsDecl externalDetails, Map<String, String> properties) + throws RuntimeDataException { + String adapter = externalDetails.getAdapter(); + // "format" parameter is needed for "S3" data source + if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equals(adapter) + && properties.get(ExternalDataConstants.KEY_FORMAT) == null) { + throw new RuntimeDataException(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_FORMAT); + } + } } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.1.ddl.sqlpp index 5728e78..f7fe18c 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.1.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.1.ddl.sqlpp @@ -25,8 +25,11 @@ USE test; CREATE TYPE t1 AS {f1: string, f2: string, f3: string, f4: string, f5: string}; CREATE TYPE t2 AS {f1: string, f2: string, f3: string}; CREATE TYPE t3 AS {f1: int?, f2: boolean, f3: string?}; +CREATE TYPE t4 AS {f1: string, f2: string, f3: string, f4: string}; -CREATE EXTERNAL DATASET ds1(t1) USING localfs(("path"="asterix_nc1://data/csv/sample_09.csv"), ("format"="csv")); -CREATE EXTERNAL DATASET ds2(t2) USING localfs(("path"="asterix_nc1://data/csv/sample_10.csv"), ("format"="csv")); -CREATE EXTERNAL DATASET ds3(t1) USING localfs(("path"="asterix_nc1://data/csv/sample_11.csv"), ("format"="csv")); -CREATE EXTERNAL DATASET ds4(t3) USING localfs(("path"="asterix_nc1://data/csv/sample_12.csv"), ("format"="csv")); \ No newline at end of file +CREATE EXTERNAL DATASET ds1(t1) USING localfs(("path"="asterix_nc1://data/csv/sample_09.csv"), ("format"="CSV"), ("header"="FALSE")); +CREATE EXTERNAL DATASET ds2(t2) USING localfs(("path"="asterix_nc1://data/csv/sample_10.csv"), ("format"="Csv"), ("header"="False")); +CREATE EXTERNAL DATASET ds3(t1) USING localfs(("path"="asterix_nc1://data/csv/sample_11.csv"), ("format"="csv"), ("header"="FALSE")); +CREATE EXTERNAL DATASET ds4(t3) USING localfs(("path"="asterix_nc1://data/csv/sample_12.csv"), ("format"="csv"), ("header"="True")); +CREATE EXTERNAL DATASET ds5(t4) USING localfs(("path"="asterix_nc1://data/csv/sample_13.csv"), ("format"="csv"), ("header"="True")); +CREATE EXTERNAL DATASET ds6(t4) USING localfs(("path"="asterix_nc1://data/csv/empty.csv"), ("format"="csv"), ("header"="false")); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.query.sqlpp similarity index 94% copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.ddl.sqlpp copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.query.sqlpp index 86a1b59..a3d113d 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.query.sqlpp @@ -17,4 +17,6 @@ * under the License. */ -DROP DATAVERSE test; \ No newline at end of file +USE test; + +FROM ds5 v SELECT VALUE v ORDER BY v.f1; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.7.query.sqlpp similarity index 94% copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.ddl.sqlpp copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.7.query.sqlpp index 86a1b59..2e5b312 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.7.query.sqlpp @@ -17,4 +17,6 @@ * under the License. */ -DROP DATAVERSE test; \ No newline at end of file +USE test; + +FROM ds6 v SELECT VALUE v ORDER BY v.f1; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.8.ddl.sqlpp similarity index 100% rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.ddl.sqlpp rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.8.ddl.sqlpp diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.1.ddl.sqlpp index c0faf16..cabe54b 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.1.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.1.ddl.sqlpp @@ -24,4 +24,4 @@ USE test; CREATE TYPE t1 AS {f1: int, f2: int, f3: string, f4: boolean, f5: bigint, f6: double}; -CREATE EXTERNAL DATASET ds1(t1) USING localfs(("path"="asterix_nc1://data/tsv/sample_01.tsv"), ("format"="tsv")) \ No newline at end of file +CREATE EXTERNAL DATASET ds1(t1) USING localfs(("path"="asterix_nc1://data/tsv/sample_01.tsv"), ("format"="tsv"), ("header"="FALSE")); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.000.ddl.sqlpp index b906039..6184b19 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.000.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.000.ddl.sqlpp @@ -32,5 +32,6 @@ CREATE EXTERNAL DATASET test(test) USING S3 ( ("serviceEndpoint"="http://localhost:8001"), ("container"="playground"), ("definition"="csv-data/reviews"), -("format"="csv") +("format"="Csv"), +("header"="false") ); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.000.ddl.sqlpp index d385bee..194adf6 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.000.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.000.ddl.sqlpp @@ -32,5 +32,6 @@ CREATE EXTERNAL DATASET test(test) USING S3 ( ("serviceEndpoint"="http://localhost:8001"), ("container"="playground"), ("definition"="tsv-data/reviews"), -("format"="tsv") +("format"="TSV"), +("header"="False") ); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/negative/negative.000.ddl.sqlpp similarity index 94% copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.000.ddl.sqlpp copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/negative/negative.000.ddl.sqlpp index b906039..e0fc056 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.000.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/negative/negative.000.ddl.sqlpp @@ -17,6 +17,7 @@ * under the License. */ +// "format" parameter is missing for S3 DROP DATAVERSE test IF EXISTS; CREATE DATAVERSE test; USE test; @@ -31,6 +32,5 @@ CREATE EXTERNAL DATASET test(test) USING S3 ( ("region"="us-west-2"), ("serviceEndpoint"="http://localhost:8001"), ("container"="playground"), -("definition"="csv-data/reviews"), -("format"="csv") +("definition"="tsv-data/reviews") ); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.6.adm new file mode 100644 index 0000000..9a1d1c9 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.6.adm @@ -0,0 +1,3 @@ +{ "f1": "1", "f2": "", "f3": "good", "f4": "recommend" } +{ "f1": "2", "f2": "", "f3": "bad", "f4": "not recommend" } +{ "f1": "3", "f2": "", "f3": "good", "f4": "" } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.7.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.7.adm new file mode 100644 index 0000000..e69de29 diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml index 9948209..2456f13 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml @@ -17,7 +17,7 @@ ! specific language governing permissions and limitations ! under the License. !--> -<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp"> +<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp"> <test-group name="external-dataset"> <test-case FilePath="external-dataset"> <compilation-unit name="aws/s3/000"> @@ -34,5 +34,11 @@ <output-dir compare="Text">aws/s3/002</output-dir> </compilation-unit> </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="aws/s3/negative"> + <output-dir compare="Text">aws/s3/negative</output-dir> + <expected-error>Parameter(s) format must be specified</expected-error> + </compilation-unit> + </test-case> </test-group> </test-suite> diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index da1c4e7..14f7c83 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -241,7 +241,7 @@ public class ErrorCode { public static final int LIBRARY_EXTERNAL_FUNCTION_UNSUPPORTED_NAME = 3047; public static final int OPERATORS_FEED_META_OPERATOR_DESCRIPTOR_INVALID_RUNTIME = 3048; public static final int PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_DELIMITER = 3049; - public static final int PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_QUOTE = 3050; + public static final int PARSER_INVALID_CHAR_LENGTH = 3050; public static final int PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_QUOTE_DELIMITER_MISMATCH = 3051; public static final int INDEXING_EXTERNAL_FILE_INDEX_ACCESSOR_UNABLE_TO_FIND_FILE_INDEX = 3052; public static final int PARSER_ADM_DATA_PARSER_FIELD_NOT_NULL = 3053; @@ -307,6 +307,8 @@ public class ErrorCode { public static final int FAILED_TO_PARSE_METADATA = 3115; public static final int INPUT_DECODE_FAILURE = 3116; public static final int FAILED_TO_PARSE_MALFORMED_LOG_RECORD = 3117; + public static final int PARAMETERS_REQUIRED = 3118; + public static final int MALFORMED_RECORD = 3119; // Lifecycle management errors public static final int DUPLICATE_PARTITION_ID = 4000; diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index 457206d..98ae1a7 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -239,7 +239,7 @@ 3047 = External %1$s not supported 3048 = Invalid feed runtime: %1$s 3049 = '%1$s' is not a valid delimiter. The length of a delimiter should be 1 -3050 = '%1$s' is not a valid quote. The length of a quote should be 1 +3050 = '%1$s' is not a valid %2$s. The length of %2$s should be 1 3051 = Quote '%1$s' cannot be used with the delimiter '%2$s' 3052 = Was not able to find a file in the files index 3053 = Field %1$s can not be null @@ -305,6 +305,8 @@ 3115 = Failed to parse record metadata 3116 = Failed to decode input 3117 = Failed to parse record, malformed log record +3118 = Parameter(s) %1$s must be specified +3119 = Record number %1$s is malformed # Lifecycle management errors 4000 = Partition id %1$s for node %2$s already in use by node %3$s diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java index f830376..7702dde 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java @@ -205,7 +205,7 @@ public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IInd IExternalIndexer indexer = files == null ? null : ExternalIndexerProvider.getIndexer(configuration); if (recordReaderClazz != null) { StreamRecordReader streamReader = (StreamRecordReader) recordReaderClazz.getConstructor().newInstance(); - streamReader.configure(createInputStream(ctx, partition, indexer), configuration); + streamReader.configure(ctx, createInputStream(ctx, partition, indexer), configuration); if (indexer != null) { return new IndexingStreamRecordReader(streamReader, indexer); } else { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java index 65ecd8d..aa4abb4 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java @@ -134,4 +134,10 @@ public class CharArrayRecord implements IRawRecord<char[]> { strValue.getChars(0, strValue.length(), value, 0); this.size = strValue.length(); } + + public boolean isEmptyRecord() { + return size <= 0 + || (size == 1 && (value[0] == ExternalDataConstants.LF || value[0] == ExternalDataConstants.CR)) + || (size == 2 && value[0] == ExternalDataConstants.CR && value[1] == ExternalDataConstants.LF); + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java index e78783a..6484d4e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java @@ -18,20 +18,16 @@ */ package org.apache.asterix.external.input.record.reader.aws; -import java.lang.reflect.InvocationTargetException; import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.asterix.external.api.IRecordReader; -import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader; import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory; import org.apache.asterix.external.provider.StreamRecordReaderProvider; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.application.IServiceContext; -import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; public class AwsS3ReaderFactory extends StreamRecordReaderFactory { @@ -73,18 +69,4 @@ public class AwsS3ReaderFactory extends StreamRecordReaderFactory { // record reader recordReaderClazz = StreamRecordReaderProvider.getRecordReaderClazz(configuration); } - - @Override - public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition) - throws HyracksDataException { - try { - StreamRecordReader streamRecordReader = - (StreamRecordReader) recordReaderClazz.getConstructor().newInstance(); - streamRecordReader.configure(streamFactory.createInputStream(ctx, partition), configuration); - return streamRecordReader; - } catch (InstantiationException | IllegalAccessException | InvocationTargetException - | NoSuchMethodException e) { - throw HyracksDataException.create(e); - } - } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java index 07b6250..24a68a7 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java @@ -26,6 +26,7 @@ import java.util.Map; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.hyracks.api.context.IHyracksTaskContext; public class EmptyLineSeparatedRecordReader extends StreamRecordReader { @@ -135,7 +136,7 @@ public class EmptyLineSeparatedRecordReader extends StreamRecordReader { } @Override - public void configure(AsterixInputStream inputStream, Map<String, String> config) { + public void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config) { super.configure(inputStream); this.config = config; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java index be600ed..a27397e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataUtils; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; public class LineRecordReader extends StreamRecordReader { @@ -42,10 +43,12 @@ public class LineRecordReader extends StreamRecordReader { private static final String REQUIRED_CONFIGS = ""; @Override - public void configure(AsterixInputStream inputStream, Map<String, String> config) throws HyracksDataException { + public void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config) + throws HyracksDataException { super.configure(inputStream); this.hasHeader = ExternalDataUtils.hasHeader(config); if (hasHeader) { + // TODO(ali): revisit this and notifyNewSource inputStream.setNotificationHandler(this); } } @@ -100,13 +103,16 @@ public class LineRecordReader extends StreamRecordReader { startPosn = bufferPosn = 0; bufferLength = reader.read(inputBuffer); if (bufferLength <= 0) { - if (readLength > 0) { - record.endRecord(); - recordNumber++; - return true; + if (readLength <= 0) { + close(); + return false; //EOF } - close(); - return false; //EOF + record.endRecord(); + if (record.isEmptyRecord()) { + return false; + } + recordNumber++; + return true; } } for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline @@ -130,6 +136,9 @@ public class LineRecordReader extends StreamRecordReader { record.append(inputBuffer, startPosn, readLength); } } while (newlineLength == 0); + if (record.isEmptyRecord()) { + continue; + } if (nextIsHeader) { nextIsHeader = false; continue; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java index 1fd328b..564df4b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java @@ -24,39 +24,35 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.WarningUtil; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.external.util.ExternalDataUtils; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.IWarningCollector; +import org.apache.hyracks.api.exceptions.SourceLocation; public class QuotedLineRecordReader extends LineRecordReader { private char quote; private char quoteEscape; + private IWarningCollector warningCollector; + private final SourceLocation srcLoc = new SourceLocation(-1, -1); private static final List<String> recordReaderFormats = Collections.unmodifiableList( Arrays.asList(ExternalDataConstants.FORMAT_DELIMITED_TEXT, ExternalDataConstants.FORMAT_CSV)); private static final String REQUIRED_CONFIGS = ExternalDataConstants.KEY_QUOTE; @Override - public void configure(AsterixInputStream inputStream, Map<String, String> config) throws HyracksDataException { - super.configure(inputStream, config); + public void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config) + throws HyracksDataException { + super.configure(ctx, inputStream, config); + this.warningCollector = ctx.getWarningCollector(); String quoteString = config.get(ExternalDataConstants.KEY_QUOTE); - if (quoteString.length() != 1) { - throw new HyracksDataException(ExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_QUOTE, - ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString)); - } + ExternalDataUtils.validateQuote(quoteString); this.quote = quoteString.charAt(0); - String escapeString = config.get(ExternalDataConstants.KEY_QUOTE_ESCAPE); - if (escapeString == null) { - quoteEscape = ExternalDataConstants.ESCAPE; - } else { - if (escapeString.length() != 1) { - throw new HyracksDataException( - ExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_QUOTE_ESCAPE, - ExternalDataConstants.PARAMETER_OF_SIZE_ONE, escapeString)); - } - quoteEscape = escapeString.charAt(0); - } + this.quoteEscape = ExternalDataUtils.validateGetQuoteEscape(config); } @Override @@ -87,16 +83,22 @@ public class QuotedLineRecordReader extends LineRecordReader { startPosn = bufferPosn = 0; bufferLength = reader.read(inputBuffer); if (bufferLength <= 0) { - if (readLength > 0) { - if (inQuote) { - throw new IOException("malformed input record ended inside quote"); + // reached end of stream + if (readLength <= 0 || inQuote) { + // haven't read anything previously OR have read and in the middle and hit the end + if (inQuote && warningCollector.shouldWarn()) { + warningCollector + .warn(WarningUtil.forAsterix(srcLoc, ErrorCode.MALFORMED_RECORD, recordNumber)); } - record.endRecord(); - recordNumber++; - return true; + close(); + return false; } - close(); - return false; + record.endRecord(); + if (record.isEmptyRecord()) { + return false; + } + recordNumber++; + return true; } } boolean maybeInQuote = false; @@ -121,12 +123,9 @@ public class QuotedLineRecordReader extends LineRecordReader { // this is an opening quote inQuote = true; } - if (prevCharEscape) { - prevCharEscape = false; - } else { - // the quoteEscape != quote is for making an opening quote not an escape - prevCharEscape = inputBuffer[bufferPosn] == quoteEscape && quoteEscape != quote; - } + // the quoteEscape != quote is for making an opening quote not an escape + prevCharEscape = + inputBuffer[bufferPosn] == quoteEscape && !prevCharEscape && quoteEscape != quote; } else { // if quote == quoteEscape and current char is quote, then it could be closing or escaping if (inputBuffer[bufferPosn] == quote && !prevCharEscape) { @@ -146,6 +145,9 @@ public class QuotedLineRecordReader extends LineRecordReader { record.append(inputBuffer, startPosn, readLength); } } while (newlineLength == 0); + if (record.isEmptyRecord()) { + continue; + } if (nextIsHeader) { nextIsHeader = false; continue; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java index 883f0ee..5ab5730 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java @@ -29,6 +29,7 @@ import org.apache.asterix.common.exceptions.ExceptionUtils; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; public class SemiStructuredRecordReader extends StreamRecordReader { @@ -45,7 +46,8 @@ public class SemiStructuredRecordReader extends StreamRecordReader { private static final String REQUIRED_CONFIGS = ""; @Override - public void configure(AsterixInputStream stream, Map<String, String> config) throws HyracksDataException { + public void configure(IHyracksTaskContext ctx, AsterixInputStream stream, Map<String, String> config) + throws HyracksDataException { super.configure(stream); String recStartString = config.get(ExternalDataConstants.KEY_RECORD_START); String recEndString = config.get(ExternalDataConstants.KEY_RECORD_END); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java index 5629f48..4aed741 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java @@ -31,6 +31,7 @@ import org.apache.asterix.external.input.record.CharArrayRecord; import org.apache.asterix.external.input.stream.AsterixInputStreamReader; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.FeedLogManager; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; public abstract class StreamRecordReader implements IRecordReader<char[]>, IStreamNotificationHandler { @@ -100,6 +101,6 @@ public abstract class StreamRecordReader implements IRecordReader<char[]>, IStre public abstract String getRequiredConfigs(); - public abstract void configure(AsterixInputStream inputStream, Map<String, String> config) + public abstract void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config) throws HyracksDataException; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java index 076842e..9bb50f6 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java @@ -95,7 +95,7 @@ public class StreamRecordReaderFactory implements IRecordReaderFactory<char[]> { try { StreamRecordReader streamRecordReader = (StreamRecordReader) recordReaderClazz.getConstructor().newInstance(); - streamRecordReader.configure(streamFactory.createInputStream(ctx, partition), configuration); + streamRecordReader.configure(ctx, streamFactory.createInputStream(ctx, partition), configuration); return streamRecordReader; } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java index 8facce6..505acbd 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java @@ -35,6 +35,7 @@ import org.apache.asterix.om.base.AMutableString; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.om.utils.NonTaggedFormatUtil; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.dataflow.common.data.parsers.IValueParser; @@ -43,22 +44,23 @@ import org.apache.hyracks.dataflow.std.file.FieldCursorForDelimitedDataParser; public class DelimitedDataParser extends AbstractDataParser implements IStreamDataParser, IRecordDataParser<char[]> { + private final IHyracksTaskContext ctx; private final char fieldDelimiter; private final char quote; private final boolean hasHeader; - private ARecordType recordType; - private IARecordBuilder recBuilder; - private ArrayBackedValueStorage fieldValueBuffer; - private DataOutput fieldValueBufferOutput; - private IValueParser[] valueParsers; + private final ARecordType recordType; + private final IARecordBuilder recBuilder; + private final ArrayBackedValueStorage fieldValueBuffer; + private final DataOutput fieldValueBufferOutput; + private final IValueParser[] valueParsers; private FieldCursorForDelimitedDataParser cursor; - private byte[] fieldTypeTags; - private int[] fldIds; - private ArrayBackedValueStorage[] nameBuffers; - private boolean areAllNullFields; + private final byte[] fieldTypeTags; + private final int[] fldIds; + private final ArrayBackedValueStorage[] nameBuffers; - public DelimitedDataParser(IValueParserFactory[] valueParserFactories, char fieldDelimiter, char quote, - boolean hasHeader, ARecordType recordType, boolean isStreamParser) throws HyracksDataException { + public DelimitedDataParser(IHyracksTaskContext ctx, IValueParserFactory[] valueParserFactories, char fieldDelimiter, + char quote, boolean hasHeader, ARecordType recordType, boolean isStreamParser) throws HyracksDataException { + this.ctx = ctx; this.fieldDelimiter = fieldDelimiter; this.quote = quote; this.hasHeader = hasHeader; @@ -107,10 +109,8 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa try { while (cursor.nextRecord()) { parseRecord(); - if (!areAllNullFields) { - recBuilder.write(out, true); - return true; - } + recBuilder.write(out, true); + return true; } return false; } catch (IOException e) { @@ -121,7 +121,6 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa private void parseRecord() throws HyracksDataException { recBuilder.reset(recordType); recBuilder.init(); - areAllNullFields = true; for (int i = 0; i < valueParsers.length; ++i) { try { @@ -152,7 +151,6 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa } valueParsers[i].parse(cursor.getBuffer(), cursor.getFieldStart(), cursor.getFieldLength(), fieldValueBufferOutput); - areAllNullFields = false; } if (fldIds[i] < 0) { recBuilder.addField(nameBuffers[i], fieldValueBuffer); @@ -172,9 +170,7 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa public void parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException { cursor.nextRecord(record.get(), record.size()); parseRecord(); - if (!areAllNullFields) { - recBuilder.write(out, true); - } + recBuilder.write(out, true); } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java index 1fee49f..46c5152 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java @@ -42,15 +42,15 @@ public class DelimitedDataParserFactory extends AbstractRecordStreamParserFactor @Override public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException { - return createParser(); + return createParser(ctx); } - private DelimitedDataParser createParser() throws HyracksDataException { + private DelimitedDataParser createParser(IHyracksTaskContext ctx) throws HyracksDataException { IValueParserFactory[] valueParserFactories = ExternalDataUtils.getValueParserFactories(recordType); - char delimiter = ExternalDataUtils.getDelimiter(configuration); - char quote = ExternalDataUtils.getQuote(configuration, delimiter); + char delimiter = ExternalDataUtils.validateGetDelimiter(configuration); + char quote = ExternalDataUtils.validateGetQuote(configuration, delimiter); boolean hasHeader = ExternalDataUtils.hasHeader(configuration); - return new DelimitedDataParser(valueParserFactories, delimiter, quote, hasHeader, recordType, + return new DelimitedDataParser(ctx, valueParserFactories, delimiter, quote, hasHeader, recordType, ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.STREAM)); } @@ -62,7 +62,7 @@ public class DelimitedDataParserFactory extends AbstractRecordStreamParserFactor @Override public IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, int partition) throws HyracksDataException { - return createParser(); + return createParser(ctx); } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java index d34b5e0..704ea29 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java @@ -55,7 +55,7 @@ public class RecordWithMetadataParserFactory<I, O> implements IRecordDataParserF String recordFormat = configuration.get(ExternalDataConstants.KEY_RECORD_FORMAT); if (recordFormat == null) { throw AlgebricksException.create(ErrorCode.UNKNOWN_RECORD_FORMAT_FOR_META_PARSER, - ExternalDataConstants.KEY_FORMAT); + ExternalDataConstants.KEY_RECORD_FORMAT); } String format = configuration.get(ExternalDataConstants.KEY_FORMAT); if (format == null) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java index 53cf6b1..2265a25 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java @@ -46,15 +46,12 @@ public class ParserFactoryProvider { public static IDataParserFactory getDataParserFactory(ILibraryManager libraryManager, Map<String, String> configuration) throws AsterixException { IDataParserFactory parserFactory; - String parserFactoryName = configuration.get(ExternalDataConstants.KEY_DATA_PARSER); + String parserFactoryName = configuration.get(ExternalDataConstants.KEY_PARSER); if (ExternalDataUtils.isExternal(parserFactoryName)) { return ExternalDataUtils.createExternalParserFactory(libraryManager, ExternalDataUtils.getDataverse(configuration), parserFactoryName); } else { - String parserFactoryKey = ExternalDataUtils.getRecordFormat(configuration); - if (parserFactoryKey == null) { - parserFactoryKey = configuration.get(ExternalDataConstants.KEY_PARSER_FACTORY); - } + String parserFactoryKey = ExternalDataUtils.getParserFactory(configuration); parserFactory = ParserFactoryProvider.getDataParserFactory(parserFactoryKey); } return parserFactory; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index 1378207..a05ad77 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -18,6 +18,10 @@ */ package org.apache.asterix.external.util; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + public class ExternalDataConstants { private ExternalDataConstants() { @@ -168,6 +172,26 @@ public class ExternalDataConstants { public static final String FORMAT_LINE_SEPARATED = "line-separated"; public static final String FORMAT_HDFS_WRITABLE = "hdfs-writable"; public static final String FORMAT_KV = "kv"; + public static final String FORMAT_CSV = "csv"; + public static final String FORMAT_TSV = "tsv"; + public static final Set<String> ALL_FORMATS; + static { + Set<String> formats = new HashSet<>(13); + formats.add(FORMAT_HIVE); + formats.add(FORMAT_BINARY); + formats.add(FORMAT_ADM); + formats.add(FORMAT_JSON_LOWER_CASE); + formats.add(FORMAT_DELIMITED_TEXT); + formats.add(FORMAT_TWEET); + formats.add(FORMAT_RSS); + formats.add(FORMAT_SEMISTRUCTURED); + formats.add(FORMAT_LINE_SEPARATED); + formats.add(FORMAT_HDFS_WRITABLE); + formats.add(FORMAT_KV); + formats.add(FORMAT_CSV); + formats.add(FORMAT_TSV); + ALL_FORMATS = Collections.unmodifiableSet(formats); + } /** * input streams @@ -234,8 +258,6 @@ public class ExternalDataConstants { public static final String EXTERNAL = "external"; public static final String KEY_READER_FACTORY = "reader-factory"; public static final String READER_RSS = "rss_feed"; - public static final String FORMAT_CSV = "csv"; - public static final String FORMAT_TSV = "tsv"; public static final String ERROR_PARSE_RECORD = "Parser failed to parse record"; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index 443aa7e..b7b441b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -44,41 +44,49 @@ import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory; public class ExternalDataUtils { + private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class); + static { + valueParserFactoryMap.put(ATypeTag.INTEGER, IntegerParserFactory.INSTANCE); + valueParserFactoryMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE); + valueParserFactoryMap.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE); + valueParserFactoryMap.put(ATypeTag.BIGINT, LongParserFactory.INSTANCE); + valueParserFactoryMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE); + valueParserFactoryMap.put(ATypeTag.BOOLEAN, BooleanParserFactory.INSTANCE); + } + private ExternalDataUtils() { } // Get a delimiter from the given configuration - public static char getDelimiter(Map<String, String> configuration) throws HyracksDataException { + public static char validateGetDelimiter(Map<String, String> configuration) throws HyracksDataException { String delimiterValue = configuration.get(ExternalDataConstants.KEY_DELIMITER); if (delimiterValue == null) { - delimiterValue = ExternalDataConstants.DEFAULT_DELIMITER; - } else if (delimiterValue.length() != 1) { - throw new RuntimeDataException(ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_DELIMITER, - delimiterValue); + return ExternalDataConstants.DEFAULT_DELIMITER.charAt(0); } + validateDelimiter(delimiterValue); return delimiterValue.charAt(0); } // Get a quote from the given configuration when the delimiter is given // Need to pass delimiter to check whether they share the same character - public static char getQuote(Map<String, String> configuration, char delimiter) throws HyracksDataException { + public static char validateGetQuote(Map<String, String> configuration, char delimiter) throws HyracksDataException { String quoteValue = configuration.get(ExternalDataConstants.KEY_QUOTE); if (quoteValue == null) { - quoteValue = ExternalDataConstants.DEFAULT_QUOTE; - } else if (quoteValue.length() != 1) { - throw new RuntimeDataException(ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_QUOTE, - quoteValue); + return ExternalDataConstants.DEFAULT_QUOTE.charAt(0); } + validateQuote(quoteValue); + char quote = quoteValue.charAt(0); + validateDelimiterAndQuote(delimiter, quote); + return quote; + } - // Since delimiter (char type value) can't be null, - // we only check whether delimiter and quote use the same character - if (quoteValue.charAt(0) == delimiter) { - throw new RuntimeDataException( - ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_QUOTE_DELIMITER_MISMATCH, quoteValue, - delimiter); + public static char validateGetQuoteEscape(Map<String, String> configuration) throws HyracksDataException { + String quoteEscapeValue = configuration.get(ExternalDataConstants.KEY_QUOTE_ESCAPE); + if (quoteEscapeValue == null) { + return ExternalDataConstants.ESCAPE; } - - return quoteValue.charAt(0); + validateQuoteEscape(quoteEscapeValue); + return quoteEscapeValue.charAt(0); } public static void validateDataParserParameters(Map<String, String> configuration) throws AsterixException { @@ -86,8 +94,8 @@ public class ExternalDataUtils { if (parser == null) { String parserFactory = configuration.get(ExternalDataConstants.KEY_PARSER_FACTORY); if (parserFactory == null) { - throw new AsterixException("The parameter " + ExternalDataConstants.KEY_FORMAT + " or " - + ExternalDataConstants.KEY_PARSER_FACTORY + " must be specified."); + throw AsterixException.create(ErrorCode.PARAMETERS_REQUIRED, + ExternalDataConstants.KEY_FORMAT + " or " + ExternalDataConstants.KEY_PARSER_FACTORY); } } } @@ -95,7 +103,7 @@ public class ExternalDataUtils { public static void validateDataSourceParameters(Map<String, String> configuration) throws AsterixException { String reader = configuration.get(ExternalDataConstants.KEY_READER); if (reader == null) { - throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER + " must be specified."); + throw AsterixException.create(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_READER); } } @@ -141,22 +149,13 @@ public class ExternalDataUtils { return configuration.get(ExternalDataConstants.KEY_DATAVERSE); } - public static String getRecordFormat(Map<String, String> configuration) { - String parserFormat = configuration.get(ExternalDataConstants.KEY_DATA_PARSER); - return parserFormat != null ? parserFormat : configuration.get(ExternalDataConstants.KEY_FORMAT); - } - - private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap(); - - private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() { - Map<ATypeTag, IValueParserFactory> m = new EnumMap<>(ATypeTag.class); - m.put(ATypeTag.INTEGER, IntegerParserFactory.INSTANCE); - m.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE); - m.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE); - m.put(ATypeTag.BIGINT, LongParserFactory.INSTANCE); - m.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE); - m.put(ATypeTag.BOOLEAN, BooleanParserFactory.INSTANCE); - return m; + public static String getParserFactory(Map<String, String> configuration) { + String parserFactory = configuration.get(ExternalDataConstants.KEY_PARSER); + if (parserFactory != null) { + return parserFactory; + } + parserFactory = configuration.get(ExternalDataConstants.KEY_FORMAT); + return parserFactory != null ? parserFactory : configuration.get(ExternalDataConstants.KEY_PARSER_FACTORY); } public static IValueParserFactory[] getValueParserFactories(ARecordType recordType) { @@ -284,8 +283,7 @@ public class ExternalDataUtils { public static int getNumberOfKeys(Map<String, String> configuration) throws AsterixException { String keyIndexes = configuration.get(ExternalDataConstants.KEY_KEY_INDEXES); if (keyIndexes == null) { - throw new AsterixException( - "A change feed must have the parameter " + ExternalDataConstants.KEY_KEY_INDEXES); + throw AsterixException.create(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_KEY_INDEXES); } return keyIndexes.split(",").length; } @@ -340,7 +338,7 @@ public class ExternalDataUtils { } /** - * Prepares the configuration of the external dataset and its adapter by filling the information required by + * Prepares the configuration of the external data and its adapter by filling the information required by * adapters and parsers. * * @param adapterName adapter name @@ -355,4 +353,82 @@ public class ExternalDataUtils { configuration.put(ExternalDataConstants.KEY_PARSER, configuration.get(ExternalDataConstants.KEY_FORMAT)); } } + + /** + * Normalizes the values of certain parameters of the adapter configuration. This should happen before persisting + * the metadata (e.g. when creating external datasets or feeds) and when creating an adapter factory. + * + * @param configuration external data configuration + */ + public static void normalize(Map<String, String> configuration) { + // normalize the "format" parameter + String paramValue = configuration.get(ExternalDataConstants.KEY_FORMAT); + if (paramValue != null) { + String lowerCaseFormat = paramValue.toLowerCase().trim(); + if (ExternalDataConstants.ALL_FORMATS.contains(lowerCaseFormat)) { + configuration.put(ExternalDataConstants.KEY_FORMAT, lowerCaseFormat); + } + } + // normalize the "header" parameter + paramValue = configuration.get(ExternalDataConstants.KEY_HEADER); + if (paramValue != null) { + configuration.put(ExternalDataConstants.KEY_HEADER, paramValue.toLowerCase().trim()); + } + } + + /** + * Validates the parameter values of the adapter configuration. This should happen after normalizing the values. + * + * @param configuration external data configuration + * @throws HyracksDataException HyracksDataException + */ + public static void validate(Map<String, String> configuration) throws HyracksDataException { + String format = configuration.get(ExternalDataConstants.KEY_FORMAT); + String header = configuration.get(ExternalDataConstants.KEY_HEADER); + if (format != null && isHeaderRequiredFor(format) && header == null) { + throw new RuntimeDataException(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_HEADER); + } + if (header != null && !isBoolean(header)) { + throw new RuntimeDataException(ErrorCode.INVALID_REQ_PARAM_VAL, ExternalDataConstants.KEY_HEADER, header); + } + char delimiter = validateGetDelimiter(configuration); + validateGetQuote(configuration, delimiter); + validateGetQuoteEscape(configuration); + } + + private static boolean isHeaderRequiredFor(String format) { + return format.equals(ExternalDataConstants.FORMAT_CSV) || format.equals(ExternalDataConstants.FORMAT_TSV); + } + + private static boolean isBoolean(String value) { + return value.equals(ExternalDataConstants.TRUE) || value.equals(ExternalDataConstants.FALSE); + } + + private static void validateDelimiter(String delimiter) throws RuntimeDataException { + if (delimiter.length() != 1) { + throw new RuntimeDataException(ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_DELIMITER, + delimiter); + } + } + + public static void validateQuote(String quote) throws RuntimeDataException { + if (quote.length() != 1) { + throw new RuntimeDataException(ErrorCode.PARSER_INVALID_CHAR_LENGTH, quote, + ExternalDataConstants.KEY_QUOTE); + } + } + + private static void validateQuoteEscape(String quoteEsc) throws RuntimeDataException { + if (quoteEsc.length() != 1) { + throw new RuntimeDataException(ErrorCode.PARSER_INVALID_CHAR_LENGTH, quoteEsc, + ExternalDataConstants.KEY_QUOTE_ESCAPE); + } + } + + private static void validateDelimiterAndQuote(char delimiter, char quote) throws RuntimeDataException { + if (quote == delimiter) { + throw new RuntimeDataException( + ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_QUOTE_DELIMITER_MISMATCH, quote, delimiter); + } + } } diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java index 47f784a..8ef6273 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java @@ -48,9 +48,11 @@ import org.apache.asterix.om.types.IAType; import org.apache.commons.io.FileUtils; import org.apache.hyracks.algebricks.data.IPrinter; import org.apache.hyracks.algebricks.data.IPrinterFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; +import org.apache.hyracks.test.support.TestUtils; import org.junit.Assert; import junit.framework.Test; @@ -58,6 +60,9 @@ import junit.framework.TestCase; import junit.framework.TestSuite; public class ClassAdToADMTest extends TestCase { + + private final IHyracksTaskContext ctx = TestUtils.createHyracksTask(); + /** * Create the test case * @@ -123,7 +128,7 @@ public class ClassAdToADMTest extends TestCase { FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false); LocalFSInputStream in = new LocalFSInputStream(watcher); SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(); - recordReader.configure(in, config); + recordReader.configure(ctx, in, config); while (recordReader.hasNext()) { tb.reset(); IRawRecord<char[]> record = recordReader.next(); @@ -162,7 +167,7 @@ public class ClassAdToADMTest extends TestCase { FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false); LocalFSInputStream in = new LocalFSInputStream(watcher); SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(); - recordReader.configure(in, config); + recordReader.configure(ctx, in, config); try { Value val = new Value(objectPool); while (recordReader.hasNext()) { @@ -204,7 +209,7 @@ public class ClassAdToADMTest extends TestCase { FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false); LocalFSInputStream in = new LocalFSInputStream(watcher); SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(); - recordReader.configure(in, config); + recordReader.configure(ctx, in, config); try { Value val = new Value(objectPool); while (recordReader.hasNext()) { diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java index 2ba5a3e..2972542 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java @@ -41,6 +41,8 @@ import org.apache.asterix.external.input.record.reader.stream.SemiStructuredReco import org.apache.asterix.external.input.stream.LocalFSInputStream; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.FileSystemWatcher; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.test.support.TestUtils; import org.junit.Assert; import org.junit.Test; @@ -56,6 +58,7 @@ public class ByteBufUTF8DecodeTest { private final CharBuffer chars = CharBuffer.allocate(BUFFER_SIZE); private final CharArrayRecord value = new CharArrayRecord(); private final ByteBuf nettyBuffer = UnpooledByteBufAllocator.DEFAULT.heapBuffer(KB32, Integer.MAX_VALUE); + private final IHyracksTaskContext ctx = TestUtils.createHyracksTask(); @Test public void eatGlass() { @@ -83,7 +86,7 @@ public class ByteBufUTF8DecodeTest { FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false); LocalFSInputStream in = new LocalFSInputStream(watcher); try (SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader()) { - recordReader.configure(in, config); + recordReader.configure(ctx, in, config); while (recordReader.hasNext()) { try { IRawRecord<char[]> record = recordReader.next(); diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java index 1584065..47c6ffe 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java @@ -44,12 +44,15 @@ import org.apache.asterix.om.types.IAType; import org.apache.commons.io.FileUtils; import org.apache.hyracks.algebricks.data.IPrinter; import org.apache.hyracks.algebricks.data.IPrinterFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; +import org.apache.hyracks.test.support.TestUtils; import org.junit.Assert; public class RecordWithMetaTest { + private final IHyracksTaskContext ctx = TestUtils.createHyracksTask(); private static ARecordType recordType; @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -90,7 +93,7 @@ public class RecordWithMetaTest { config.put(ExternalDataConstants.KEY_HEADER, "true"); config.put(ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.DEFAULT_QUOTE); LineRecordReader lineReader = new LineRecordReader(); - lineReader.configure(inputStream, config); + lineReader.configure(ctx, inputStream, config); // create csv with json record reader CSVToRecordWithMetadataAndPKConverter recordConverter = new CSVToRecordWithMetadataAndPKConverter( valueIndex, delimiter, metaType, recordType, pkIndicators, pkIndexes, keyTypes); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java index 7ed53e4..071fd1c 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java @@ -135,13 +135,13 @@ public class FeedMetadataUtil { default: throw new AsterixException("Unknown Adapter type " + adapterType); } - adapterFactory.setOutputType(adapterOutputType); - adapterFactory.setMetaType(metaType); - adapterFactory.configure(appCtx.getServiceContext(), configuration); } else { - AdapterFactoryProvider.getAdapterFactory(appCtx.getServiceContext(), adapterName, configuration, - adapterOutputType, metaType); + ExternalDataUtils.prepare(adapterName, configuration); + adapterFactory = (ITypedAdapterFactory) appCtx.getAdapterFactoryService().createAdapterFactory(); } + adapterFactory.setOutputType(adapterOutputType); + adapterFactory.setMetaType(metaType); + adapterFactory.configure(appCtx.getServiceContext(), configuration); if (metaType == null && configuration.containsKey(ExternalDataConstants.KEY_META_TYPE_NAME)) { metaType = getOutputType(feed, configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME)); if (metaType == null) { diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java index e44480c..3f78234 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java @@ -51,6 +51,7 @@ import org.apache.logging.log4j.core.config.Configuration; public class TestUtils { + private static final int DEFAULT_FRAME_SIZE = 32768; public static final IWarningCollector NOOP_WARNING_COLLECTOR = new IWarningCollector() { @Override public void warn(Warning warning) { @@ -68,6 +69,10 @@ public class TestUtils { } }; + public static IHyracksTaskContext createHyracksTask() { + return create(DEFAULT_FRAME_SIZE); + } + public static IHyracksTaskContext create(int frameSize) { IOManager ioManager = null; try {
