This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 8716990ee42ada6a9f67303ada94c8c0e89f5f0f Author: Ali Alsuliman <[email protected]> AuthorDate: Wed Apr 29 22:05:05 2020 -0700 [NO ISSUE][EXT] Fix error reporting when processing external datasets records - user model changes: no - storage format changes: no - interface changes: no Details: Fix error reporting when the record size retrieved from external datasets exceeds the limit instead of reporting internal error. Change-Id: I0f0973356c727d7d7a28252163aba2591c4205db Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6083 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- .../external/input/record/CharArrayRecord.java | 4 +-- .../reader/stream/SemiStructuredRecordReader.java | 4 +-- .../external/input/stream/LocalFSInputStream.java | 38 ++++++++++++++-------- 3 files changed, 28 insertions(+), 18 deletions(-) diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java index aa4abb4..5d7ffb2 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java @@ -65,7 +65,7 @@ public class CharArrayRecord implements IRawRecord<char[]> { size = length; } - private void ensureCapacity(int len) throws IOException { + private void ensureCapacity(int len) throws RuntimeDataException { if (value.length < len) { if (len > ExternalDataConstants.MAX_RECORD_SIZE) { throw new RuntimeDataException(ErrorCode.INPUT_RECORD_READER_CHAR_ARRAY_RECORD_TOO_LARGE, @@ -77,7 +77,7 @@ public class CharArrayRecord implements IRawRecord<char[]> { } } - public void append(char[] recordBuffer, int offset, int length) throws IOException { + public void append(char[] recordBuffer, int offset, int length) throws RuntimeDataException { ensureCapacity(size + length); System.arraycopy(recordBuffer, offset, value, size, length); size += length; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java index 38eec98..fa4a4a5 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java @@ -152,10 +152,10 @@ public class SemiStructuredRecordReader extends StreamRecordReader { if (appendLength > 0) { try { record.append(inputBuffer, startPosn, appendLength); - } catch (IOException e) { + } catch (RuntimeDataException e) { reader.reset(); bufferPosn = bufferLength = 0; - throw new RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM); + throw e; } } } while (!hasFinished); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java index 3d23b24..4fec3c4 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java @@ -121,21 +121,31 @@ public class LocalFSInputStream extends AbstractMultipleInputStream { return false; } Throwable root = ExceptionUtils.getRootCause(th); - if (root instanceof HyracksDataException - && ((HyracksDataException) root).getErrorCode() == ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM) { - if (currentFile != null) { - try { - logManager.logRecord(currentFile.getAbsolutePath(), "Corrupted input file"); - } catch (IOException e) { - LOGGER.log(Level.WARN, "Filed to write to feed log file", e); + if (root instanceof HyracksDataException) { + HyracksDataException r = (HyracksDataException) root; + String component = r.getComponent(); + if (ErrorCode.ASTERIX.equals(component)) { + int errorCode = r.getErrorCode(); + switch (errorCode) { + case ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM: + if (currentFile != null) { + try { + logManager.logRecord(currentFile.getAbsolutePath(), "Corrupted input file"); + } catch (IOException e) { + LOGGER.log(Level.WARN, "Filed to write to feed log file", e); + } + LOGGER.log(Level.WARN, "Corrupted input file: " + currentFile.getAbsolutePath()); + } + case ErrorCode.INPUT_RECORD_READER_CHAR_ARRAY_RECORD_TOO_LARGE: + try { + advance(); + return true; + } catch (Exception e) { + LOGGER.log(Level.WARN, "An exception was thrown while trying to skip a file", e); + } + default: + break; } - LOGGER.log(Level.WARN, "Corrupted input file: " + currentFile.getAbsolutePath()); - } - try { - advance(); - return true; - } catch (Exception e) { - LOGGER.log(Level.WARN, "An exception was thrown while trying to skip a file", e); } } LOGGER.log(Level.WARN, "Failed to recover from failure", th);
