This is an automated email from the ASF dual-hosted git repository.

oehler pushed a commit to branch 3355-add-new-avro-format-parser
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to 
refs/heads/3355-add-new-avro-format-parser by this push:
     new 006c8ebc4e Add avro parser
006c8ebc4e is described below

commit 006c8ebc4e456c97ef571abfeeace4ea3a59f387
Author: Sven Oehler <[email protected]>
AuthorDate: Tue Nov 26 15:38:29 2024 +0100

    Add avro parser
---
 streampipes-extensions-management/pom.xml          |   5 +
 .../connect/adapter/BrokerEventProcessor.java      |   5 +-
 .../connect/adapter/parser/AvroParser.java         | 213 +++++++++++++++++++++
 .../management/connect/adapter/parser/Parsers.java |   3 +-
 .../streampipes-extensions-all-jvm/pom.xml         |   5 +
 5 files changed, 227 insertions(+), 4 deletions(-)

diff --git a/streampipes-extensions-management/pom.xml 
b/streampipes-extensions-management/pom.xml
index 3c0655c595..9c0427023b 100644
--- a/streampipes-extensions-management/pom.xml
+++ b/streampipes-extensions-management/pom.xml
@@ -94,6 +94,11 @@
             <groupId>com.opencsv</groupId>
             <artifactId>opencsv</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>1.11.4</version>
+        </dependency>
 
         <!-- Test dependencies -->
         <dependency>
diff --git 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/BrokerEventProcessor.java
 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/BrokerEventProcessor.java
index 2ad113e8a7..b3ed8c6fd1 100644
--- 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/BrokerEventProcessor.java
+++ 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/BrokerEventProcessor.java
@@ -23,11 +23,10 @@ import 
org.apache.streampipes.extensions.api.connect.IEventCollector;
 import org.apache.streampipes.extensions.api.connect.IParser;
 import org.apache.streampipes.messaging.InternalEventProcessor;
 
