This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit e844fad3435568452d6f738b45570d3bc1295039 Author: Ali Alsuliman <[email protected]> AuthorDate: Mon May 11 20:36:17 2020 -0700 [ASTERIXDB-2726][EXT] Report line number instead of record number in messages of parsers - user model changes: no - storage format changes: no - interface changes: yes IRecordReader: added getLineNumber() to provide line number for parsers and converters. IRecordConverter: added configure() to pass the line number supplier to the record converter. IRecordDataParser: pass line number supplier from the Reader to the Parser. Details: Report line number instead of record number in messages of parsers. - added getPreviousStreamName() to allow readers to report errors happening on the previous stream when the underlying stream has already switched to a new one. - changed the test executor to properly compare the actual warnings issued by a test case with the expected warnings. Change-Id: I00508d8eeca4d9bae95f55ab51ecfb0ce2ced6b0 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6245 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Hussain Towaileb <[email protected]> --- asterixdb/asterix-app/data/csv/error1_line_num.csv | 3 + asterixdb/asterix-app/data/csv/error2_line_num.csv | 5 ++ .../data/json/malformed-json-no-closing.json | 2 + .../test/common/CancellationTestExecutor.java | 4 +- .../org/apache/asterix/test/common/IPollTask.java | 5 +- .../apache/asterix/test/common/TestExecutor.java | 70 ++++++++++++++-------- .../aws/AwsS3ExternalDatasetTest.java | 35 +++++++---- .../s3/csv-warnings/query-dataset.002.ddl.sqlpp | 2 + .../csv-warnings/query-dataset.014.s3bucket.sqlpp} | 9 +-- ...t.002.ddl.sqlpp => query-dataset.015.ddl.sqlpp} | 11 +--- .../s3/csv-warnings/query-dataset.016.query.sqlpp} | 12 ++-- .../csv-warnings/query-dataset.017.s3bucket.sqlpp} | 9 +-- .../s3/csv-warnings/query-dataset.018.query.sqlpp} | 12 ++-- .../json-warnings.001.s3bucket.sqlpp} | 9 +-- 
.../json-warnings.002.ddl.sqlpp} | 7 +-- .../json-warnings/json-warnings.003.query.sqlpp} | 12 ++-- .../s3/json-warnings/json-warnings.099.ddl.sqlpp} | 9 +-- .../queries_sqlpp/objects/ObjectsQueries.xml | 2 +- .../aws/s3/csv-warnings/external_dataset.006.adm | 1 + .../aws/s3/csv-warnings/external_dataset.007.adm | 2 + .../aws/s3/json-warnings/json-warnings.003.adm | 1 + .../testsuite_external_dataset_one_partition.xml | 53 +++++++++------- .../asterix/external/api/AsterixInputStream.java | 4 ++ .../asterix/external/api/IRecordConverter.java | 9 +++ .../asterix/external/api/IRecordDataParser.java | 7 ++- .../apache/asterix/external/api/IRecordReader.java | 5 ++ .../CSVToRecordWithMetadataAndPKConverter.java | 9 ++- .../input/record/reader/aws/AwsS3InputStream.java | 17 ++++-- .../record/reader/stream/LineRecordReader.java | 20 ++++++- .../reader/stream/QuotedLineRecordReader.java | 27 +++++---- .../reader/stream/SemiStructuredRecordReader.java | 51 ++++++++++++---- .../record/reader/stream/StreamRecordReader.java | 6 ++ .../input/stream/AsterixInputStreamReader.java | 4 ++ .../external/input/stream/LocalFSInputStream.java | 11 ++++ .../external/parser/DelimitedDataParser.java | 45 ++++++++++---- .../external/parser/RecordWithMetadataParser.java | 8 +++ .../provider/DataflowControllerProvider.java | 2 +- .../external/util/ExternalDataConstants.java | 4 +- .../apache/asterix/external/util/ParseUtil.java | 4 +- .../src/main/resources/errormsg/en.properties | 2 +- .../file/FieldCursorForDelimitedDataParser.java | 34 ++++++++--- 41 files changed, 351 insertions(+), 193 deletions(-) diff --git a/asterixdb/asterix-app/data/csv/error1_line_num.csv b/asterixdb/asterix-app/data/csv/error1_line_num.csv new file mode 100644 index 0000000..34bcee9 --- /dev/null +++ b/asterixdb/asterix-app/data/csv/error1_line_num.csv @@ -0,0 +1,3 @@ +1,"good","recommend" + +2,"bad" ,"not recommend" \ No newline at end of file diff --git a/asterixdb/asterix-app/data/csv/error2_line_num.csv 
b/asterixdb/asterix-app/data/csv/error2_line_num.csv new file mode 100644 index 0000000..0f1286f --- /dev/null +++ b/asterixdb/asterix-app/data/csv/error2_line_num.csv @@ -0,0 +1,5 @@ +1,"good","recommend" +2,"bad and +not so good and +bad" ,"not recommend" +3,"good","recommend" \ No newline at end of file diff --git a/asterixdb/asterix-app/data/json/malformed-json-no-closing.json b/asterixdb/asterix-app/data/json/malformed-json-no-closing.json new file mode 100644 index 0000000..83f3087 --- /dev/null +++ b/asterixdb/asterix-app/data/json/malformed-json-no-closing.json @@ -0,0 +1,2 @@ +{ "field1": 1, "field2": "text" + diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java index 36a06c0..d4f92fb 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java @@ -23,6 +23,7 @@ import java.io.InputStream; import java.net.URI; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.util.BitSet; import java.util.Iterator; import java.util.List; import java.util.UUID; @@ -129,8 +130,7 @@ public class CancellationTestExecutor extends TestExecutor { } @Override - protected void ensureWarnings(int actualWarnCount, int expectedWarnCount, TestCase.CompilationUnit cUnit) - throws Exception { + protected void ensureWarnings(BitSet expectedWarnings, TestCase.CompilationUnit cUnit) throws Exception { // skip checking warnings as currently cancelled queries with warnings might not run successfully at all } } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/IPollTask.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/IPollTask.java index ab90244..a1ed12b 100644 --- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/IPollTask.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/IPollTask.java @@ -19,6 +19,7 @@ package org.apache.asterix.test.common; import java.io.File; +import java.util.BitSet; import java.util.List; import java.util.Map; @@ -43,11 +44,11 @@ public interface IPollTask { * @param expectedResultFileCtxs * @param testFile * @param actualPath - * @param actualWarnCount + * @param expectedWarnings */ void execute(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx, String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit, MutableInt queryCount, - List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath, MutableInt actualWarnCount) + List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath, BitSet expectedWarnings) throws Exception; } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java index a10dc54..10869c1 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java @@ -45,6 +45,7 @@ import java.text.MessageFormat; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; +import java.util.BitSet; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -53,6 +54,7 @@ import java.util.ListIterator; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.OptionalInt; import java.util.Queue; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -65,6 +67,7 @@ import java.util.concurrent.TimeoutException; import java.util.function.Predicate; import java.util.regex.Matcher; import java.util.regex.Pattern; 
+import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.asterix.api.http.server.QueryServiceRequestParameters; @@ -159,9 +162,9 @@ public class TestExecutor { private static final ContentType TEXT_PLAIN_UTF8 = ContentType.create(HttpUtil.ContentType.APPLICATION_JSON, UTF_8); private final IPollTask plainExecutor = (testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, - queryCount, expectedResultFileCtxs, testFile, actualPath, actualWarnCount) -> executeTestFile(testCaseCtx, + queryCount, expectedResultFileCtxs, testFile, actualPath, expectedWarnings) -> executeTestFile(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount, expectedResultFileCtxs, - testFile, actualPath, actualWarnCount); + testFile, actualPath, expectedWarnings); public static final String DELIVERY_ASYNC = "async"; public static final String DELIVERY_DEFERRED = "deferred"; @@ -947,7 +950,7 @@ public class TestExecutor { public void executeTestFile(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx, String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit, MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath, - MutableInt actualWarnCount) throws Exception { + BitSet expectedWarnings) throws Exception { InputStream resultStream; File qbcFile; boolean failed = false; @@ -975,11 +978,11 @@ public class TestExecutor { case "pollquery": poll(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount, expectedResultFileCtxs, testFile, actualPath, ctx.getType().substring("poll".length()), - actualWarnCount, plainExecutor); + expectedWarnings, plainExecutor); break; case "polldynamic": polldynamic(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount, - expectedResultFileCtxs, testFile, actualPath, actualWarnCount); + expectedResultFileCtxs, testFile, actualPath, 
expectedWarnings); break; case "query": case "async": @@ -1006,7 +1009,7 @@ public class TestExecutor { if (testCaseCtx.getTestCase().isCheckWarnings()) { boolean expectedSourceLoc = testCaseCtx.isSourceLocationExpected(cUnit); - validateWarnings(extractedResult.getWarnings(), cUnit.getExpectedWarn(), actualWarnCount, + validateWarnings(extractedResult.getWarnings(), cUnit.getExpectedWarn(), expectedWarnings, expectedSourceLoc); } break; @@ -1413,17 +1416,17 @@ public class TestExecutor { private void polldynamic(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx, String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit, MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath, - MutableInt actualWarnCount) throws Exception { + BitSet expectedWarnings) throws Exception { IExpectedResultPoller poller = getExpectedResultPoller(statement); final String key = getKey(statement); poll(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount, expectedResultFileCtxs, - testFile, actualPath, "validate", actualWarnCount, new IPollTask() { + testFile, actualPath, "validate", expectedWarnings, new IPollTask() { @Override public void execute(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx, String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit, MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath, - MutableInt actualWarnCount) throws Exception { + BitSet expectedWarnings) throws Exception { File actualResultFile = new File(actualPath, testCaseCtx.getTestCase().getFilePath() + File.separatorChar + cUnit.getName() + '.' 
+ ctx.getSeqNum() + ".polled.adm"); if (actualResultFile.exists() && !actualResultFile.delete()) { @@ -1461,7 +1464,7 @@ public class TestExecutor { private void poll(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx, String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit, MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath, - String newType, MutableInt actualWarnCount, IPollTask pollTask) throws Exception { + String newType, BitSet expectedWarnings, IPollTask pollTask) throws Exception { // polltimeoutsecs=nnn, polldelaysecs=nnn int timeoutSecs = getTimeoutSecs(statement); int retryDelaySecs = getRetryDelaySecs(statement); @@ -1484,7 +1487,7 @@ public class TestExecutor { try { startSemaphore.release(); pollTask.execute(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, - queryCount, expectedResultFileCtxs, testFile, actualPath, actualWarnCount); + queryCount, expectedResultFileCtxs, testFile, actualPath, expectedWarnings); } finally { endSemaphore.release(); } @@ -1798,7 +1801,8 @@ public class TestExecutor { for (CompilationUnit cUnit : cUnits) { List<String> expectedErrors = cUnit.getExpectedError(); int expectedWarnCount = cUnit.getExpectedWarn().size(); - MutableInt actualWarnCount = new MutableInt(0); + BitSet expectedWarnings = new BitSet(cUnit.getExpectedWarn().size()); + expectedWarnings.set(0, cUnit.getExpectedWarn().size()); LOGGER.info( "Starting [TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName() + " ... 
"); Map<String, Object> variableCtx = new HashMap<>(); @@ -1818,7 +1822,7 @@ public class TestExecutor { try { if (!testFile.getName().startsWith(DIAGNOSE)) { executeTestFile(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, - queryCount, expectedResultFileCtxs, testFile, actualPath, actualWarnCount); + queryCount, expectedResultFileCtxs, testFile, actualPath, expectedWarnings); } } catch (TestLoop loop) { // rewind the iterator until we find our target @@ -1850,7 +1854,7 @@ public class TestExecutor { throw new Exception( "Test \"" + cUnit.getName() + "\" FAILED; expected exception was not thrown..."); } - ensureWarnings(actualWarnCount.getValue(), expectedWarnCount, cUnit); + ensureWarnings(expectedWarnings, cUnit); LOGGER.info( "[TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName() + " PASSED "); if (passedGroup != null) { @@ -1873,7 +1877,7 @@ public class TestExecutor { final File file = ctx.getFile(); final String statement = readTestFile(file); executeTestFile(testCaseCtx, ctx, variableCtx, statement, false, pb, cUnit, new MutableInt(-1), - Collections.emptyList(), file, null, new MutableInt(-1)); + Collections.emptyList(), file, null, new BitSet()); } } } catch (Exception diagnosticFailure) { @@ -2074,9 +2078,17 @@ public class TestExecutor { LOGGER.info("Cluster state now " + desiredState); } - protected void ensureWarnings(int actualWarnCount, int expectedWarnCount, CompilationUnit cUnit) throws Exception { - if (actualWarnCount < expectedWarnCount) { - LOGGER.error("Test {} failed to raise (an) expected warning(s)", cUnit.getName()); + protected void ensureWarnings(BitSet expectedWarnings, CompilationUnit cUnit) throws Exception { + boolean fail = !expectedWarnings.isEmpty(); + if (fail) { + LOGGER.error("Test {} failed to raise (an) expected warning(s):", cUnit.getName()); + } + List<String> expectedWarn = cUnit.getExpectedWarn(); + for (int i = expectedWarnings.nextSetBit(0); i >= 0; i = 
expectedWarnings.nextSetBit(i + 1)) { + String warning = expectedWarn.get(i); + LOGGER.error(warning); + } + if (fail) { throw new Exception("Test \"" + cUnit.getName() + "\" FAILED; expected warning(s) was not returned..."); } } @@ -2212,22 +2224,30 @@ public class TestExecutor { return extension.endsWith(AQL) ? getEndpoint(Servlets.QUERY_AQL) : getEndpoint(Servlets.QUERY_SERVICE); } - private void validateWarnings(List<String> actualWarnings, List<String> expectedWarn, MutableInt actualWarnCount, + private void validateWarnings(List<String> actualWarnings, List<String> expectedWarn, BitSet expectedWarnings, boolean expectedSourceLoc) throws Exception { if (actualWarnings != null) { for (String actualWarn : actualWarnings) { - if (expectedWarn.stream().noneMatch(actualWarn::contains)) { - throw new Exception("unexpected warning was encountered (" + actualWarn + ")"); + OptionalInt first = IntStream.range(0, expectedWarn.size()) + .filter(i -> actualWarn.contains(expectedWarn.get(i)) && expectedWarnings.get(i)).findFirst(); + if (!first.isPresent()) { + String msg = "unexpected warning was encountered or has already been matched (" + actualWarn + ")"; + LOGGER.error(msg); + if (!expectedWarnings.isEmpty()) { + LOGGER.error("was expecting the following warnings: "); + } + for (int i = expectedWarnings.nextSetBit(0); i >= 0; i = expectedWarnings.nextSetBit(i + 1)) { + LOGGER.error(expectedWarn.get(i)); + } + throw new Exception(msg); } if (expectedSourceLoc && !containsSourceLocation(actualWarn)) { throw new Exception(MessageFormat.format( "Expected to find source location \"{}, {}\" in warning text: +++++{}+++++", ERR_MSG_SRC_LOC_LINE_REGEX, ERR_MSG_SRC_LOC_COLUMN_REGEX, actualWarn)); } - actualWarnCount.increment(); - if (actualWarnCount.getValue() > expectedWarn.size()) { - throw new Exception("returned warnings exceeded expected warnings"); - } + int warningIndex = first.getAsInt(); + expectedWarnings.clear(warningIndex); } } } diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java index 37a3916..55c78e3 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java @@ -28,6 +28,7 @@ import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.BitSet; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -385,11 +386,11 @@ public class AwsS3ExternalDatasetTest { public void executeTestFile(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx, String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, TestCase.CompilationUnit cUnit, MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath, - MutableInt actualWarnCount) throws Exception { + BitSet expectedWarnings) throws Exception { String[] lines; switch (ctx.getType()) { case "s3bucket": - // <bucket_name> <def_name> <sub-path:src_file1,sub-path:src_file2,sub-path:src_file3> + // <bucket> <def> <sub-path:new_fname:src_file1,sub-path:new_fname:src_file2,sub-path:src_file3> lines = TestExecutor.stripAllComments(statement).trim().split("\n"); String lastLine = lines[lines.length - 1]; String[] command = lastLine.trim().split(" "); @@ -401,7 +402,7 @@ public class AwsS3ExternalDatasetTest { break; default: super.executeTestFile(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, - queryCount, expectedResultFileCtxs, testFile, actualPath, actualWarnCount); + queryCount, expectedResultFileCtxs, testFile, actualPath, expectedWarnings); } } } @@ -425,23 +426,37 @@ public class AwsS3ExternalDatasetTest { int size = 
s3pathAndSourceFile.length; String path; String sourceFilePath; - String sourceFileName; + String uploadedFileName; if (size == 1) { // case: playground json-data/reviews SOURCE_FILE1,SOURCE_FILE2 path = definitionPath; sourceFilePath = s3pathAndSourceFile[0]; - sourceFileName = FilenameUtils.getName(s3pathAndSourceFile[0]); - } else { + uploadedFileName = FilenameUtils.getName(s3pathAndSourceFile[0]); + } else if (size == 2) { // case: playground json-data/reviews level1/sub-level:SOURCE_FILE1,level2/sub-level:SOURCE_FILE2 + String subPathOrNewFileName = s3pathAndSourceFile[0]; + if (subPathOrNewFileName.startsWith("$$")) { + path = definitionPath; + sourceFilePath = s3pathAndSourceFile[1]; + uploadedFileName = subPathOrNewFileName.substring(2); + } else { + path = definitionPath + subPathOrNewFileName + (subPathOrNewFileName.endsWith("/") ? "" : "/"); + sourceFilePath = s3pathAndSourceFile[1]; + uploadedFileName = FilenameUtils.getName(s3pathAndSourceFile[1]); + } + } else if (size == 3) { path = definitionPath + s3pathAndSourceFile[0] + (s3pathAndSourceFile[0].endsWith("/") ? 
"" : "/"); - sourceFilePath = s3pathAndSourceFile[1]; - sourceFileName = FilenameUtils.getName(s3pathAndSourceFile[1]); + uploadedFileName = s3pathAndSourceFile[1]; + sourceFilePath = s3pathAndSourceFile[2]; + + } else { + throw new IllegalArgumentException(); } - String keyPath = path + sourceFileName; + String keyPath = path + uploadedFileName; int k = 1; while (fileNames.contains(keyPath)) { - keyPath = path + (k++) + sourceFileName; + keyPath = path + (k++) + uploadedFileName; } fileNames.add(keyPath); client.putObject(PUT_OBJECT_BUILDER.bucket(bucketName).key(keyPath).build(), diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp index f7c5c61..6df570c 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp @@ -23,8 +23,10 @@ USE test; DROP TYPE t1 IF EXISTS; DROP TYPE t2 IF EXISTS; +DROP TYPE t3 IF EXISTS; CREATE TYPE t1 AS {f1: int, f2: int, f3: int, f4: string}; CREATE TYPE t2 AS {f1: bigint, f2: bigint?, f3: double, f4: double?, f5: string, f6: string?, f7: boolean, f8: boolean?}; +CREATE TYPE t3 AS {f1: bigint, f2: string, f3: string}; DROP DATASET ds1 IF EXISTS; CREATE EXTERNAL DATASET ds1(t1) USING S3 ( diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.014.s3bucket.sqlpp similarity index 79% copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java copy to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.014.s3bucket.sqlpp index 9d9ff28..5d3989b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.014.s3bucket.sqlpp @@ -16,12 +16,5 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.asterix.external.api; -import java.io.IOException; - -@FunctionalInterface -public interface IRecordConverter<I, O> { - - public O convert(IRawRecord<? extends I> input) throws IOException; -} +playground data_dir data/csv/error1_line_num.csv \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.015.ddl.sqlpp similarity index 74% copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.015.ddl.sqlpp index f7c5c61..75ba5d6 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.015.ddl.sqlpp @@ -17,17 +17,10 @@ * under the License. 
*/ -DROP DATAVERSE test IF EXISTS; -CREATE DATAVERSE test; USE test; -DROP TYPE t1 IF EXISTS; -DROP TYPE t2 IF EXISTS; -CREATE TYPE t1 AS {f1: int, f2: int, f3: int, f4: string}; -CREATE TYPE t2 AS {f1: bigint, f2: bigint?, f3: double, f4: double?, f5: string, f6: string?, f7: boolean, f8: boolean?}; - -DROP DATASET ds1 IF EXISTS; -CREATE EXTERNAL DATASET ds1(t1) USING S3 ( +DROP DATASET ds2 IF EXISTS; +CREATE EXTERNAL DATASET ds2(t3) USING S3 ( ("accessKeyId"="dummyAccessKey"), ("secretAccessKey"="dummySecretKey"), ("region"="us-west-2"), diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.016.query.sqlpp similarity index 79% copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.016.query.sqlpp index 9d9ff28..e6b24f3 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.016.query.sqlpp @@ -16,12 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.asterix.external.api; +// requesttype=application/json +// param max-warnings:json=100 +USE test; -import java.io.IOException; - -@FunctionalInterface -public interface IRecordConverter<I, O> { - - public O convert(IRawRecord<? 
extends I> input) throws IOException; -} +FROM ds2 v SELECT VALUE v ORDER BY v.f1 ASC; \ No newline at end of file diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.017.s3bucket.sqlpp similarity index 79% copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.017.s3bucket.sqlpp index 9d9ff28..c35d646 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.017.s3bucket.sqlpp @@ -16,12 +16,5 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.asterix.external.api; -import java.io.IOException; - -@FunctionalInterface -public interface IRecordConverter<I, O> { - - public O convert(IRawRecord<? 
extends I> input) throws IOException; -} +playground data_dir data/csv/error2_line_num.csv \ No newline at end of file diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.018.query.sqlpp similarity index 79% copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.018.query.sqlpp index 9d9ff28..e6b24f3 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.018.query.sqlpp @@ -16,12 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.asterix.external.api; +// requesttype=application/json +// param max-warnings:json=100 +USE test; -import java.io.IOException; - -@FunctionalInterface -public interface IRecordConverter<I, O> { - - public O convert(IRawRecord<? 
extends I> input) throws IOException; -} +FROM ds2 v SELECT VALUE v ORDER BY v.f1 ASC; \ No newline at end of file diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.001.s3bucket.sqlpp similarity index 79% copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.001.s3bucket.sqlpp index 9d9ff28..2bd413b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.001.s3bucket.sqlpp @@ -16,12 +16,5 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.asterix.external.api; -import java.io.IOException; - -@FunctionalInterface -public interface IRecordConverter<I, O> { - - public O convert(IRawRecord<? 
extends I> input) throws IOException; -} +playground data_dir $$1.json:data/json/malformed-json-no-closing.json,$$2.json:data/json/double-150-11.json \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.002.ddl.sqlpp similarity index 82% copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.002.ddl.sqlpp index f7c5c61..7112bb9 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.002.ddl.sqlpp @@ -22,9 +22,7 @@ CREATE DATAVERSE test; USE test; DROP TYPE t1 IF EXISTS; -DROP TYPE t2 IF EXISTS; -CREATE TYPE t1 AS {f1: int, f2: int, f3: int, f4: string}; -CREATE TYPE t2 AS {f1: bigint, f2: bigint?, f3: double, f4: double?, f5: string, f6: string?, f7: boolean, f8: boolean?}; +CREATE TYPE t1 AS {}; DROP DATASET ds1 IF EXISTS; CREATE EXTERNAL DATASET ds1(t1) USING S3 ( @@ -34,6 +32,5 @@ CREATE EXTERNAL DATASET ds1(t1) USING S3 ( ("serviceEndpoint"="http://localhost:8001"), ("container"="playground"), ("definition"="data_dir"), -("format"="CSV"), -("header"="false") +("format"="JSON") ); \ No newline at end of file diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.003.query.sqlpp similarity index 79% copy from 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.003.query.sqlpp index 9d9ff28..5f2ad26 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.003.query.sqlpp @@ -16,12 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.asterix.external.api; +// requesttype=application/json +// param max-warnings:json=100 +USE test; -import java.io.IOException; - -@FunctionalInterface -public interface IRecordConverter<I, O> { - - public O convert(IRawRecord<? extends I> input) throws IOException; -} +FROM ds1 v SELECT VALUE v; \ No newline at end of file diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.099.ddl.sqlpp similarity index 79% copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.099.ddl.sqlpp index 9d9ff28..36b2bab 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.099.ddl.sqlpp @@ -16,12 +16,5 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.asterix.external.api; -import java.io.IOException; - -@FunctionalInterface -public interface IRecordConverter<I, O> { - - public O convert(IRawRecord<? extends I> input) throws IOException; -} +DROP DATAVERSE test IF EXISTS; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/objects/ObjectsQueries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/objects/ObjectsQueries.xml index 87bd204..e8902b6 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/objects/ObjectsQueries.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/objects/ObjectsQueries.xml @@ -223,7 +223,7 @@ <expected-warn>Duplicate field name "name" (in line 22, at column 30)</expected-warn> <expected-warn>Duplicate field name "id" (in line 22, at column 56)</expected-warn> <expected-warn>Duplicate field name "f1" (in line 22, at column 70)</expected-warn> - <expected-warn>Duplicate field name "id" (in line 22, at column 36)</expected-warn> + <expected-warn>Duplicate field name "id" (in line 22, at column 56)</expected-warn> <expected-warn>Duplicate field name "f1" (in line 22, at column 83)</expected-warn> <expected-warn>Duplicate field name "fname1" (in line 25, at column 45)</expected-warn> </compilation-unit> diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.006.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.006.adm new file mode 100644 index 0000000..c3ce0a1 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.006.adm @@ -0,0 +1 @@ +{ "f1": 1, "f2": "good", "f3": "recommend" } \ No newline at end of file diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.007.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.007.adm new file mode 100644 index 0000000..c56f0c5 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.007.adm @@ -0,0 +1,2 @@ +{ "f1": 1, "f2": "good", "f3": "recommend" } +{ "f1": 3, "f2": "good", "f3": "recommend" } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/json-warnings/json-warnings.003.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/json-warnings/json-warnings.003.adm new file mode 100644 index 0000000..5bae5d1 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/json-warnings/json-warnings.003.adm @@ -0,0 +1 @@ +{ "double_value": 150.11 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_one_partition.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_one_partition.xml index 0597d8f..6704d78 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_one_partition.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_one_partition.xml @@ -42,35 +42,44 @@ <test-case FilePath="external-dataset" check-warnings="true"> <compilation-unit name="aws/s3/csv-warnings"> <output-dir compare="Text">aws/s3/csv-warnings</output-dir> - <expected-warn>Parsing error at data_dir/no_h_missing_fields.csv record 2 field 3: some fields are missing</expected-warn> - <expected-warn>Parsing error at data_dir/no_h_no_closing_q.csv record 0 field 0: malformed input record ended inside quote</expected-warn> - <expected-warn>Parsing error 
at record 0 field 0: malformed input record ended inside quote</expected-warn> + <expected-warn>Parsing error at data_dir/no_h_missing_fields.csv line 2 field 3: some fields are missing</expected-warn> + <expected-warn>Parsing error at data_dir/no_h_no_closing_q.csv line 2 field 0: malformed input record ended abruptly</expected-warn> + <expected-warn>Parsing error at line 2 field 0: malformed input record ended abruptly</expected-warn> - <expected-warn>Parsing error at record 4 field 3: invalid value</expected-warn> - <expected-warn>Parsing error at record 1 field 1: invalid value</expected-warn> - <expected-warn>Parsing error at record 10 field 1: invalid value</expected-warn> - <expected-warn>Parsing error at record 2 field 1: invalid value</expected-warn> - <expected-warn>Parsing error at record 3 field 1: invalid value</expected-warn> - <expected-warn>Parsing error at record 6 field 7: invalid value</expected-warn> - <expected-warn>Parsing error at record 12 field 7: invalid value</expected-warn> - <expected-warn>Parsing error at record 11 field 3: invalid value</expected-warn> - <expected-warn>Parsing error at record 8 field 6: a quote should be in the beginning</expected-warn> + <expected-warn>Parsing error at line 5 field 3: invalid value</expected-warn> + <expected-warn>Parsing error at line 2 field 1: invalid value</expected-warn> + <expected-warn>Parsing error at line 11 field 1: invalid value</expected-warn> + <expected-warn>Parsing error at line 3 field 1: invalid value</expected-warn> + <expected-warn>Parsing error at line 4 field 1: invalid value</expected-warn> + <expected-warn>Parsing error at line 7 field 7: invalid value</expected-warn> + <expected-warn>Parsing error at line 13 field 7: invalid value</expected-warn> + <expected-warn>Parsing error at line 12 field 3: invalid value</expected-warn> + <expected-warn>Parsing error at line 9 field 6: a quote should be in the beginning</expected-warn> - <expected-warn>Parsing error at 
data_dir/h_invalid_values.csv record 4 field 3: invalid value</expected-warn> - <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 1 field 1: invalid value</expected-warn> - <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 10 field 1: invalid value</expected-warn> - <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 2 field 1: invalid value</expected-warn> - <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 3 field 1: invalid value</expected-warn> - <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 6 field 7: invalid value</expected-warn> - <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 12 field 7: invalid value</expected-warn> - <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 11 field 3: invalid value</expected-warn> - <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 8 field 6: a quote should be in the beginning</expected-warn> + <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 5 field 3: invalid value</expected-warn> + <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 2 field 1: invalid value</expected-warn> + <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 11 field 1: invalid value</expected-warn> + <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 3 field 1: invalid value</expected-warn> + <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 4 field 1: invalid value</expected-warn> + <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 7 field 7: invalid value</expected-warn> + <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 13 field 7: invalid value</expected-warn> + <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 12 field 3: invalid value</expected-warn> + <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 9 field 6: a quote should be in the 
beginning</expected-warn> + + <expected-warn>Parsing error at data_dir/error1_line_num.csv line 3 field 2: a quote enclosing a field needs to be followed by the delimiter</expected-warn> + <expected-warn>Parsing error at data_dir/error2_line_num.csv line 4 field 2: a quote enclosing a field needs to be followed by the delimiter</expected-warn> </compilation-unit> </test-case> <test-case FilePath="external-dataset" check-warnings="true"> <compilation-unit name="aws/s3/tsv-warnings"> <output-dir compare="Text">aws/s3/tsv-warnings</output-dir> - <expected-warn>Parsing error at data_dir/no_h_missing_fields.tsv record 2 field 3: some fields are missing</expected-warn> + <expected-warn>Parsing error at data_dir/no_h_missing_fields.tsv line 2 field 3: some fields are missing</expected-warn> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset" check-warnings="true"> + <compilation-unit name="aws/s3/json-warnings"> + <output-dir compare="Text">aws/s3/json-warnings</output-dir> + <expected-warn>Parsing error at data_dir/1.json line 3 field 0: malformed input record ended abruptly</expected-warn> </compilation-unit> </test-case> <test-case FilePath="external-dataset" check-warnings="true"> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java index 4dfcbb5..f959f8d 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java @@ -51,4 +51,8 @@ public abstract class AsterixInputStream extends InputStream { public String getStreamName() { return ""; } + + public String getPreviousStreamName() { + return ""; + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java index 9d9ff28..f544ca0 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java @@ -19,9 +19,18 @@ package org.apache.asterix.external.api; import java.io.IOException; +import java.util.function.LongSupplier; @FunctionalInterface public interface IRecordConverter<I, O> { public O convert(IRawRecord<? extends I> input) throws IOException; + + /** + * Configures the converter with information suppliers from the {@link IRecordReader} data source. + * + * @param lineNumber line number supplier + */ + default void configure(LongSupplier lineNumber) { + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java index 9c9ec1c..c4dfdd0 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java @@ -19,6 +19,7 @@ package org.apache.asterix.external.api; import java.io.DataOutput; +import java.util.function.LongSupplier; import java.util.function.Supplier; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -40,11 +41,11 @@ public interface IRecordDataParser<T> extends IDataParser { public boolean parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException; /** - * Sets the data source name supplier that this parser is receiving records from. The data source name could be - * used for reporting, for example. + * Configures the parser with information suppliers from the {@link IRecordReader} data source. 
* * @param dataSourceName data source name supplier + * @param lineNumber line number supplier */ - default void setDataSourceName(Supplier<String> dataSourceName) { + default void configure(Supplier<String> dataSourceName, LongSupplier lineNumber) { } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java index 95e83f2..cb97526 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java @@ -20,6 +20,7 @@ package org.apache.asterix.external.api; import java.io.Closeable; import java.io.IOException; +import java.util.function.LongSupplier; import java.util.function.Supplier; import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; @@ -90,4 +91,8 @@ public interface IRecordReader<T> extends Closeable { default Supplier<String> getDataSourceName() { return ExternalDataConstants.EMPTY_STRING; } + + default LongSupplier getLineNumber() { + return ExternalDataConstants.NO_LINES; + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java index 78240a0..8b930aa 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java @@ -19,6 +19,7 @@ package org.apache.asterix.external.input.record.converter; import java.io.IOException; +import java.util.function.LongSupplier; import 
org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; @@ -38,6 +39,7 @@ public class CSVToRecordWithMetadataAndPKConverter private final int valueIndex; private final RecordWithMetadataAndPK<char[]> recordWithMetadata; private final CharArrayRecord record; + private LongSupplier lineNumber = ExternalDataConstants.NO_LINES; public CSVToRecordWithMetadataAndPKConverter(final int valueIndex, final char delimiter, final ARecordType metaType, final ARecordType recordType, final int[] keyIndicator, final int[] keyIndexes, final IAType[] keyTypes, @@ -54,7 +56,7 @@ public class CSVToRecordWithMetadataAndPKConverter public RecordWithMetadataAndPK<char[]> convert(final IRawRecord<? extends char[]> input) throws IOException { record.reset(); recordWithMetadata.reset(); - cursor.nextRecord(input.get(), input.size()); + cursor.nextRecord(input.get(), input.size(), lineNumber.getAsLong()); int i = 0; int j = 0; FieldCursorForDelimitedDataParser.Result lastResult; @@ -77,4 +79,9 @@ public class CSVToRecordWithMetadataAndPKConverter } return recordWithMetadata; } + + @Override + public void configure(LongSupplier lineNumber) { + this.lineNumber = lineNumber == null ? 
ExternalDataConstants.NO_LINES : lineNumber; + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java index a70848c..448d3f5 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java @@ -88,12 +88,11 @@ public class AwsS3InputStream extends AbstractMultipleInputStream { in = new GZIPInputStream(s3Client.getObject(getObjectRequest), ExternalDataConstants.DEFAULT_BUFFER_SIZE); } + // Current file ready, point to the next file + nextFileIndex++; if (notificationHandler != null) { notificationHandler.notifyNewSource(); } - - // Current file ready, point to the next file - nextFileIndex++; return true; } @@ -116,8 +115,16 @@ public class AwsS3InputStream extends AbstractMultipleInputStream { @Override public String getStreamName() { - int currentFileIndex = nextFileIndex - 1; - return currentFileIndex < 0 || filePaths == null || filePaths.isEmpty() ? "" : filePaths.get(currentFileIndex); + return getStreamNameAt(nextFileIndex - 1); + } + + @Override + public String getPreviousStreamName() { + return getStreamNameAt(nextFileIndex - 2); + } + + private String getStreamNameAt(int fileIndex) { + return fileIndex < 0 || filePaths == null || filePaths.isEmpty() ? 
"" : filePaths.get(fileIndex); } /** diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java index a3f560d..4b86142 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.LongSupplier; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.util.ExternalDataConstants; @@ -35,7 +36,8 @@ public class LineRecordReader extends StreamRecordReader { protected boolean hasHeader; protected boolean prevCharCR; protected int newlineLength; - protected int recordNumber = 0; + protected long beginLineNumber = 1; + protected long lineNumber = 1; protected boolean newSource = false; private static final List<String> recordReaderFormats = Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.FORMAT_DELIMITED_TEXT, @@ -60,7 +62,8 @@ public class LineRecordReader extends StreamRecordReader { public void resetForNewSource() { super.resetForNewSource(); newSource = true; - recordNumber = 0; + beginLineNumber = 1; + lineNumber = 1; prevCharCR = false; newlineLength = 0; } @@ -98,6 +101,7 @@ public class LineRecordReader extends StreamRecordReader { * consuming it until we have a chance to look at the char that * follows. 
*/ + beginLineNumber = lineNumber; newlineLength = 0; //length of terminating newline prevCharCR = false; //true of prev char was CR record.reset(); @@ -120,9 +124,11 @@ public class LineRecordReader extends StreamRecordReader { if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) { newlineLength = (prevCharCR) ? 2 : 1; ++bufferPosn; // at next invocation proceed from following byte + ++lineNumber; break; } if (prevCharCR) { //CR + notLF, we are at notLF + ++lineNumber; newlineLength = 1; break; } @@ -140,8 +146,16 @@ public class LineRecordReader extends StreamRecordReader { newSource = false; continue; } - recordNumber++; return true; } } + + @Override + public LongSupplier getLineNumber() { + return this::getBeginLineNumber; + } + + private long getBeginLineNumber() { + return beginLineNumber; + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java index 81b8e41..3a502d0 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.external.input.record.reader.stream; -import static org.apache.asterix.external.util.ExternalDataConstants.REC_ENDED_IN_Q; +import static org.apache.asterix.external.util.ExternalDataConstants.REC_ENDED_AT_EOF; import java.io.IOException; import java.util.Arrays; @@ -60,7 +60,7 @@ public class QuotedLineRecordReader extends LineRecordReader { @Override public void notifyNewSource() { if (!record.isEmptyRecord() && warnings.shouldWarn()) { - ParseUtil.warn(warnings, getDataSourceName().get(), recordNumber, 0, REC_ENDED_IN_Q); + ParseUtil.warn(warnings, 
getPreviousStreamName(), lineNumber, 0, REC_ENDED_AT_EOF); } // restart for a new record from a new source resetForNewSource(); @@ -90,6 +90,7 @@ public class QuotedLineRecordReader extends LineRecordReader { if (done) { return false; } + beginLineNumber = lineNumber; newlineLength = 0; prevCharCR = false; prevCharEscape = false; @@ -106,7 +107,7 @@ public class QuotedLineRecordReader extends LineRecordReader { if (readLength <= 0 || inQuote) { // haven't read anything previously OR have read and in the middle and hit the end if (inQuote && warnings.shouldWarn()) { - ParseUtil.warn(warnings, getDataSourceName().get(), recordNumber, 0, REC_ENDED_IN_Q); + ParseUtil.warn(warnings, getDataSourceName().get(), lineNumber, 0, REC_ENDED_AT_EOF); } close(); return false; @@ -117,13 +118,18 @@ public class QuotedLineRecordReader extends LineRecordReader { } boolean maybeInQuote = false; for (; bufferPosn < bufferLength; ++bufferPosn) { - if (inputBuffer[bufferPosn] == quote && escape == quote) { + char ch = inputBuffer[bufferPosn]; + // count lines here since we need to also count the lines inside quotes + if (ch == ExternalDataConstants.LF || prevCharCR) { + lineNumber++; + } + if (ch == quote && escape == quote) { inQuote |= maybeInQuote; prevCharEscape |= maybeInQuote; } maybeInQuote = false; if (!inQuote) { - if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) { + if (ch == ExternalDataConstants.LF) { newlineLength = (prevCharCR) ? 
2 : 1; ++bufferPosn; break; @@ -132,20 +138,20 @@ public class QuotedLineRecordReader extends LineRecordReader { newlineLength = 1; break; } - prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR); // if this is an opening quote, mark it - inQuote = inputBuffer[bufferPosn] == quote && !prevCharEscape; + inQuote = ch == quote && !prevCharEscape; // the escape != quote is for making an opening quote not an escape - prevCharEscape = inputBuffer[bufferPosn] == escape && !prevCharEscape && escape != quote; + prevCharEscape = ch == escape && !prevCharEscape && escape != quote; } else { // if quote == escape and current char is quote, then it could be closing or escaping - if (inputBuffer[bufferPosn] == quote && !prevCharEscape) { + if (ch == quote && !prevCharEscape) { // this is most likely a closing quote. the outcome depends on the next char inQuote = false; maybeInQuote = true; } - prevCharEscape = inputBuffer[bufferPosn] == escape && !prevCharEscape && escape != quote; + prevCharEscape = ch == escape && !prevCharEscape && escape != quote; } + prevCharCR = (ch == ExternalDataConstants.CR); } readLength = bufferPosn - startPosn; if (readLength > 0) { @@ -159,7 +165,6 @@ public class QuotedLineRecordReader extends LineRecordReader { newSource = false; continue; } - recordNumber++; return true; } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java index 1fb5b25..dfc60bc 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java @@ -24,6 +24,7 @@ import static 
org.apache.asterix.external.util.ExternalDataConstants.CR; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END; import static org.apache.asterix.external.util.ExternalDataConstants.LF; import static org.apache.asterix.external.util.ExternalDataConstants.OPEN_BRACKET; +import static org.apache.asterix.external.util.ExternalDataConstants.REC_ENDED_AT_EOF; import static org.apache.asterix.external.util.ExternalDataConstants.SPACE; import static org.apache.asterix.external.util.ExternalDataConstants.TAB; @@ -32,14 +33,17 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.LongSupplier; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataUtils; +import org.apache.asterix.external.util.ParseUtil; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.IWarningCollector; public class SemiStructuredRecordReader extends StreamRecordReader { @@ -50,6 +54,7 @@ public class SemiStructuredRecordReader extends StreamRecordReader { AFTER_COMMA // valid chars at this state: '{' to start a new nested record } + private IWarningCollector warnings; private int depth; private boolean prevCharEscape; private boolean inString; @@ -57,8 +62,10 @@ public class SemiStructuredRecordReader extends StreamRecordReader { private char recordEnd; private boolean hasStarted; private boolean hasFinished; - private int recordNumber = 0; + private boolean isLastCharCR; private State state = State.TOP_LEVEL; + private long beginLineNumber = 1; + private long lineNumber = 1; private static final List<String> recordReaderFormats = 
Collections.unmodifiableList( Arrays.asList(ExternalDataConstants.FORMAT_ADM, ExternalDataConstants.FORMAT_JSON_LOWER_CASE, @@ -70,6 +77,7 @@ public class SemiStructuredRecordReader extends StreamRecordReader { throws HyracksDataException { super.configure(stream, config); stream.setNotificationHandler(this); + warnings = ctx.getWarningCollector(); // set record opening char recordStart = ExternalDataUtils.validateGetRecordStart(config); // set record ending char @@ -81,16 +89,22 @@ public class SemiStructuredRecordReader extends StreamRecordReader { @Override public void notifyNewSource() { - if (hasStarted) { - // TODO(ali): WARN + if (hasStarted && warnings.shouldWarn()) { + ParseUtil.warn(warnings, getPreviousStreamName(), lineNumber, 0, REC_ENDED_AT_EOF); } - recordNumber = 0; + beginLineNumber = 1; + lineNumber = 1; state = State.TOP_LEVEL; resetForNewRecord(); } - public int getRecordNumber() { - return recordNumber; + @Override + public LongSupplier getLineNumber() { + return this::getBeginLineNumber; + } + + private long getBeginLineNumber() { + return beginLineNumber; } @Override @@ -99,12 +113,16 @@ public class SemiStructuredRecordReader extends StreamRecordReader { return false; } resetForNewRecord(); + beginLineNumber = lineNumber; do { int startPosn = bufferPosn; // starting from where we left off the last time if (bufferPosn >= bufferLength) { startPosn = bufferPosn = 0; bufferLength = reader.read(inputBuffer); if (bufferLength < 0) { + if (hasStarted && warnings.shouldWarn()) { + ParseUtil.warn(warnings, getDataSourceName().get(), lineNumber, 0, REC_ENDED_AT_EOF); + } close(); return false; // EOF } @@ -112,6 +130,10 @@ public class SemiStructuredRecordReader extends StreamRecordReader { if (!hasStarted) { for (; bufferPosn < bufferLength; ++bufferPosn) { // search for record begin char c = inputBuffer[bufferPosn]; + if (c == LF || isLastCharCR) { + lineNumber++; + } + isLastCharCR = c == CR; if (c == SPACE || c == TAB || c == LF || c == CR) { 
continue; } @@ -144,18 +166,22 @@ public class SemiStructuredRecordReader extends StreamRecordReader { } if (hasStarted) { for (; bufferPosn < bufferLength; ++bufferPosn) { + char c = inputBuffer[bufferPosn]; + if (c == LF || isLastCharCR) { + lineNumber++; + } if (inString) { // we are in a string, we only care about the string end - if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE && !prevCharEscape) { + if (c == ExternalDataConstants.QUOTE && !prevCharEscape) { inString = false; } - prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE && !prevCharEscape; + prevCharEscape = c == ExternalDataConstants.ESCAPE && !prevCharEscape; } else { - if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE) { + if (c == ExternalDataConstants.QUOTE) { inString = true; - } else if (inputBuffer[bufferPosn] == recordStart) { + } else if (c == recordStart) { depth += 1; - } else if (inputBuffer[bufferPosn] == recordEnd) { + } else if (c == recordEnd) { depth -= 1; if (depth == 0) { hasFinished = true; @@ -164,6 +190,7 @@ public class SemiStructuredRecordReader extends StreamRecordReader { } } } + isLastCharCR = c == CR; } } @@ -179,7 +206,6 @@ public class SemiStructuredRecordReader extends StreamRecordReader { } } while (!hasFinished); record.endRecord(); - recordNumber++; return true; } @@ -198,6 +224,7 @@ public class SemiStructuredRecordReader extends StreamRecordReader { hasStarted = false; hasFinished = false; prevCharEscape = false; + isLastCharCR = false; inString = false; depth = 0; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java index 6139f82..cb16de5 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java +++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java @@ -48,6 +48,7 @@ public abstract class StreamRecordReader implements IRecordReader<char[]>, IStre protected boolean done = false; protected FeedLogManager feedLogManager; private Supplier<String> dataSourceName = EMPTY_STRING; + private Supplier<String> previousDataSourceName = EMPTY_STRING; public void configure(AsterixInputStream inputStream, Map<String, String> config) { this.reader = new AsterixInputStreamReader(inputStream); @@ -55,6 +56,7 @@ public abstract class StreamRecordReader implements IRecordReader<char[]>, IStre inputBuffer = new char[ExternalDataConstants.DEFAULT_BUFFER_SIZE]; if (!ExternalDataUtils.isTrue(config, KEY_REDACT_WARNINGS)) { this.dataSourceName = reader::getStreamName; + this.previousDataSourceName = reader::getPreviousStreamName; } } @@ -118,6 +120,10 @@ public abstract class StreamRecordReader implements IRecordReader<char[]>, IStre return dataSourceName; } + String getPreviousStreamName() { + return previousDataSourceName.get(); + } + public abstract List<String> getRecordReaderFormats(); public abstract String getRequiredConfigs(); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java index f5f68fe..4e963e4 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java @@ -131,4 +131,8 @@ public class AsterixInputStreamReader extends Reader { public String getStreamName() { return in.getStreamName(); } + + public String getPreviousStreamName() { + return in.getPreviousStreamName(); + } } diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java index 4fec3c4..9e1b052 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java @@ -37,6 +37,7 @@ public class LocalFSInputStream extends AbstractMultipleInputStream { private static final Logger LOGGER = LogManager.getLogger(); private final FileSystemWatcher watcher; private File currentFile; + private String lastFileName = ""; public LocalFSInputStream(FileSystemWatcher watcher) { this.watcher = watcher; @@ -90,6 +91,10 @@ public class LocalFSInputStream extends AbstractMultipleInputStream { @Override protected boolean advance() throws IOException { + String tmpLastFileName = ""; + if (currentFile != null) { + tmpLastFileName = currentFile.getPath(); + } closeFile(); currentFile = watcher.poll(); if (currentFile == null) { @@ -100,6 +105,7 @@ public class LocalFSInputStream extends AbstractMultipleInputStream { } if (currentFile != null) { in = new FileInputStream(currentFile); + lastFileName = tmpLastFileName; if (notificationHandler != null) { notificationHandler.notifyNewSource(); } @@ -156,4 +162,9 @@ public class LocalFSInputStream extends AbstractMultipleInputStream { public String getStreamName() { return currentFile == null ? 
"" : currentFile.getPath(); } + + @Override + public String getPreviousStreamName() { + return lastFileName; + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java index 8ac483e..60e6e77 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java @@ -26,6 +26,7 @@ import java.io.DataOutput; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.util.function.LongSupplier; import java.util.function.Supplier; import org.apache.asterix.builders.IARecordBuilder; @@ -65,6 +66,7 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa private final IValueParser[] valueParsers; private FieldCursorForDelimitedDataParser cursor; private Supplier<String> dataSourceName; + private LongSupplier lineNumber; private final byte[] fieldTypeTags; private final int[] fldIds; private final ArrayBackedValueStorage[] nameBuffers; @@ -74,6 +76,7 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa char quote, boolean hasHeader, ARecordType recordType, boolean isStreamParser, String nullString) throws HyracksDataException { this.dataSourceName = ExternalDataConstants.EMPTY_STRING; + this.lineNumber = ExternalDataConstants.NO_LINES; this.warnings = ctx.getWarningCollector(); this.fieldDelimiter = fieldDelimiter; this.quote = quote; @@ -114,7 +117,8 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa } } if (!isStreamParser) { - cursor = new FieldCursorForDelimitedDataParser(null, this.fieldDelimiter, quote, warnings, dataSourceName); + cursor = new FieldCursorForDelimitedDataParser(null, this.fieldDelimiter, 
quote, warnings, + this::getDataSourceName); } this.nullChars = nullString != null ? nullString.toCharArray() : null; } @@ -122,7 +126,7 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa @Override public boolean parse(DataOutput out) throws HyracksDataException { try { - while (cursor.nextRecord()) { + if (cursor.nextRecord()) { if (parseRecord()) { recBuilder.write(out, true); return true; @@ -149,7 +153,7 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa break; case END: if (warnings.shouldWarn()) { - ParseUtil.warn(warnings, dataSourceName.get(), cursor.getRecordCount(), + ParseUtil.warn(warnings, dataSourceName.get(), cursor.getLineCount(), cursor.getFieldCount(), MISSING_FIELDS); } return false; @@ -164,8 +168,10 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa fieldValueBufferOutput.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG); } else { if (cursor.isFieldEmpty() && !canProcessEmptyField(recordType.getFieldTypes()[i])) { - ParseUtil.warn(warnings, dataSourceName.get(), cursor.getRecordCount(), cursor.getFieldCount(), - EMPTY_FIELD); + if (warnings.shouldWarn()) { + ParseUtil.warn(warnings, dataSourceName.get(), cursor.getLineCount(), + cursor.getFieldCount(), EMPTY_FIELD); + } return false; } fieldValueBufferOutput.writeByte(fieldTypeTags[i]); @@ -176,8 +182,10 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa boolean success = valueParsers[i].parse(cursor.getBuffer(), cursor.getFieldStart(), cursor.getFieldLength(), fieldValueBufferOutput); if (!success) { - ParseUtil.warn(warnings, dataSourceName.get(), cursor.getRecordCount(), cursor.getFieldCount(), - INVALID_VAL); + if (warnings.shouldWarn()) { + ParseUtil.warn(warnings, dataSourceName.get(), cursor.getLineCount(), + cursor.getFieldCount(), INVALID_VAL); + } return false; } } @@ -190,12 +198,19 @@ public class DelimitedDataParser extends AbstractDataParser implements 
IStreamDa throw HyracksDataException.create(e); } } - return true; + try { + while (cursor.nextField() == FieldCursorForDelimitedDataParser.Result.OK) { + // keep reading and discarding the extra fields + } + return true; + } catch (IOException e) { + throw HyracksDataException.create(e); + } } @Override public boolean parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException { - cursor.nextRecord(record.get(), record.size()); + cursor.nextRecord(record.get(), record.size(), lineNumber.getAsLong()); if (parseRecord()) { recBuilder.write(out, true); return true; @@ -207,7 +222,7 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa public void setInputStream(InputStream in) throws IOException { // TODO(ali): revisit this in regards to stream cursor = new FieldCursorForDelimitedDataParser(new InputStreamReader(in), fieldDelimiter, quote, warnings, - dataSourceName); + this::getDataSourceName); if (hasHeader) { cursor.nextRecord(); FieldCursorForDelimitedDataParser.Result result; @@ -224,13 +239,19 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa public boolean reset(InputStream in) throws IOException { // TODO(ali): revisit this in regards to stream cursor = new FieldCursorForDelimitedDataParser(new InputStreamReader(in), fieldDelimiter, quote, warnings, - dataSourceName); + this::getDataSourceName); return true; } @Override - public void setDataSourceName(Supplier<String> dataSourceName) { + public void configure(Supplier<String> dataSourceName, LongSupplier lineNumber) { this.dataSourceName = dataSourceName == null ? ExternalDataConstants.EMPTY_STRING : dataSourceName; + this.lineNumber = lineNumber == null ? 
ExternalDataConstants.NO_LINES : lineNumber; + + } + + private String getDataSourceName() { + return dataSourceName.get(); } private static boolean canProcessEmptyField(IAType fieldType) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java index d799f22..820775c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java @@ -20,6 +20,8 @@ package org.apache.asterix.external.parser; import java.io.DataOutput; import java.io.IOException; +import java.util.function.LongSupplier; +import java.util.function.Supplier; import org.apache.asterix.builders.RecordBuilder; import org.apache.asterix.external.api.IDataParser; @@ -111,4 +113,10 @@ public class RecordWithMetadataParser<T, O> implements IRecordWithMetadataParser public void appendLastParsedPrimaryKeyToTuple(ArrayTupleBuilder tb) throws HyracksDataException { rwm.appendPrimaryKeyToTuple(tb); } + + @Override + public void configure(Supplier<String> dataSourceName, LongSupplier lineNumber) { + this.recordParser.configure(dataSourceName, lineNumber); + this.converter.configure(lineNumber); + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java index 2644f3d..f60ecdc 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java @@ -71,7 +71,7 @@ public class DataflowControllerProvider 
{ IRecordDataParserFactory<?> recordParserFactory = (IRecordDataParserFactory<?>) dataParserFactory; IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(ctx); // TODO(ali): revisit to think about passing data source name via setter or via createRecordParser - dataParser.setDataSourceName(recordReader.getDataSourceName()); + dataParser.configure(recordReader.getDataSourceName(), recordReader.getLineNumber()); if (indexingOp) { return new IndexingDataFlowController(ctx, dataParser, recordReader, ((IIndexingDatasource) recordReader).getIndexer()); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index b0acf44..63f57b6 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -21,6 +21,7 @@ package org.apache.asterix.external.util; import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.function.LongSupplier; import java.util.function.Supplier; public class ExternalDataConstants { @@ -253,6 +254,7 @@ public class ExternalDataConstants { public static final int MAX_RECORD_SIZE = 32000000; public static final Supplier<String> EMPTY_STRING = () -> ""; + public static final LongSupplier NO_LINES = () -> -1; /** * Expected parameter values @@ -270,7 +272,7 @@ public class ExternalDataConstants { public static final String ERROR_PARSE_RECORD = "Parser failed to parse record"; public static final String MISSING_FIELDS = "some fields are missing"; - public static final String REC_ENDED_IN_Q = "malformed input record ended inside quote"; + public static final String REC_ENDED_AT_EOF = "malformed input record ended abruptly"; public static final String EMPTY_FIELD = 
"empty value"; public static final String INVALID_VAL = "invalid value"; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ParseUtil.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ParseUtil.java index 129f28a..598d9ff 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ParseUtil.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ParseUtil.java @@ -30,9 +30,9 @@ public class ParseUtil { private ParseUtil() { } - public static void warn(IWarningCollector warningCollector, String dataSourceName, int recordNum, int fieldNum, + public static void warn(IWarningCollector warningCollector, String dataSourceName, long lineNum, int fieldNum, String warnMessage) { warningCollector.warn( - Warning.forHyracks(SRC_LOC, ErrorCode.PARSING_ERROR, dataSourceName, recordNum, fieldNum, warnMessage)); + Warning.forHyracks(SRC_LOC, ErrorCode.PARSING_ERROR, dataSourceName, lineNum, fieldNum, warnMessage)); } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index 4e3cf4e..ec536c0 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -140,7 +140,7 @@ 121 = A numeric type promotion error has occurred: %1$s 122 = Encountered an error while printing the plan 123 = Insufficient memory is provided for the join operators, please increase the join memory budget. -124 = Parsing error at %1$s record %2$s field %3$s: %4$s +124 = Parsing error at %1$s line %2$s field %3$s: %4$s 10000 = The given rule collection %1$s is not an instance of the List class. 
10001 = Cannot compose partition constraint %1$s with %2$s diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java index ed2777b..48cfca6 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java @@ -54,7 +54,7 @@ public class FieldCursorForDelimitedDataParser { private char[] buffer; //buffer to holds the input coming form the underlying input stream private int fStart; //start position for field private int fEnd; //end position for field - private int recordCount; //count of records + private long lineCount; //count of lines private int fieldCount; //count of fields in current record private int doubleQuoteCount; //count of double quotes private boolean isDoubleQuoteIncludedInThisField; //does current field include double quotes @@ -99,7 +99,7 @@ public class FieldCursorForDelimitedDataParser { doubleQuoteCount = 0; startedQuote = false; isDoubleQuoteIncludedInThisField = false; - recordCount = 0; + lineCount = 1; fieldCount = 0; } @@ -127,12 +127,12 @@ public class FieldCursorForDelimitedDataParser { return fieldCount; } - public int getRecordCount() { - return recordCount; + public long getLineCount() { + return lineCount; } - public void nextRecord(char[] buffer, int recordLength) { - recordCount++; + public void nextRecord(char[] buffer, int recordLength, long lineNumber) { + lineCount = lineNumber; fieldCount = 0; lastDelimiterPosition = -1; lastQuotePosition = -1; @@ -148,7 +148,6 @@ public class FieldCursorForDelimitedDataParser { } public boolean nextRecord() throws IOException { - recordCount++; 
fieldCount = 0; while (true) { switch (state) { @@ -164,6 +163,7 @@ public class FieldCursorForDelimitedDataParser { case IN_RECORD: int p = start; + char lastChar = '\0'; while (true) { if (p >= end) { int s = start; @@ -204,6 +204,11 @@ public class FieldCursorForDelimitedDataParser { lastDelimiterPosition = p; break; } + // count lines inside quotes + if (ch == '\r' || (ch == '\n' && lastChar != '\r')) { + lineCount++; + } + lastChar = ch; ++p; } break; @@ -217,6 +222,10 @@ public class FieldCursorForDelimitedDataParser { } } char ch = buffer[start]; + // if the next char "ch" is not \n, then count the \r + if (ch != '\n') { + lineCount++; + } if (ch == '\n' && !startedQuote) { ++start; state = State.EOR; @@ -226,6 +235,7 @@ public class FieldCursorForDelimitedDataParser { } case EOR: + lineCount++; if (start >= end) { eof = !readMore(); if (eof) { @@ -265,6 +275,7 @@ public class FieldCursorForDelimitedDataParser { quoteCount = 0; doubleQuoteCount = 0; + char lastChar = '\0'; int p = start; while (true) { if (p >= end) { @@ -380,6 +391,11 @@ public class FieldCursorForDelimitedDataParser { return Result.OK; } } + // count lines inside quotes + if (ch == '\r' || (ch == '\n' && lastChar != '\r')) { + lineCount++; + } + lastChar = ch; ++p; } } @@ -434,7 +450,7 @@ public class FieldCursorForDelimitedDataParser { } private void warn(String message) { - warnings.warn(Warning.forHyracks(SRC_LOC, ErrorCode.PARSING_ERROR, dataSourceName.get(), recordCount, - fieldCount, message)); + warnings.warn(Warning.forHyracks(SRC_LOC, ErrorCode.PARSING_ERROR, dataSourceName.get(), lineCount, fieldCount, + message)); } }
