This is an automated email from the ASF dual-hosted git repository.
oehler pushed a commit to branch 3355-add-new-avro-format-parser
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/3355-add-new-avro-format-parser by this push:
new 006c8ebc4e Add avro parser
006c8ebc4e is described below
commit 006c8ebc4e456c97ef571abfeeace4ea3a59f387
Author: Sven Oehler <[email protected]>
AuthorDate: Tue Nov 26 15:38:29 2024 +0100
Add avro parser
---
streampipes-extensions-management/pom.xml | 5 +
.../connect/adapter/BrokerEventProcessor.java | 5 +-
.../connect/adapter/parser/AvroParser.java | 213 +++++++++++++++++++++
.../management/connect/adapter/parser/Parsers.java | 3 +-
.../streampipes-extensions-all-jvm/pom.xml | 5 +
5 files changed, 227 insertions(+), 4 deletions(-)
diff --git a/streampipes-extensions-management/pom.xml
b/streampipes-extensions-management/pom.xml
index 3c0655c595..9c0427023b 100644
--- a/streampipes-extensions-management/pom.xml
+++ b/streampipes-extensions-management/pom.xml
@@ -94,6 +94,11 @@
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.11.4</version>
+ </dependency>
<!-- Test dependencies -->
<dependency>
diff --git
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/BrokerEventProcessor.java
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/BrokerEventProcessor.java
index 2ad113e8a7..b3ed8c6fd1 100644
---
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/BrokerEventProcessor.java
+++
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/BrokerEventProcessor.java
@@ -23,11 +23,10 @@ import
org.apache.streampipes.extensions.api.connect.IEventCollector;
import org.apache.streampipes.extensions.api.connect.IParser;
import org.apache.streampipes.messaging.InternalEventProcessor;
-import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.charset.StandardCharsets;
+import java.io.ByteArrayInputStream;
public record BrokerEventProcessor(
IParser parser,
@@ -39,7 +38,7 @@ public record BrokerEventProcessor(
@Override
public void onEvent(byte[] payload) {
try {
- parser.parse(IOUtils.toInputStream(new String(payload),
StandardCharsets.UTF_8), collector::collect);
+ parser.parse(new ByteArrayInputStream(payload), collector::collect);
} catch (ParseException e) {
LOG.error("Error while parsing: " + e.getMessage());
}
diff --git
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/AvroParser.java
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/AvroParser.java
new file mode 100644
index 0000000000..c1c8b8f57c
--- /dev/null
+++
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/AvroParser.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.management.connect.adapter.parser;
+
+import org.apache.streampipes.commons.exceptions.connect.ParseException;
+import org.apache.streampipes.extensions.api.connect.IParser;
+import org.apache.streampipes.extensions.api.connect.IParserEventHandler;
+import org.apache.streampipes.model.connect.grounding.ParserDescription;
+import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
+import org.apache.streampipes.sdk.builder.adapter.ParserDescriptionBuilder;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Options;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.util.Utf8;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link IParser} that decodes Avro binary-encoded records.
+ *
+ * <p>Every input stream handed to {@link #parse} or {@link #getGuessSchema} is decoded as a
+ * single record using the schema configured in the adapter. The record is converted into a
+ * {@code Map<String, Object>}: {@link Utf8} strings and enum symbols become {@link String}s,
+ * Avro arrays become {@link List}s, Avro maps become {@link Map}s, and nested records become
+ * nested maps (or, if flattening is enabled, top-level entries whose keys are the field path
+ * joined with {@code _}). Other values are passed through unchanged.
+ */
+public class AvroParser implements IParser {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AvroParser.class);
+
+  public static final String ID =
+      "org.apache.streampipes.extensions.management.connect.adapter.parser.avro";
+  public static final String LABEL = "Avro";
+  public static final String DESCRIPTION = "Can be used to read avro records";
+
+  public static final String SCHEMA = "schema";
+  public static final String SCHEMA_REGISTRY = "schemaRegistry";
+  public static final String FLATTEN_RECORDS = "flattenRecord";
+
+  // Number of header bytes skipped before the Avro payload when the schema registry option is set.
+  // NOTE(review): presumably the Confluent wire format (1 magic byte + 4-byte schema id) - confirm.
+  private static final int SCHEMA_REGISTRY_HEADER_LENGTH = 5;
+
+  private final ParserUtils parserUtils;
+  // Null for instances created via the no-arg constructor; such instances cannot decode records.
+  private DatumReader<GenericRecord> datumReader;
+  // Whether the schema registry header is skipped before decoding each message.
+  private boolean schemaRegistry;
+  // Whether nested records are flattened into the top-level map.
+  private boolean flattenRecord;
+
+  /**
+   * Creates an unconfigured parser. It carries no schema, so it is only meant for
+   * {@link #declareDescription()} and {@link #fromDescription(List)}, not for decoding.
+   */
+  public AvroParser() {
+    parserUtils = new ParserUtils();
+  }
+
+  /**
+   * Creates a parser that decodes records of the given schema.
+   *
+   * @param schemaString   Avro schema in its JSON notation
+   * @param schemaRegistry whether each message starts with a schema registry header to skip
+   * @param flattenRecord  whether nested records are flattened into the top-level map
+   * @throws org.apache.avro.SchemaParseException if {@code schemaString} is not a valid schema
+   */
+  public AvroParser(String schemaString, boolean schemaRegistry, boolean flattenRecord) {
+    this();
+    Schema schema = new Schema.Parser().parse(schemaString);
+    this.datumReader = new GenericDatumReader<>(schema);
+    this.schemaRegistry = schemaRegistry;
+    this.flattenRecord = flattenRecord;
+  }
+
+  /**
+   * Declares the configuration options of this parser: the schema registry header toggle,
+   * the flattening toggle and the Avro schema itself (pre-filled with an example schema).
+   */
+  @Override
+  public ParserDescription declareDescription() {
+    return ParserDescriptionBuilder.create(ID, LABEL, DESCRIPTION)
+        .requiredSingleValueSelection(
+            Labels.from(SCHEMA_REGISTRY, "Schema Registry",
+                "Do the messages include the schema registry header?"),
+            Options.from("yes", "no")
+        )
+        .requiredSingleValueSelection(
+            Labels.from(FLATTEN_RECORDS, "Flatten Records",
+                "Should nested records be flattened?"),
+            Options.from("no", "yes")
+        )
+        .requiredCodeblock(Labels.from(SCHEMA, "Schema",
+                "The schema of the avro record"),
+            "{\n"
+                + "  \"namespace\": \"example.avro\",\n"
+                + "  \"type\": \"record\",\n"
+                + "  \"name\": \"Test\",\n"
+                + "  \"fields\": [\n"
+                + "    {\"name\": \"id\", \"type\": \"string\"},\n"
+                + "    {\"name\": \"value\", \"type\": \"double\"}\n"
+                + "  ]\n"
+                + "}")
+        .build();
+  }
+
+  /**
+   * Decodes one record from {@code inputStream} and derives the event schema from it.
+   *
+   * @throws ParseException if the record cannot be read
+   */
+  @Override
+  public GuessSchema getGuessSchema(InputStream inputStream) throws ParseException {
+    GenericRecord avroRecord = getRecord(inputStream);
+    var event = toMap(avroRecord);
+    return parserUtils.getGuessSchema(event);
+  }
+
+  /**
+   * Decodes one record from {@code inputStream} and passes the converted event to
+   * {@code handler}.
+   *
+   * @throws ParseException if the record cannot be read
+   */
+  @Override
+  public void parse(InputStream inputStream, IParserEventHandler handler) throws ParseException {
+    GenericRecord avroRecord = getRecord(inputStream);
+    var event = toMap(avroRecord);
+    handler.handle(event);
+  }
+
+  /**
+   * Builds a configured parser from the user's adapter configuration.
+   */
+  @Override
+  public IParser fromDescription(List<StaticProperty> configuration) {
+    var extractor = StaticPropertyExtractor.from(configuration);
+    String schema = extractor.codeblockValue(SCHEMA);
+    // Constant-first comparison: a missing (null) selection yields false instead of an NPE.
+    boolean schemaRegistry =
+        "yes".equals(extractor.selectedSingleValue(SCHEMA_REGISTRY, String.class));
+    boolean flattenRecords =
+        "yes".equals(extractor.selectedSingleValue(FLATTEN_RECORDS, String.class));
+
+    return new AvroParser(schema, schemaRegistry, flattenRecords);
+  }
+
+  /**
+   * Reads a single binary-encoded record from {@code inputStream}, first skipping the schema
+   * registry header if that option is enabled.
+   *
+   * @throws ParseException if reading or decoding fails; the original {@link IOException}
+   *                        is attached as the cause
+   */
+  private GenericRecord getRecord(InputStream inputStream) throws ParseException {
+    try {
+      if (schemaRegistry) {
+        inputStream.skipNBytes(SCHEMA_REGISTRY_HEADER_LENGTH);
+      }
+      BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+      GenericRecord avroRecord = datumReader.read(null, decoder);
+      // Debug level: this runs once per incoming event.
+      LOG.debug("Read record: {}", avroRecord);
+      return avroRecord;
+    } catch (IOException e) {
+      // The parser is broker-agnostic, so the message does not name a specific transport.
+      ParseException parseException =
+          new ParseException("Error while parsing Avro record: " + e.getMessage());
+      // Keep the stack trace of the underlying I/O / decoding failure.
+      parseException.initCause(e);
+      throw parseException;
+    }
+  }
+
+  /**
+   * Converts a record into an insertion-ordered map that follows the schema's field order.
+   * Nested records are flattened into prefixed keys when flattening is enabled.
+   */
+  private Map<String, Object> toMap(GenericRecord avroRecord) {
+    Map<String, Object> resultMap = new LinkedHashMap<>();
+    for (Schema.Field field : avroRecord.getSchema().getFields()) {
+      String fieldName = field.name();
+      Object fieldValue = avroRecord.get(fieldName);
+      if (flattenRecord && fieldValue instanceof GenericRecord nestedRecord) {
+        resultMap.putAll(unwrapNestedRecord(nestedRecord, fieldName));
+      } else {
+        resultMap.put(fieldName, toMapHelper(fieldValue));
+      }
+    }
+    return resultMap;
+  }
+
+  /**
+   * Recursively converts a single Avro value into plain Java types: records to maps, arrays to
+   * lists, maps to maps (with string keys), and {@link Utf8} strings and enum symbols to
+   * {@link String}. Any other value is returned as is.
+   */
+  private Object toMapHelper(Object fieldValue) {
+    if (fieldValue instanceof GenericRecord nestedRecord) {
+      return toMap(nestedRecord);
+    }
+    if (fieldValue instanceof List<?> values) {
+      List<Object> valueList = new ArrayList<>(values.size());
+      values.forEach(value -> valueList.add(toMapHelper(value)));
+      return valueList;
+    }
+    if (fieldValue instanceof Map<?, ?> values) {
+      Map<Object, Object> valueMap = new LinkedHashMap<>();
+      values.forEach((key, value) -> valueMap.put(convertUTF8(key), toMapHelper(value)));
+      return valueMap;
+    }
+    if (fieldValue instanceof GenericData.EnumSymbol) {
+      // Expose enum values as their symbol name rather than as an Avro-specific object.
+      return fieldValue.toString();
+    }
+    return convertUTF8(fieldValue);
+  }
+
+  /**
+   * Flattens a nested record into {@code prefix_fieldName} keys, recursing into deeper records
+   * and keeping the schema's field order.
+   */
+  private Map<String, Object> unwrapNestedRecord(GenericRecord nestedRecord, String prefix) {
+    // LinkedHashMap (like toMap) so flattened fields keep their schema order.
+    Map<String, Object> flatMap = new LinkedHashMap<>();
+
+    for (Schema.Field field : nestedRecord.getSchema().getFields()) {
+      String fieldName = field.name();
+      Object fieldValue = nestedRecord.get(fieldName);
+      String newKey = prefix.isEmpty() ? fieldName : prefix + "_" + fieldName;
+      if (fieldValue instanceof GenericRecord deeperRecord) {
+        flatMap.putAll(unwrapNestedRecord(deeperRecord, newKey));
+      } else {
+        flatMap.put(newKey, toMapHelper(fieldValue));
+      }
+    }
+
+    return flatMap;
+  }
+
+  /**
+   * Converts Avro's {@link Utf8} string representation to a {@link String}; any other value is
+   * returned unchanged.
+   */
+  private Object convertUTF8(Object fieldValue) {
+    if (fieldValue instanceof Utf8) {
+      return fieldValue.toString();
+    }
+    return fieldValue;
+  }
+
+}
diff --git
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/Parsers.java
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/Parsers.java
index 3c8a0110da..1dc2937792 100644
---
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/Parsers.java
+++
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/Parsers.java
@@ -30,7 +30,8 @@ public class Parsers {
new JsonParsers(),
new CsvParser(),
new XmlParser(),
- new ImageParser()
+ new ImageParser(),
+ new AvroParser()
);
}
}
diff --git a/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml
b/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml
index 88cede6913..3b9e002677 100644
--- a/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml
+++ b/streampipes-extensions/streampipes-extensions-all-jvm/pom.xml
@@ -64,6 +64,11 @@
<artifactId>streampipes-connectors-kafka</artifactId>
<version>0.97.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.11.4</version>
+ </dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-connectors-mqtt</artifactId>