This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 24ae9d4bca [ASTERIXDB-3367][EXT] Avro Parser for Union types for Bytes fix, Change in Map implementation.
24ae9d4bca is described below

commit 24ae9d4bca7a1af413484280e8f57aff4b17d7d9
Author: ayush-couchbase <[email protected]>
AuthorDate: Fri Mar 15 04:15:46 2024 +0530

    [ASTERIXDB-3367][EXT] Avro Parser for Union types for Bytes fix, Change in Map implementation.
    
    Change-Id: I584873a47bf409351d6b63979117616bce415c8f
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18201
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Wail Alkowaileet <[email protected]>
---
 .../avro/AvroFileExampleGeneratorUtil.java         | 10 +++++++--
 .../avro-types/avro-map/avro-map.03.query.sqlpp    |  2 +-
 .../avro/avro-types/avro-map/avro-map.02.adm       |  4 ++--
 .../avro/avro-types/avro-map/avro-map.03.adm       |  4 ++--
 .../avro/avro-types/avro-union/avro-union.02.adm   |  2 +-
 .../runtimets/testsuite_external_dataset_s3.xml    |  2 +-
 .../record/reader/stream/AvroRecordReader.java     | 16 +++++++++-----
 .../reader/stream/StreamRecordReaderFactory.java   |  6 ++----
 .../asterix/external/parser/AvroDataParser.java    | 25 ++++++++++++++--------
 9 files changed, 44 insertions(+), 27 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileExampleGeneratorUtil.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileExampleGeneratorUtil.java
index d62d2d107e..60f4b83b62 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileExampleGeneratorUtil.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileExampleGeneratorUtil.java
@@ -31,11 +31,12 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumWriter;
+import org.junit.Test;
 
 public class AvroFileExampleGeneratorUtil {
     private static final String SCHEMA_STRING = "{\n" + "  \"type\": 
\"record\",\n" + "  \"name\": \"SimpleRecord\",\n"
             + "  \"namespace\": \"com.example\",\n" + "  \"fields\": [\n" + "  
  {\n"
-            + "      \"name\": \"unionField\",\n" + "      \"type\": [\"int\", 
\"string\"],\n"
+            + "      \"name\": \"unionField\",\n" + "      \"type\": [\"int\", 
\"string\", \"bytes\"],\n"
             + "      \"doc\": \"This field can be either an int or a 
string.\"\n" + "    },\n" + "    {\n"
             + "      \"name\": \"mapField\",\n" + "      \"type\": {\n" + "    
    \"type\": \"map\",\n"
             + "        \"values\": \"int\",\n" + "        \"doc\": \"This is a 
map of string keys to int values.\"\n"
@@ -96,7 +97,7 @@ public class AvroFileExampleGeneratorUtil {
 
             //second record to be added
             GenericRecord record2 = new GenericData.Record(schema);
-            record2.put("unionField", "Example string");
+            record2.put("unionField", ByteBuffer.wrap(new byte[] { 0x01, 0x05 
}));
             Map<String, Integer> map2 = new HashMap<>();
             map2.put("key3", 3);
             map2.put("key4", 4);
@@ -115,4 +116,9 @@ public class AvroFileExampleGeneratorUtil {
             e.printStackTrace();
         }
     }
+
+    @Test
+    public void main() throws IOException {
+        writeExample();
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-types/avro-map/avro-map.03.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-types/avro-map/avro-map.03.query.sqlpp
index b97d9f85a7..c2ca9eefd0 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-types/avro-map/avro-map.03.query.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-types/avro-map/avro-map.03.query.sqlpp
@@ -24,6 +24,6 @@
 
 USE test;
 
-SELECT RAW a.mapField.key1
+SELECT RAW a.mapField[0]
 FROM AvroDataset a
 ORDER BY a.id;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.02.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.02.adm
index 5560bf92f4..92d5ea1da8 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.02.adm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.02.adm
@@ -1,2 +1,2 @@
-{ "key1": 1, "key2": 2 }
-{ "key3": 3, "key4": 4 }
\ No newline at end of file
+[ { "key": "key1", "value": 1 }, { "key": "key2", "value": 2 } ]
+[ { "key": "key3", "value": 3 }, { "key": "key4", "value": 4 } ]
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.03.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.03.adm
index fe0b81feb1..73b283a1ee 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.03.adm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.03.adm
@@ -1,2 +1,2 @@
-1
-null
\ No newline at end of file
+{ "key": "key1", "value": 1 }
+{ "key": "key3", "value": 3 }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-union/avro-union.02.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-union/avro-union.02.adm
index 15f87769f0..8fd021217b 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-union/avro-union.02.adm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-union/avro-union.02.adm
@@ -1,2 +1,2 @@
 42
-"Example string"
\ No newline at end of file
+hex("0105")
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 57474b3277..1e90f98bca 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -365,7 +365,7 @@
         <placeholder name="adapter" value="S3" />
         <output-dir compare="Text">none</output-dir>
         <source-location>false</source-location>
-        <expected-error>Not an Avro data file.</expected-error>
+        <expected-error>Malformed input stream</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="external-dataset">
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java
index e048890165..3f92f0051c 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java
@@ -28,6 +28,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
@@ -35,6 +37,7 @@ import 
org.apache.asterix.external.input.stream.DiscretizedMultipleInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.IFeedLogManager;
+import org.apache.avro.InvalidAvroMagicException;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
@@ -133,12 +136,15 @@ public class AvroRecordReader extends 
AbstractStreamRecordReader<GenericRecord>
     }
 
     private boolean advance() throws IOException {
-        if (inputStream.advance()) {
-            DatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>();
-            dataFileStream = new DataFileStream<>(inputStream, datumReader);
-            return true;
+        try {
+            if (inputStream.advance()) {
+                DatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>();
+                dataFileStream = new DataFileStream<>(inputStream, 
datumReader);
+                return true;
+            }
+        } catch (InvalidAvroMagicException e) {
+            throw new 
RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM, e);
         }
-
         return false;
     }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index 985e2b5d29..1ebe982452 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -132,11 +132,9 @@ public class StreamRecordReaderFactory implements 
IRecordReaderFactory<Object> {
             streamRecordReader.configure(context.getTaskContext(), 
streamFactory.createInputStream(context),
                     configuration);
             return streamRecordReader;
-        } catch (InstantiationException | IllegalAccessException | 
InvocationTargetException
-                | NoSuchMethodException e) {
+        } catch (InstantiationException | IllegalAccessException | 
InvocationTargetException | NoSuchMethodException
+                | IOException e) {
             throw HyracksDataException.create(e);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
         }
     }
 }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
