This is an automated email from the ASF dual-hosted git repository. zehnder pushed a commit to branch 4103-connect-remove-redundant-json-formats-and-migrate-logic-to-transformation-script in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 8eaabe8ead9d905b614d64d44ad123b4550ea580 Author: Philipp Zehnder <[email protected]> AuthorDate: Fri Jan 16 14:06:48 2026 +0100 fix(#4103): Remove geo json and array key parser --- .../connect/adapter/parser/JsonParsers.java | 28 +- .../adapter/parser/json/GeoJsonConstants.java | 33 --- .../connect/adapter/parser/json/GeoJsonParser.java | 299 --------------------- .../adapter/parser/json/JsonArrayKeyParser.java | 87 ------ 4 files changed, 2 insertions(+), 445 deletions(-) diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/JsonParsers.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/JsonParsers.java index 5c783fcb23..d7e55e56c9 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/JsonParsers.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/JsonParsers.java @@ -21,15 +21,12 @@ package org.apache.streampipes.extensions.management.connect.adapter.parser; import org.apache.streampipes.commons.exceptions.connect.ParseException; import org.apache.streampipes.extensions.api.connect.IParser; import org.apache.streampipes.extensions.api.connect.IParserEventHandler; -import org.apache.streampipes.extensions.management.connect.adapter.parser.json.GeoJsonParser; -import org.apache.streampipes.extensions.management.connect.adapter.parser.json.JsonArrayKeyParser; import org.apache.streampipes.extensions.management.connect.adapter.parser.json.JsonArrayParser; import org.apache.streampipes.extensions.management.connect.adapter.parser.json.JsonObjectParser; import org.apache.streampipes.extensions.management.connect.adapter.parser.json.JsonParser; import org.apache.streampipes.model.connect.grounding.ParserDescription; import org.apache.streampipes.model.connect.guess.SampleData; import org.apache.streampipes.model.staticproperty.StaticProperty; -import org.apache.streampipes.sdk.StaticProperties; import org.apache.streampipes.sdk.builder.adapter.ParserDescriptionBuilder; import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor; import org.apache.streampipes.sdk.helpers.Alternatives; @@ -53,7 +50,7 @@ public class JsonParsers implements IParser { public static final String KEY_JSON_OPTIONS = "json_options"; public static final String KEY_OBJECT = "object"; - public static final String LABEL_OBJECT = "Single Object"; + public static final String LABEL_OBJECT = "Object"; public static final String DESCRIPTION_OBJECT = "Each event is a single json object (e.g. {'value': 1})"; public static final String KEY_ARRAY = "array"; @@ -61,15 +58,6 @@ public class JsonParsers implements IParser { public static final String DESCRIPTION_ARRAY = "Each event consists of only one array of json objects, e.g. [{'value': 1}, {'value': 2}]"; - public static final String KEY_ARRAY_FIELD = "arrayField"; - public static final String LABEL_ARRAY_FIELD = "Array Field"; - public static final String DESCRIPTION_ARRAY_FIELD = - "Use one property of the json object that is an array, e.g. {'arrayKey': [{'value': 1}, {'value': 2}]}"; - - public static final String KEY_GEO_JSON = "geojson"; - public static final String LABEL_GEO_JSON = "GeoJSON"; - public static final String DESCRIPTION_GEO_JSON = "Reads GeoJson"; - private JsonParser selectedParser; @@ -92,13 +80,6 @@ public class JsonParsers implements IParser { case KEY_ARRAY -> { return new JsonParsers(new JsonArrayParser()); } - case KEY_ARRAY_FIELD -> { - var key = extractor.singleValueParameter("key", String.class); - return new JsonParsers(new JsonArrayKeyParser(key)); - } - case KEY_GEO_JSON -> { - return new JsonParsers(new GeoJsonParser()); - } } LOG.warn("No parser was found. Json object parser is used as a default"); @@ -111,12 +92,7 @@ public class JsonParsers implements IParser { .requiredAlternatives( Labels.from(KEY_JSON_OPTIONS, "", ""), Alternatives.from(Labels.from(KEY_OBJECT, LABEL_OBJECT, DESCRIPTION_OBJECT), true), - Alternatives.from(Labels.from(KEY_ARRAY, LABEL_ARRAY, DESCRIPTION_ARRAY)), - Alternatives.from(Labels.from(KEY_ARRAY_FIELD, LABEL_ARRAY_FIELD, DESCRIPTION_ARRAY_FIELD), - StaticProperties.group(Labels.from("arrayFieldConfig", "Delimiter", ""), - StaticProperties.stringFreeTextProperty( - Labels.from("key", "Key", "Key of the array within the Json object")))), - Alternatives.from(Labels.from(KEY_GEO_JSON, LABEL_GEO_JSON, DESCRIPTION_GEO_JSON))) + Alternatives.from(Labels.from(KEY_ARRAY, LABEL_ARRAY, DESCRIPTION_ARRAY))) .build(); } diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/json/GeoJsonConstants.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/json/GeoJsonConstants.java deleted file mode 100644 index 40f130dec5..0000000000 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/json/GeoJsonConstants.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.extensions.management.connect.adapter.parser.json; - -public class GeoJsonConstants { - public static final String LATITUDE = "latitude"; - public static final String LONGITUDE = "longitude"; - public static final String ALTITUDE = "altitude"; - - public static final String COORDINATES = "coordinates"; - public static final String COORDINATES_LINE_STRING = "coordinatesLineString"; - public static final String COORDINATES_POLYGON = "coordinatesPolygon"; - public static final String COORDINATES_MULTI_POINT = "coordinatesMultiPoint"; - public static final String COORDINATES_MULTI_STRING = "coordinatesMultiString"; - public static final String COORDINATES_MULTI_POLYGON = "coordinatesMultiPolygon"; - -} diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/json/GeoJsonParser.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/json/GeoJsonParser.java deleted file mode 100644 index e6a5dbc844..0000000000 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/json/GeoJsonParser.java +++ /dev/null @@ -1,299 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.extensions.management.connect.adapter.parser.json; - -import org.apache.streampipes.commons.exceptions.connect.ParseException; -import org.apache.streampipes.extensions.api.connect.IParserEventHandler; -import org.apache.streampipes.extensions.management.connect.adapter.parser.util.JsonEventProperty; -import org.apache.streampipes.model.connect.guess.GuessSchema; -import org.apache.streampipes.model.connect.guess.SampleData; -import org.apache.streampipes.model.schema.EventProperty; -import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder; -import org.apache.streampipes.sdk.builder.adapter.SampleDataBuilder; -import org.apache.streampipes.serializers.json.JacksonSerializer; -import org.apache.streampipes.vocabulary.Geo; -import org.apache.streampipes.vocabulary.SO; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.geojson.Feature; -import org.geojson.LineString; -import org.geojson.MultiLineString; -import org.geojson.MultiPoint; -import org.geojson.MultiPolygon; -import org.geojson.Point; -import org.geojson.Polygon; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -public class GeoJsonParser extends JsonParser { - - private static final Logger LOG = LoggerFactory.getLogger(JsonArrayKeyParser.class); - - public GuessSchema getGuessSchema(InputStream inputStream) { - Feature geoFeature = null; - try { - geoFeature = JacksonSerializer.getObjectMapper(Map.of( - DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true - )).readValue(inputStream, Feature.class); - - } catch (IOException e) { - throw new ParseException("Could not parse geo json into a feature type", e); - } - - List<EventProperty> eventProperties = new LinkedList<>(); - var sampleValues = new HashMap<String, Object>(); - - if (geoFeature.getGeometry() instanceof Point) { - Point point = (Point) geoFeature.getGeometry(); - eventProperties.add( - getEventPropertyGeoJson( - GeoJsonConstants.LONGITUDE, - point.getCoordinates() - .getLongitude(), - Geo.LNG - )); - eventProperties.add( - getEventPropertyGeoJson( - GeoJsonConstants.LATITUDE, - point.getCoordinates() - .getLatitude(), - Geo.LAT - )); - - sampleValues.put( - GeoJsonConstants.LONGITUDE, - point.getCoordinates() - .getLongitude() - ); - sampleValues.put( - GeoJsonConstants.LATITUDE, - point.getCoordinates() - .getLatitude() - ); - if (point.getCoordinates() - .hasAltitude()) { - eventProperties.add( - getEventPropertyGeoJson(GeoJsonConstants.ALTITUDE, - point.getCoordinates() - .getAltitude(), - SO.ALTITUDE - )); - point.getCoordinates() - .getAltitude(); - } - - } else if (geoFeature.getGeometry() instanceof LineString) { - LineString lineString = (LineString) geoFeature.getGeometry(); - eventProperties.add( - JsonEventProperty.getEventProperty(GeoJsonConstants.COORDINATES_LINE_STRING, lineString.getCoordinates())); - sampleValues.put( - GeoJsonConstants.COORDINATES_LINE_STRING, - lineString.getCoordinates() - ); - } else if (geoFeature.getGeometry() instanceof Polygon) { - Polygon polygon = (Polygon) geoFeature.getGeometry(); - eventProperties.add( - JsonEventProperty.getEventProperty(GeoJsonConstants.COORDINATES_POLYGON, polygon.getCoordinates())); - sampleValues.put( - GeoJsonConstants.COORDINATES_POLYGON, - polygon.getCoordinates() - ); - } else if (geoFeature.getGeometry() instanceof MultiPoint) { - MultiPoint multiPoint = (MultiPoint) geoFeature.getGeometry(); - eventProperties.add( - JsonEventProperty.getEventProperty(GeoJsonConstants.COORDINATES_MULTI_POINT, multiPoint.getCoordinates())); - sampleValues.put( - GeoJsonConstants.COORDINATES_MULTI_POINT, - multiPoint.getCoordinates() - ); - } else if (geoFeature.getGeometry() instanceof MultiLineString) { - MultiLineString multiLineString = (MultiLineString) geoFeature.getGeometry(); - eventProperties.add( - JsonEventProperty.getEventProperty( - GeoJsonConstants.COORDINATES_LINE_STRING, - multiLineString.getCoordinates() - )); - sampleValues.put( - GeoJsonConstants.COORDINATES_LINE_STRING, - multiLineString.getCoordinates() - ); - } else if (geoFeature.getGeometry() instanceof MultiPolygon) { - MultiPolygon multiPolygon = (MultiPolygon) geoFeature.getGeometry(); - eventProperties.add(JsonEventProperty.getEventProperty( - GeoJsonConstants.COORDINATES_MULTI_POLYGON, - multiPolygon.getCoordinates() - )); - sampleValues.put( - GeoJsonConstants.COORDINATES_MULTI_POLYGON, - multiPolygon.getCoordinates() - ); - } else { - LOG.error("No geometry field found in geofeature: " + geoFeature.toString()); - } - - - for (Map.Entry<String, Object> entry : geoFeature.getProperties() - .entrySet()) { - EventProperty p = JsonEventProperty.getEventProperty(entry.getKey(), entry.getValue()); - eventProperties.add(p); - sampleValues.put( - p.getRuntimeName(), - entry.getValue() - ); - } - - var schemaBuilder = GuessSchemaBuilder.create(); - eventProperties.forEach(schemaBuilder::property); - sampleValues.forEach(schemaBuilder::sample); - - return schemaBuilder.build(); - } - - @Override - public SampleData getSampleData(InputStream inputStream) { - var guessSchema = getGuessSchema(inputStream); - var sampleString = guessSchema.getEventPreview() - .get(0); - ObjectMapper mapper = new ObjectMapper(); - try { - Map<String, Object> event = mapper.readValue(sampleString, Map.class); - return SampleDataBuilder.create() - .sample(event) - .build(); - - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - - } - - @Override - public void parse(InputStream inputStream, IParserEventHandler handler) throws ParseException { - Map<String, Object> event = toMap(inputStream, Map.class); - handler.handle(geoJsonFormatter(event)); - } - - private EventProperty getEventPropertyGeoJson(String name, Object value, String domain) { - EventProperty eventProperty = JsonEventProperty.getEventProperty(name, value); - eventProperty.setSemanticType(domain); - - return eventProperty; - } - - private Map<String, Object> geoJsonFormatter(Map<String, Object> map) { - Map<String, Object> geoJson = new HashMap<String, Object>(); - Boolean foundGeometry = false; - Boolean foundProperties = false; - - for (Map.Entry<String, Object> entry : map.entrySet()) { - if (entry.getKey() - .equalsIgnoreCase("GEOMETRY")) { - foundGeometry = true; - geoJson.putAll(formatGeometryField((Map<String, Object>) entry.getValue())); - } - if (entry.getKey() - .equalsIgnoreCase("PROPERTIES")) { - foundProperties = true; - for (Map.Entry<String, Object> innerEntry : ((Map<String, Object>) entry.getValue()).entrySet()) { - geoJson.put(innerEntry.getKey(), innerEntry.getValue()); - } - } - } - - if (!foundGeometry) { - LOG.warn("Geometry field not found"); - } - if (!foundProperties) { - LOG.warn("Property field not found"); - } - - return geoJson; - } - - - private Map<String, Object> formatGeometryField(Map<String, Object> map) { - Map<String, Object> geometryFields = new HashMap<String, Object>(); - - String type = (String) map.get("type"); - - if (type.equalsIgnoreCase("POINT")) { - List<Double> coordinates = (List<Double>) map.get(GeoJsonConstants.COORDINATES); - - try { - geometryFields.put(GeoJsonConstants.LONGITUDE, coordinates.get(0)); - geometryFields.put(GeoJsonConstants.LATITUDE, coordinates.get(1)); - if (coordinates.size() == 3) { - geometryFields.put(GeoJsonConstants.ALTITUDE, coordinates.get(2)); - } - } catch (IndexOutOfBoundsException e) { - LOG.error(e.getMessage()); - } - - } else if (type.equalsIgnoreCase("LINESTRING")) { - geometryFields.put( - GeoJsonConstants.COORDINATES_LINE_STRING, - map.get(GeoJsonConstants.COORDINATES) - .toString() - ); - - } else if (type.equalsIgnoreCase("POLYGON")) { - geometryFields.put( - GeoJsonConstants.COORDINATES_POLYGON, - map.get(GeoJsonConstants.COORDINATES) - .toString() - ); - - } else if (type.equalsIgnoreCase("MULTIPOINT")) { - geometryFields.put( - GeoJsonConstants.COORDINATES_MULTI_POINT, - map.get(GeoJsonConstants.COORDINATES) - .toString() - ); - - } else if (type.equalsIgnoreCase("MULTILINESTRING")) { - geometryFields.put( - GeoJsonConstants.COORDINATES_MULTI_STRING, - map.get(GeoJsonConstants.COORDINATES) - .toString() - ); - - } else if (type.equalsIgnoreCase("MULTIPOLYGON")) { - geometryFields.put( - GeoJsonConstants.COORDINATES_MULTI_POLYGON, - map.get(GeoJsonConstants.COORDINATES) - .toString() - ); - - } else { - LOG.error(type + "is not a suppported field type"); - } - - return geometryFields; - } - -} diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/json/JsonArrayKeyParser.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/json/JsonArrayKeyParser.java deleted file mode 100644 index 51eab826bb..0000000000 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/json/JsonArrayKeyParser.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.extensions.management.connect.adapter.parser.json; - -import org.apache.streampipes.commons.exceptions.connect.ParseException; -import org.apache.streampipes.extensions.api.connect.IParserEventHandler; -import org.apache.streampipes.model.connect.guess.SampleData; -import org.apache.streampipes.sdk.builder.adapter.SampleDataBuilder; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.InputStream; -import java.util.List; -import java.util.Map; - -public class JsonArrayKeyParser extends JsonParser { - private String key; - - public JsonArrayKeyParser(String key) { - this.key = key; - } - - private static final Logger LOG = LoggerFactory.getLogger(JsonArrayKeyParser.class); - - @Override - public SampleData getSampleData(InputStream inputStream) { - var event = getEvents(inputStream).get(0); - return SampleDataBuilder.create() - .sample(event) - .build(); - } - - @Override - public void parse(InputStream inputStream, IParserEventHandler handler) throws ParseException { - try { - var events = getEvents(inputStream); - events.forEach(event -> { - handler.handle(event); - }); - } catch (ParseException e) { - LOG.error("Could not parse json event", e); - } - - } - - private List<Map<String, Object>> getEvents(InputStream inputStream) { - var event = toMap(inputStream, Map.class); - - if (event.containsKey(key)) { - var list = event.get(key); - if (list instanceof List) { - if (((List<?>) list).size() > 0) { - if (((List<?>) list).get(0) instanceof Map<?, ?>) { - return (List<Map<String, Object>>) list; - } else { - throw new ParseException("The content of the array must be a json object. It was: %s".formatted(list)); - } - } else { - throw new ParseException("The selected array is empty." - .formatted(list)); - } - } else { - throw new ParseException("Please select the key of an array field. The object: %s was selected instead" - .formatted(list)); - } - } else { - throw new ParseException("The selected key '%s' could not be found in the json object".formatted(key)); - } - } -}