-import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.charset.StandardCharsets;
+import java.io.ByteArrayInputStream;
 
 public record BrokerEventProcessor(
     IParser parser,
@@ -39,7 +38,7 @@ public record BrokerEventProcessor(
   @Override
   public void onEvent(byte[] payload) {
     try {
-      parser.parse(IOUtils.toInputStream(new String(payload), 
StandardCharsets.UTF_8), collector::collect);
+      parser.parse(new ByteArrayInputStream(payload), collector::collect);
     } catch (ParseException e) {
       LOG.error("Error while parsing: " + e.getMessage());
     }
diff --git 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/AvroParser.java
 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/AvroParser.java
new file mode 100644
index 0000000000..c1c8b8f57c
--- /dev/null
+++ 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/AvroParser.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.management.connect.adapter.parser;
+
+import org.apache.streampipes.commons.exceptions.connect.ParseException;
+import org.apache.streampipes.extensions.api.connect.IParser;
+import org.apache.streampipes.extensions.api.connect.IParserEventHandler;
+import org.apache.streampipes.model.connect.grounding.ParserDescription;
+import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
+import org.apache.streampipes.sdk.builder.adapter.ParserDescriptionBuilder;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Options;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.util.Utf8;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AvroParser implements IParser {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AvroParser.class);
+
+  public static final String ID = 
"org.apache.streampipes.extensions.management.connect.adapter.parser.avro";
+  public static final String LABEL = "Avro";
+  public static final String DESCRIPTION = "Can be used to read avro records";
+
+  public static final String SCHEMA = "schema";
+  public static final String SCHEMA_REGISTRY = "schemaRegistry";
+  public static final String FLATTEN_RECORDS = "flattenRecord";
+
+  private final ParserUtils parserUtils;
+  private DatumReader<GenericRecord> datumReader;
+  private boolean schemaRegistry;
+  private boolean flattenRecord;
+
+
+  /**
+   * Creates a parser instance without an Avro schema. Only the parser utilities are
+   * initialised here; the datum reader and the option flags keep their default values.
+   */
+  public AvroParser() {
+    this.parserUtils = new ParserUtils();
+  }
+
+  /**
+   * Creates a parser that decodes records written with the given Avro schema.
+   *
+   * @param schemaString   Avro schema definition, handed to {@link Schema.Parser}
+   * @param schemaRegistry whether the first five bytes of each message are skipped before decoding
+   * @param flattenRecord  whether nested records are flattened into prefixed top-level keys
+   */
+  public AvroParser(String schemaString, boolean schemaRegistry, boolean flattenRecord) {
+    this();
+    this.schemaRegistry = schemaRegistry;
+    this.flattenRecord = flattenRecord;
+    this.datumReader = new GenericDatumReader<>(new Schema.Parser().parse(schemaString));
+  }
+
+  @Override
+  public ParserDescription declareDescription() {
+    return ParserDescriptionBuilder.create(ID, LABEL, DESCRIPTION)
+      .requiredSingleValueSelection(
+          Labels.from(SCHEMA_REGISTRY, "Schema Registry",
+                  "Does the messages include the schema registry header?"),
+          Options.from("yes", "no")
+      )
+      .requiredSingleValueSelection(
+          Labels.from(FLATTEN_RECORDS, "Flatten Records",
+                  "Should nested records be flattened?"),
+          Options.from("no", "yes")
+      )
+      .requiredCodeblock(Labels.from(SCHEMA, "Schema",
+  "The schema of the avro record"),
+  "{\n"
+          + " \"namespace\": \"example.avro\",\n"
+          + " \"type\": \"record\",\n"
+          + " \"name\": \"Test\",\n"
+          + " \"fields\": [\n"
+          + "     {\"name\": \"id\", \"type\": \"string\"},\n"
+          + "     {\"name\": \"value\", \"type\": \"double\"}\n"
+          + " ]\n"
+          + "}")
+      .build();
+  }
+
+
+  /**
+   * Decodes one Avro record from the stream, converts it to an event map and lets the
+   * parser utilities derive a schema guess from that sample event.
+   *
+   * @param inputStream stream holding one binary-encoded Avro record
+   * @return the schema guessed from the decoded sample
+   * @throws ParseException if the record cannot be read
+   */
+  @Override
+  public GuessSchema getGuessSchema(InputStream inputStream) throws ParseException {
+    Map<String, Object> sampleEvent = toMap(getRecord(inputStream));
+    return parserUtils.getGuessSchema(sampleEvent);
+  }
+
+  /**
+   * Decodes one Avro record from the stream and forwards it to the handler as an event map.
+   *
+   * @param inputStream stream holding one binary-encoded Avro record
+   * @param handler     receiver of the converted event
+   * @throws ParseException if the record cannot be read
+   */
+  @Override
+  public void parse(InputStream inputStream, IParserEventHandler handler) throws ParseException {
+    Map<String, Object> parsedEvent = toMap(getRecord(inputStream));
+    handler.handle(parsedEvent);
+  }
+
+  /**
+   * Builds a configured parser from the static properties declared in
+   * {@link #declareDescription()}. Both selections are compared against the option "yes".
+   *
+   * @param configuration static properties filled in by the user
+   * @return a new {@link AvroParser} using the configured schema and flags
+   */
+  @Override
+  public IParser fromDescription(List<StaticProperty> configuration) {
+    var properties = StaticPropertyExtractor.from(configuration);
+
+    String avroSchema = properties.codeblockValue(SCHEMA);
+
+    var registryChoice = properties.selectedSingleValue(SCHEMA_REGISTRY, String.class);
+    boolean hasRegistryHeader = registryChoice.equals("yes");
+
+    var flattenChoice = properties.selectedSingleValue(FLATTEN_RECORDS, String.class);
+    boolean flattenNested = flattenChoice.equals("yes");
+
+    return new AvroParser(avroSchema, hasRegistryHeader, flattenNested);
+  }
+
+  private GenericRecord getRecord(InputStream inputStream) throws 
ParseException {
+    try {
+      if (schemaRegistry) {
+        inputStream.skipNBytes(5);
+      }
+      BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+      GenericRecord avroRecord = datumReader.read(null, decoder);
+      LOG.info("Read record: {}", avroRecord);
+      return  avroRecord;
+    } catch (IOException e) {
+      throw new ParseException(
+          "Error processing Kafka message: " + e.getMessage()
+      );
+    }
+  }
+
+
+  /**
+   * Converts an Avro record into an insertion-ordered map keyed by field name.
+   * When flattening is enabled, fields holding a nested record are replaced by the
+   * prefixed entries produced by {@link #unwrapNestedRecord(GenericRecord, String)};
+   * all other values go through {@link #toMapHelper(Object)}.
+   */
+  private Map<String, Object> toMap(GenericRecord avroRecord) {
+    Map<String, Object> event = new LinkedHashMap<>();
+    for (Schema.Field field : avroRecord.getSchema().getFields()) {
+      String name = field.name();
+      Object value = avroRecord.get(name);
+      boolean flattenThisField = flattenRecord && value instanceof GenericRecord;
+      if (!flattenThisField) {
+        event.put(name, toMapHelper(value));
+        continue;
+      }
+      event.putAll(unwrapNestedRecord((GenericRecord) value, name));
+    }
+    return event;
+  }
+
+  /**
+   * Recursively converts an Avro value into plain Java structures.
+   *
+   * <ul>
+   *   <li>records become maps via {@link #toMap(GenericRecord)}</li>
+   *   <li>Avro arrays become lists with converted elements</li>
+   *   <li>maps become insertion-ordered maps with converted keys and values</li>
+   *   <li>{@link Utf8} values become strings; anything else is returned unchanged</li>
+   * </ul>
+   *
+   * @param fieldValue value taken from a decoded record, may be {@code null}
+   * @return the converted value
+   */
+  private Object toMapHelper(Object fieldValue) {
+    if (fieldValue instanceof GenericRecord nestedRecord) {
+      return toMap(nestedRecord);
+    }
+    if (fieldValue instanceof GenericData.Array<?> array) {
+      List<Object> valueList = new ArrayList<>();
+      for (Object element : array) {
+        valueList.add(toMapHelper(element));
+      }
+      return valueList;
+    }
+    if (fieldValue instanceof Map<?, ?> map) {
+      Map<Object, Object> valueMap = new LinkedHashMap<>();
+      for (Map.Entry<?, ?> entry : map.entrySet()) {
+        valueMap.put(convertUTF8(entry.getKey()), toMapHelper(entry.getValue()));
+      }
+      return valueMap;
+    }
+    return convertUTF8(fieldValue);
+  }
+
+
+  /**
+   * Flattens a nested record into a single-level map. Each key is the field name joined to
+   * the given prefix with an underscore; records nested deeper are flattened recursively,
+   * other values are converted with {@link #toMapHelper(Object)}.
+   *
+   * @param nestedRecord record to flatten
+   * @param prefix       key prefix of the enclosing field; an empty prefix is omitted
+   * @return flattened entries in schema field order
+   */
+  private Map<String, Object> unwrapNestedRecord(GenericRecord nestedRecord, String prefix) {
+    // LinkedHashMap keeps the schema's field order, matching the map built by toMap.
+    Map<String, Object> flatMap = new LinkedHashMap<>();
+
+    for (Schema.Field field : nestedRecord.getSchema().getFields()) {
+      String fieldName = field.name();
+      Object fieldValue = nestedRecord.get(fieldName);
+      String newKey = prefix.isEmpty() ? fieldName : prefix + "_" + fieldName;
+      if (fieldValue instanceof GenericRecord innerRecord) {
+        flatMap.putAll(unwrapNestedRecord(innerRecord, newKey));
+      } else {
+        flatMap.put(newKey, toMapHelper(fieldValue));
+      }
+    }
+
+    return flatMap;
+  }
+
+  /**
+   * Turns an Avro {@link Utf8} value into a {@link String}; every other value
+   * (including {@code null}) is handed back as is.
+   */
+  private Object convertUTF8(Object fieldValue) {
+    return fieldValue instanceof Utf8 ? fieldValue.toString() : fieldValue;
+  }
+
+}
diff --git 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/Parsers.java
 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/Parsers.java
index 3c8a0110da..1dc2937792 100644
--- 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/Parsers.java
+++ 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/Parsers.java
@@ -30,7 +30,8 @@ public class Parsers {
         new JsonParsers(),
         new CsvParser(),
         new XmlParser(),
-        new ImageParser()
+        new ImageParser(),
+        new AvroParser()
     );
   }
 }
diff --git a/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml 
b/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml
index 88cede6913..3b9e002677 100644
--- a/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml
+++ b/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml
@@ -64,6 +64,11 @@
             <artifactId>streampipes-connectors-kafka</artifactId>
             <version>0.97.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>1.11.4</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-connectors-mqtt</artifactId>

Reply via email to