index d760c1f60f..6ee74d7b72 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
@@ -91,19 +91,26 @@ public class AvroDataParser extends AbstractDataParser 
implements IRecordDataPar
     }
 
     private void parseMap(Schema mapSchema, Map<String, ?> map, DataOutput 
out) throws IOException {
-        Schema valueSchema = mapSchema.getValueType();
-        final IMutableValueStorage valueBuffer = 
parserContext.enterCollection();
-        final IMutableValueStorage keyBuffer = parserContext.enterCollection();
+        final IMutableValueStorage item = parserContext.enterCollection();
+        final IMutableValueStorage valueBuffer = parserContext.enterObject();
         IARecordBuilder objectBuilder = 
parserContext.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+        IAsterixListBuilder listBuilder =
+                
parserContext.getCollectionBuilder(DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE);
         for (Map.Entry<String, ?> entry : map.entrySet()) {
-            keyBuffer.reset();
+            objectBuilder.reset(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
             valueBuffer.reset();
-            serializeString(entry.getKey(), Schema.Type.STRING, 
keyBuffer.getDataOutput());
-            parseValue(valueSchema, entry.getValue(), 
valueBuffer.getDataOutput());
-            objectBuilder.addField(keyBuffer, valueBuffer);
+            serializeString(entry.getKey(), Schema.Type.STRING, 
valueBuffer.getDataOutput());
+            
objectBuilder.addField(parserContext.getSerializedFieldName("key"), 
valueBuffer);
+            valueBuffer.reset();
+            parseValue(mapSchema.getValueType(), entry.getValue(), 
valueBuffer.getDataOutput());
+            
objectBuilder.addField(parserContext.getSerializedFieldName("value"), 
valueBuffer);
+            item.reset();
+            objectBuilder.write(item.getDataOutput(), true);
+            listBuilder.addItem(item);
         }
-        objectBuilder.write(out, true);
+        listBuilder.write(out, true);
         parserContext.exitObject(valueBuffer, null, objectBuilder);
+        parserContext.exitCollection(item, listBuilder);
     }
 
     private final void parseUnion(Schema unionSchema, Object value, DataOutput 
out) throws IOException {
@@ -134,7 +141,7 @@ public class AvroDataParser extends AbstractDataParser 
implements IRecordDataPar
             case BOOLEAN:
                 return value instanceof Boolean;
             case BYTES:
-                return value instanceof Byte;
+                return value instanceof ByteBuffer;
             case RECORD:
                 return value instanceof GenericData.Record;
             default:

Reply via email to