This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ba446b9a0c Handle Exceptions while reading Avro data format
ba446b9a0c is described below

commit ba446b9a0cfd1136a33905ec4216a026a2336372
Author: Savyasach Reddy <[email protected]>
AuthorDate: Wed Jun 12 12:23:30 2024 +0530

    Handle Exceptions while reading Avro data format
    
    Change-Id: I84405ce8142a887c341e2f8d82a68d58e287dd93
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18366
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Hussain Towaileb <[email protected]>
---
 .../runtimets/testsuite_external_dataset_s3.xml    |  2 +-
 .../record/reader/stream/AvroRecordReader.java     | 43 +++++++++++++---------
 .../asterix/external/parser/AvroDataParser.java    | 10 +++--
 3 files changed, 32 insertions(+), 23 deletions(-)

diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 54c32ef0c7..5df65bac61 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -371,7 +371,7 @@
       <compilation-unit name="common/avro/invalid-avro-files">
         <placeholder name="adapter" value="S3" />
         <output-dir compare="Text">none</output-dir>
-        <expected-error>Malformed input stream</expected-error>
+        <expected-error>External source error. 
org.apache.avro.InvalidAvroMagicException: Not an Avro data 
file.</expected-error>
         <source-location>false</source-location>
       </compilation-unit>
     </test-case>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java
index 3f92f0051c..1d76a9001f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java
@@ -20,6 +20,7 @@ package 
org.apache.asterix.external.input.record.reader.stream;
 
 import static 
org.apache.asterix.external.util.ExternalDataConstants.EMPTY_STRING;
 import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_REDACT_WARNINGS;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -37,7 +38,7 @@ import 
org.apache.asterix.external.input.stream.DiscretizedMultipleInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.IFeedLogManager;
-import org.apache.avro.InvalidAvroMagicException;
+import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
@@ -55,7 +56,7 @@ public class AvroRecordReader extends 
AbstractStreamRecordReader<GenericRecord>
     private static final List<String> recordReaderFormats =
             
Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.FORMAT_AVRO));
 
-    public AvroRecordReader(AsterixInputStream inputStream, Map<String, 
String> config) throws IOException {
+    public AvroRecordReader(AsterixInputStream inputStream, Map<String, 
String> config) throws HyracksDataException {
         record = new 
org.apache.asterix.external.input.record.GenericRecord<>();
         this.inputStream = new DiscretizedMultipleInputStream(inputStream);
         done = false;
@@ -91,21 +92,29 @@ public class AvroRecordReader extends 
AbstractStreamRecordReader<GenericRecord>
     }
 
     @Override
-    public IRawRecord<GenericRecord> next() throws IOException {
-        avroRecord = dataFileStream.next(avroRecord);
-        record.set(avroRecord);
-        return record;
+    public IRawRecord<GenericRecord> next() throws HyracksDataException {
+        try {
+            avroRecord = dataFileStream.next(avroRecord);
+            record.set(avroRecord);
+            return record;
+        } catch (AvroRuntimeException | IOException e) {
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, 
e, getMessageOrToString(e));
+        }
     }
 
     @Override
-    public boolean hasNext() throws IOException {
-        if (dataFileStream == null) {
-            return false;
-        }
-        if (dataFileStream.hasNext()) {
-            return true;
+    public boolean hasNext() throws HyracksDataException {
+        try {
+            if (dataFileStream == null) {
+                return false;
+            }
+            if (dataFileStream.hasNext()) {
+                return true;
+            }
+            return advance() && dataFileStream.hasNext();
+        } catch (AvroRuntimeException e) {
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, 
e, getMessageOrToString(e));
         }
-        return advance() && dataFileStream.hasNext();
     }
 
     @Override
@@ -126,7 +135,6 @@ public class AvroRecordReader extends 
AbstractStreamRecordReader<GenericRecord>
     @Override
     public List<String> getRecordReaderFormats() {
         return recordReaderFormats;
-
     }
 
     @Override
@@ -135,17 +143,16 @@ public class AvroRecordReader extends 
AbstractStreamRecordReader<GenericRecord>
 
     }
 
-    private boolean advance() throws IOException {
+    private boolean advance() throws HyracksDataException {
         try {
             if (inputStream.advance()) {
                 DatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>();
                 dataFileStream = new DataFileStream<>(inputStream, 
datumReader);
                 return true;
             }
-        } catch (InvalidAvroMagicException e) {
-            throw new 
RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM, e);
+        } catch (AvroRuntimeException | IOException e) {
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, 
e, getMessageOrToString(e));
         }
         return false;
     }
-
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
index 49b9e7451a..8088daa307 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.external.parser;
 
 import static org.apache.avro.Schema.Type.NULL;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.io.DataOutput;
 import java.io.IOException;
@@ -40,6 +41,7 @@ import org.apache.asterix.om.base.ABoolean;
 import org.apache.asterix.om.base.ANull;
 import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -62,8 +64,8 @@ public class AvroDataParser extends AbstractDataParser 
implements IRecordDataPar
             parseObject(record.get(), out);
             valueEmbedder.reset();
             return true;
-        } catch (IOException e) {
-            throw HyracksDataException.create(e);
+        } catch (AvroRuntimeException | IOException e) {
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, 
e, getMessageOrToString(e));
         }
     }
 
@@ -278,12 +280,12 @@ public class AvroDataParser extends AbstractDataParser 
implements IRecordDataPar
         doubleSerde.serialize(aDouble, out);
     }
 
-    private void serializeString(Object value, DataOutput out) throws 
IOException {
+    private void serializeString(Object value, DataOutput out) throws 
HyracksDataException {
         aString.setValue(value.toString());
         stringSerde.serialize(aString, out);
     }
 
-    private static HyracksDataException createUnsupportedException(Schema 
schema) throws HyracksDataException {
+    private static HyracksDataException createUnsupportedException(Schema 
schema) {
         return new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Avro 
Parser", schema);
     }
 }

Reply via email to