This is an automated email from the ASF dual-hosted git repository. zehnder pushed a commit to branch 4103-connect-remove-redundant-json-formats-and-migrate-logic-to-transformation-script-1 in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit a14fc0e450488184a98baacc54495ec25f967162 Author: Philipp Zehnder <[email protected]> AuthorDate: Mon Jan 19 15:53:46 2026 +0100 fix(#4103): Add migration to remove geo json format --- .../connect/adapter/parser/JsonParsers.java | 14 +- .../adapter/parser/json/GeoJsonConstants.java | 33 --- .../connect/adapter/parser/json/GeoJsonParser.java | 299 -------------------- .../adapter/parser/json/GeoJsonParserTest.java | 113 -------- .../adapter/migration/GenericAdapterConverter.java | 150 ---------- .../adapter/migration/IAdapterConverter.java | 26 -- .../adapter/migration/MigrationHelpers.java | 59 ---- .../migration/SpecificAdapterConverter.java | 40 --- .../migration/format/CsvFormatMigrator.java | 70 ----- .../migration/format/EmptyFormatMigrator.java | 29 -- .../adapter/migration/format/FormatMigrator.java | 26 -- .../migration/format/JsonFormatMigrator.java | 71 ----- .../migration/format/XmlFormatMigrator.java | 49 ---- .../adapter/migration/utils/AdapterModels.java | 61 ---- .../adapter/migration/utils/DocumentKeys.java | 25 -- .../connect/adapter/migration/utils/FormatIds.java | 38 --- .../migration/utils/GenericAdapterUtils.java | 313 --------------------- .../core/migrations/AvailableMigrations.java | 2 - .../core/migrations/v093/AdapterBackupWriter.java | 63 ----- .../core/migrations/v093/AdapterMigration.java | 149 ---------- .../migrations/v093/migrator/AdapterMigrator.java | 27 -- .../v093/migrator/GenericAdapterMigrator.java | 51 ---- .../v093/migrator/SpecificAdapterMigrator.java | 53 ---- .../v099/connect/MigrateAdaptersToUseScript.java | 104 +++++-- ui/cypress/fixtures/connect/format/geoJson.json | 11 - ui/cypress/support/builder/AdapterBuilder.ts | 5 + ui/cypress/support/model/AdapterInput.ts | 2 + ui/cypress/support/utils/connect/ConnectBtns.ts | 6 + ui/cypress/tests/connect/formats/format.spec.ts | 23 +- ui/cypress/tests/thirdparty/Kafka.smoke.spec.ts | 2 +- .../adapter-configuration-state.service.ts | 2 +- 31 files changed, 98 insertions(+), 1818 deletions(-) diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/JsonParsers.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/JsonParsers.java index 5c783fcb23..66891b674a 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/JsonParsers.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/JsonParsers.java @@ -21,7 +21,6 @@ package org.apache.streampipes.extensions.management.connect.adapter.parser; import org.apache.streampipes.commons.exceptions.connect.ParseException; import org.apache.streampipes.extensions.api.connect.IParser; import org.apache.streampipes.extensions.api.connect.IParserEventHandler; -import org.apache.streampipes.extensions.management.connect.adapter.parser.json.GeoJsonParser; import org.apache.streampipes.extensions.management.connect.adapter.parser.json.JsonArrayKeyParser; import org.apache.streampipes.extensions.management.connect.adapter.parser.json.JsonArrayParser; import org.apache.streampipes.extensions.management.connect.adapter.parser.json.JsonObjectParser; @@ -53,7 +52,7 @@ public class JsonParsers implements IParser { public static final String KEY_JSON_OPTIONS = "json_options"; public static final String KEY_OBJECT = "object"; - public static final String LABEL_OBJECT = "Single Object"; + public static final String LABEL_OBJECT = "Object"; public static final String DESCRIPTION_OBJECT = "Each event is a single json object (e.g. {'value': 1})"; public static final String KEY_ARRAY = "array"; @@ -66,10 +65,6 @@ public class JsonParsers implements IParser { public static final String DESCRIPTION_ARRAY_FIELD = "Use one property of the json object that is an array, e.g. {'arrayKey': [{'value': 1}, {'value': 2}]}"; - public static final String KEY_GEO_JSON = "geojson"; - public static final String LABEL_GEO_JSON = "GeoJSON"; - public static final String DESCRIPTION_GEO_JSON = "Reads GeoJson"; - private JsonParser selectedParser; @@ -96,9 +91,6 @@ public class JsonParsers implements IParser { var key = extractor.singleValueParameter("key", String.class); return new JsonParsers(new JsonArrayKeyParser(key)); } - case KEY_GEO_JSON -> { - return new JsonParsers(new GeoJsonParser()); - } } LOG.warn("No parser was found. Json object parser is used as a default"); @@ -115,8 +107,8 @@ public class JsonParsers implements IParser { Alternatives.from(Labels.from(KEY_ARRAY_FIELD, LABEL_ARRAY_FIELD, DESCRIPTION_ARRAY_FIELD), StaticProperties.group(Labels.from("arrayFieldConfig", "Delimiter", ""), StaticProperties.stringFreeTextProperty( - Labels.from("key", "Key", "Key of the array within the Json object")))), - Alternatives.from(Labels.from(KEY_GEO_JSON, LABEL_GEO_JSON, DESCRIPTION_GEO_JSON))) + Labels.from("key", "Key", "Key of the array within the Json object")))) + ) .build(); } diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/json/GeoJsonConstants.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/json/GeoJsonConstants.java deleted file mode 100644 index 40f130dec5..0000000000 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/json/GeoJsonConstants.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.extensions.management.connect.adapter.parser.json; - -public class GeoJsonConstants { - public static final String LATITUDE = "latitude"; - public static final String LONGITUDE = "longitude"; - public static final String ALTITUDE = "altitude"; - - public static final String COORDINATES = "coordinates"; - public static final String COORDINATES_LINE_STRING = "coordinatesLineString"; - public static final String COORDINATES_POLYGON = "coordinatesPolygon"; - public static final String COORDINATES_MULTI_POINT = "coordinatesMultiPoint"; - public static final String COORDINATES_MULTI_STRING = "coordinatesMultiString"; - public static final String COORDINATES_MULTI_POLYGON = "coordinatesMultiPolygon"; - -} diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/json/GeoJsonParser.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/json/GeoJsonParser.java deleted file mode 100644 index e6a5dbc844..0000000000 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/parser/json/GeoJsonParser.java +++ /dev/null @@ -1,299 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.extensions.management.connect.adapter.parser.json; - -import org.apache.streampipes.commons.exceptions.connect.ParseException; -import org.apache.streampipes.extensions.api.connect.IParserEventHandler; -import org.apache.streampipes.extensions.management.connect.adapter.parser.util.JsonEventProperty; -import org.apache.streampipes.model.connect.guess.GuessSchema; -import org.apache.streampipes.model.connect.guess.SampleData; -import org.apache.streampipes.model.schema.EventProperty; -import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder; -import org.apache.streampipes.sdk.builder.adapter.SampleDataBuilder; -import org.apache.streampipes.serializers.json.JacksonSerializer; -import org.apache.streampipes.vocabulary.Geo; -import org.apache.streampipes.vocabulary.SO; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.geojson.Feature; -import org.geojson.LineString; -import org.geojson.MultiLineString; -import org.geojson.MultiPoint; -import org.geojson.MultiPolygon; -import org.geojson.Point; -import org.geojson.Polygon; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -public class GeoJsonParser extends JsonParser { - - private static final Logger LOG = LoggerFactory.getLogger(JsonArrayKeyParser.class); - - public GuessSchema getGuessSchema(InputStream inputStream) { - Feature geoFeature = null; - try { - geoFeature = JacksonSerializer.getObjectMapper(Map.of( - DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true - )).readValue(inputStream, Feature.class); - - } catch (IOException e) { - throw new ParseException("Could not parse geo json into a feature type", e); - } - - List<EventProperty> eventProperties = new LinkedList<>(); - var sampleValues = new HashMap<String, Object>(); - - if (geoFeature.getGeometry() instanceof Point) { - Point point = (Point) geoFeature.getGeometry(); - eventProperties.add( - getEventPropertyGeoJson( - GeoJsonConstants.LONGITUDE, - point.getCoordinates() - .getLongitude(), - Geo.LNG - )); - eventProperties.add( - getEventPropertyGeoJson( - GeoJsonConstants.LATITUDE, - point.getCoordinates() - .getLatitude(), - Geo.LAT - )); - - sampleValues.put( - GeoJsonConstants.LONGITUDE, - point.getCoordinates() - .getLongitude() - ); - sampleValues.put( - GeoJsonConstants.LATITUDE, - point.getCoordinates() - .getLatitude() - ); - if (point.getCoordinates() - .hasAltitude()) { - eventProperties.add( - getEventPropertyGeoJson(GeoJsonConstants.ALTITUDE, - point.getCoordinates() - .getAltitude(), - SO.ALTITUDE - )); - point.getCoordinates() - .getAltitude(); - } - - } else if (geoFeature.getGeometry() instanceof LineString) { - LineString lineString = (LineString) geoFeature.getGeometry(); - eventProperties.add( - JsonEventProperty.getEventProperty(GeoJsonConstants.COORDINATES_LINE_STRING, lineString.getCoordinates())); - sampleValues.put( - GeoJsonConstants.COORDINATES_LINE_STRING, - lineString.getCoordinates() - ); - } else if (geoFeature.getGeometry() instanceof Polygon) { - Polygon polygon = (Polygon) geoFeature.getGeometry(); - eventProperties.add( - JsonEventProperty.getEventProperty(GeoJsonConstants.COORDINATES_POLYGON, polygon.getCoordinates())); - sampleValues.put( - GeoJsonConstants.COORDINATES_POLYGON, - polygon.getCoordinates() - ); - } else if (geoFeature.getGeometry() instanceof MultiPoint) { - MultiPoint multiPoint = (MultiPoint) geoFeature.getGeometry(); - eventProperties.add( - JsonEventProperty.getEventProperty(GeoJsonConstants.COORDINATES_MULTI_POINT, multiPoint.getCoordinates())); - sampleValues.put( - GeoJsonConstants.COORDINATES_MULTI_POINT, - multiPoint.getCoordinates() - ); - } else if (geoFeature.getGeometry() instanceof MultiLineString) { - MultiLineString multiLineString = (MultiLineString) geoFeature.getGeometry(); - eventProperties.add( - JsonEventProperty.getEventProperty( - GeoJsonConstants.COORDINATES_LINE_STRING, - multiLineString.getCoordinates() - )); - sampleValues.put( - GeoJsonConstants.COORDINATES_LINE_STRING, - multiLineString.getCoordinates() - ); - } else if (geoFeature.getGeometry() instanceof MultiPolygon) { - MultiPolygon multiPolygon = (MultiPolygon) geoFeature.getGeometry(); - eventProperties.add(JsonEventProperty.getEventProperty( - GeoJsonConstants.COORDINATES_MULTI_POLYGON, - multiPolygon.getCoordinates() - )); - sampleValues.put( - GeoJsonConstants.COORDINATES_MULTI_POLYGON, - multiPolygon.getCoordinates() - ); - } else { - LOG.error("No geometry field found in geofeature: " + geoFeature.toString()); - } - - - for (Map.Entry<String, Object> entry : geoFeature.getProperties() - .entrySet()) { - EventProperty p = JsonEventProperty.getEventProperty(entry.getKey(), entry.getValue()); - eventProperties.add(p); - sampleValues.put( - p.getRuntimeName(), - entry.getValue() - ); - } - - var schemaBuilder = GuessSchemaBuilder.create(); - eventProperties.forEach(schemaBuilder::property); - sampleValues.forEach(schemaBuilder::sample); - - return schemaBuilder.build(); - } - - @Override - public SampleData getSampleData(InputStream inputStream) { - var guessSchema = getGuessSchema(inputStream); - var sampleString = guessSchema.getEventPreview() - .get(0); - ObjectMapper mapper = new ObjectMapper(); - try { - Map<String, Object> event = mapper.readValue(sampleString, Map.class); - return SampleDataBuilder.create() - .sample(event) - .build(); - - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - - } - - @Override - public void parse(InputStream inputStream, IParserEventHandler handler) throws ParseException { - Map<String, Object> event = toMap(inputStream, Map.class); - handler.handle(geoJsonFormatter(event)); - } - - private EventProperty getEventPropertyGeoJson(String name, Object value, String domain) { - EventProperty eventProperty = JsonEventProperty.getEventProperty(name, value); - eventProperty.setSemanticType(domain); - - return eventProperty; - } - - private Map<String, Object> geoJsonFormatter(Map<String, Object> map) { - Map<String, Object> geoJson = new HashMap<String, Object>(); - Boolean foundGeometry = false; - Boolean foundProperties = false; - - for (Map.Entry<String, Object> entry : map.entrySet()) { - if (entry.getKey() - .equalsIgnoreCase("GEOMETRY")) { - foundGeometry = true; - geoJson.putAll(formatGeometryField((Map<String, Object>) entry.getValue())); - } - if (entry.getKey() - .equalsIgnoreCase("PROPERTIES")) { - foundProperties = true; - for (Map.Entry<String, Object> innerEntry : ((Map<String, Object>) entry.getValue()).entrySet()) { - geoJson.put(innerEntry.getKey(), innerEntry.getValue()); - } - } - } - - if (!foundGeometry) { - LOG.warn("Geometry field not found"); - } - if (!foundProperties) { - LOG.warn("Property field not found"); - } - - return geoJson; - } - - - private Map<String, Object> formatGeometryField(Map<String, Object> map) { - Map<String, Object> geometryFields = new HashMap<String, Object>(); - - String type = (String) map.get("type"); - - if (type.equalsIgnoreCase("POINT")) { - List<Double> coordinates = (List<Double>) map.get(GeoJsonConstants.COORDINATES); - - try { - geometryFields.put(GeoJsonConstants.LONGITUDE, coordinates.get(0)); - geometryFields.put(GeoJsonConstants.LATITUDE, coordinates.get(1)); - if (coordinates.size() == 3) { - geometryFields.put(GeoJsonConstants.ALTITUDE, coordinates.get(2)); - } - } catch (IndexOutOfBoundsException e) { - LOG.error(e.getMessage()); - } - - } else if (type.equalsIgnoreCase("LINESTRING")) { - geometryFields.put( - GeoJsonConstants.COORDINATES_LINE_STRING, - map.get(GeoJsonConstants.COORDINATES) - .toString() - ); - - } else if (type.equalsIgnoreCase("POLYGON")) { - geometryFields.put( - GeoJsonConstants.COORDINATES_POLYGON, - map.get(GeoJsonConstants.COORDINATES) - .toString() - ); - - } else if (type.equalsIgnoreCase("MULTIPOINT")) { - geometryFields.put( - GeoJsonConstants.COORDINATES_MULTI_POINT, - map.get(GeoJsonConstants.COORDINATES) - .toString() - ); - - } else if (type.equalsIgnoreCase("MULTILINESTRING")) { - geometryFields.put( - GeoJsonConstants.COORDINATES_MULTI_STRING, - map.get(GeoJsonConstants.COORDINATES) - .toString() - ); - - } else if (type.equalsIgnoreCase("MULTIPOLYGON")) { - geometryFields.put( - GeoJsonConstants.COORDINATES_MULTI_POLYGON, - map.get(GeoJsonConstants.COORDINATES) - .toString() - ); - - } else { - LOG.error(type + "is not a suppported field type"); - } - - return geometryFields; - } - -} diff --git a/streampipes-extensions-management/src/test/java/org/apache/streampipes/extensions/management/connect/adapter/parser/json/GeoJsonParserTest.java b/streampipes-extensions-management/src/test/java/org/apache/streampipes/extensions/management/connect/adapter/parser/json/GeoJsonParserTest.java deleted file mode 100644 index 2169ed67bb..0000000000 --- a/streampipes-extensions-management/src/test/java/org/apache/streampipes/extensions/management/connect/adapter/parser/json/GeoJsonParserTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.extensions.management.connect.adapter.parser.json; - -import org.apache.streampipes.extensions.api.connect.IParserEventHandler; -import org.apache.streampipes.model.schema.PropertyScope; -import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder; -import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder; -import org.apache.streampipes.sdk.utils.Datatypes; -import org.apache.streampipes.serializers.json.JacksonSerializer; -import org.apache.streampipes.vocabulary.Geo; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationFeature; -import org.apache.commons.io.IOUtils; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -public class GeoJsonParserTest { - - GeoJsonParser parser = new GeoJsonParser(); - - Map<String, Object> event = Map.of( - "type", "Feature", - "geometry", Map.of( - "type", "Point", - "coordinates", new Double[]{6.946535, 51.437344} - ), - "properties", Map.of( - "temperature", 5.0 - ) - ); - - @Test - public void getGuessSchema() { - var expected = GuessSchemaBuilder.create() - .property(PrimitivePropertyBuilder - .create(Datatypes.Float, "longitude") - .semanticType(Geo.LNG) - .description("") - .scope(PropertyScope.MEASUREMENT_PROPERTY) - .build()) - .property(PrimitivePropertyBuilder - .create(Datatypes.Float, "latitude") - .semanticType(Geo.LAT) - .scope(PropertyScope.MEASUREMENT_PROPERTY) - .description("") - .build()) - .property(PrimitivePropertyBuilder - .create(Datatypes.Float, "temperature") - .scope(PropertyScope.MEASUREMENT_PROPERTY) - .description("") - .build()) - .sample("longitude", 6.946535) - .sample("latitude", 51.437344) - .sample("temperature", 5.0) - .build(); - - var result = parser.getGuessSchema(toEvent(event)); - - Assertions.assertEquals(expected, result); - } - - @Test - public void parse() { - var mockEventHandler = mock(IParserEventHandler.class); - parser.parse(toEvent(event), mockEventHandler); - - Map<String, Object> expectedEvent = new HashMap<>(); - expectedEvent.put("latitude", 51.437344); - expectedEvent.put("temperature", 5.0); - expectedEvent.put("longitude", 6.946535); - verify(mockEventHandler, times(1)).handle(expectedEvent); - } - - - private InputStream toEvent(Map<String, Object> event) { - try { - var s = JacksonSerializer.getObjectMapper(Map.of( - DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true - )).writeValueAsString(event); - return IOUtils.toInputStream(s, StandardCharsets.UTF_8); - } catch (JsonProcessingException e) { - Assertions.fail("Could not convert event to string: " + event); - return null; - } - } -} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/GenericAdapterConverter.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/GenericAdapterConverter.java deleted file mode 100644 index abd51c8187..0000000000 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/GenericAdapterConverter.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.model.connect.adapter.migration; - -import org.apache.streampipes.model.connect.adapter.migration.format.CsvFormatMigrator; -import org.apache.streampipes.model.connect.adapter.migration.format.EmptyFormatMigrator; -import org.apache.streampipes.model.connect.adapter.migration.format.JsonFormatMigrator; -import org.apache.streampipes.model.connect.adapter.migration.format.XmlFormatMigrator; - -import com.google.gson.JsonArray; -import com.google.gson.JsonObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.streampipes.model.connect.adapter.migration.MigrationHelpers.PROPERTIES; -import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.CSV; -import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.CSV_FORMAT_ID; -import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.GEOJSON_FORMAT_ID; -import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.GEOJSON_NEW_KEY; -import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.IMAGE; -import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.IMAGE_FORMAT_ID; -import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON; -import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON_ARRAY_KEY_FORMAT_ID; -import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON_ARRAY_KEY_NEW_KEY; -import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON_ARRAY_NO_KEY_FORMAT_ID; -import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON_ARRAY_NO_KEY_NEW_KEY; -import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON_OBJECT_FORMAT_ID; -import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON_OBJECT_NEW_KEY; -import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.XML; -import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.XML_FORMAT_ID; -import static org.apache.streampipes.model.connect.adapter.migration.utils.GenericAdapterUtils.applyFormat; -import static org.apache.streampipes.model.connect.adapter.migration.utils.GenericAdapterUtils.getFormatTemplate; - -public class GenericAdapterConverter implements IAdapterConverter { - - private static final Logger LOG = LoggerFactory.getLogger(GenericAdapterConverter.class); - private static final String PROTOCOL_DESC_KEY = "protocolDescription"; - private static final String FORMAT_DESC_KEY = "formatDescription"; - private static final String CONFIG_KEY = "config"; - - private final MigrationHelpers helpers; - - private final String typeFieldName; - private final boolean importMode; - - public GenericAdapterConverter(boolean importMode) { - this.helpers = new MigrationHelpers(); - this.importMode = importMode; - this.typeFieldName = importMode ? "@class" : "type"; - } - - @Override - public JsonObject convert(JsonObject adapter) { - helpers.updateType(adapter, typeFieldName); - if (!importMode) { - helpers.updateFieldType(adapter); - } - - var properties = getProperties(adapter); - - if (!properties.has(CONFIG_KEY)) { - properties.add(CONFIG_KEY, new JsonArray()); - } - - JsonObject protocolDescription = properties.get(PROTOCOL_DESC_KEY).getAsJsonObject(); - migrateProtocolDescription(adapter, protocolDescription); - properties.remove(PROTOCOL_DESC_KEY); - - if (properties.has(FORMAT_DESC_KEY)) { - JsonObject formatDescription = properties.get(FORMAT_DESC_KEY).getAsJsonObject(); - migrateFormatDescription(adapter, formatDescription); - properties.remove(FORMAT_DESC_KEY); - } - - return adapter; - } - - private JsonObject getProperties(JsonObject object) { - return importMode ? object : object.get(PROPERTIES).getAsJsonObject(); - } - - private void migrateProtocolDescription(JsonObject adapter, - JsonObject protocolDescription) { - JsonArray config = getProperties(adapter).get(CONFIG_KEY).getAsJsonArray(); - JsonArray protocolDescriptionConfig = protocolDescription.get(CONFIG_KEY).getAsJsonArray(); - protocolDescriptionConfig.forEach(config::add); - } - - private void migrateFormatDescription(JsonObject adapter, - JsonObject formatDescription) { - var adapterConfig = getProperties(adapter) - .get(CONFIG_KEY) - .getAsJsonArray(); - - var formatTemplate = getFormatTemplate(); - - if (isFormat(formatDescription, JSON_OBJECT_FORMAT_ID)) { - var migrator = new JsonFormatMigrator(JSON_OBJECT_NEW_KEY, formatDescription); - applyFormat(JSON, formatTemplate, migrator); - } else if (isFormat(formatDescription, JSON_ARRAY_KEY_FORMAT_ID)) { - var migrator = new JsonFormatMigrator(JSON_ARRAY_KEY_NEW_KEY, formatDescription); - applyFormat(JSON, formatTemplate, migrator); - } else if (isFormat(formatDescription, JSON_ARRAY_NO_KEY_FORMAT_ID)) { - var migrator = new JsonFormatMigrator(JSON_ARRAY_NO_KEY_NEW_KEY, formatDescription); - applyFormat(JSON, formatTemplate, migrator); - } else if (isFormat(formatDescription, CSV_FORMAT_ID)) { - var migrator = new CsvFormatMigrator(formatDescription); - applyFormat(CSV, formatTemplate, migrator); - } else if (isFormat(formatDescription, GEOJSON_FORMAT_ID)) { - var migrator = new JsonFormatMigrator(GEOJSON_NEW_KEY, formatDescription); - applyFormat(JSON, formatTemplate, migrator); - } else if (isFormat(formatDescription, XML_FORMAT_ID)) { - var migrator = new XmlFormatMigrator(formatDescription); - applyFormat(XML, formatTemplate, migrator); - } else if (isFormat(formatDescription, IMAGE_FORMAT_ID)) { - applyFormat(IMAGE, formatTemplate, new EmptyFormatMigrator()); - } else { - LOG.warn("Found unknown format {}", getAppId(formatDescription)); - } - - adapterConfig.add(formatTemplate); - } - - private boolean isFormat(JsonObject formatDescription, - String format) { - return getAppId(formatDescription).equals(format); - } - - private String getAppId(JsonObject formatDescription) { - return formatDescription - .getAsJsonObject() - .get(MigrationHelpers.APP_ID).getAsString(); - } -} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/IAdapterConverter.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/IAdapterConverter.java deleted file mode 100644 index db799c2f13..0000000000 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/IAdapterConverter.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.model.connect.adapter.migration; - -import com.google.gson.JsonObject; - -public interface IAdapterConverter { - - JsonObject convert(JsonObject adapterDescription); -} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/MigrationHelpers.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/MigrationHelpers.java deleted file mode 100644 index 2343d41bb0..0000000000 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/MigrationHelpers.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.model.connect.adapter.migration; - -import org.apache.streampipes.model.connect.adapter.migration.utils.AdapterModels; - -import com.google.gson.JsonObject; -import com.google.gson.JsonPrimitive; - -public class MigrationHelpers { - - public static final String ID = "_id"; - public static final String REV = "_rev"; - - public static final String APP_ID = "appId"; - - public static final String PROPERTIES = "properties"; - - public String getDocId(JsonObject adapter) { - return adapter.get(ID).getAsString(); - } - - public String getRev(JsonObject adapter) { - return adapter.get(REV).getAsString(); - } - - public String getAppId(JsonObject adapter) { - return adapter.get("properties").getAsJsonObject().get(APP_ID).getAsString(); - } - - public void updateType(JsonObject adapter, - String typeFieldName) { - adapter.add(typeFieldName, new JsonPrimitive(AdapterModels.NEW_MODEL)); - } - - public void updateFieldType(JsonObject adapter) { - adapter.add("field_type", new JsonPrimitive(AdapterModels.NEW_MODEL)); - } - - public String getAdapterName(JsonObject adapter) { - return adapter.get("properties").getAsJsonObject().get("name").getAsString(); - } -} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/SpecificAdapterConverter.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/SpecificAdapterConverter.java deleted file mode 100644 index b584081a5a..0000000000 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/SpecificAdapterConverter.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.model.connect.adapter.migration; - -import com.google.gson.JsonObject; - -public class SpecificAdapterConverter implements IAdapterConverter { - - private final MigrationHelpers helpers; - private final String typeFieldName; - - public SpecificAdapterConverter(boolean importMode) { - this.helpers = new MigrationHelpers(); - this.typeFieldName = importMode ? "@class" : "type"; - } - - @Override - public JsonObject convert(JsonObject adapter) { - helpers.updateType(adapter, typeFieldName); - helpers.updateFieldType(adapter); - - return adapter; - } -} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/CsvFormatMigrator.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/CsvFormatMigrator.java deleted file mode 100644 index e7365d98ef..0000000000 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/CsvFormatMigrator.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.model.connect.adapter.migration.format; - -import com.google.gson.JsonObject; - -public class CsvFormatMigrator implements FormatMigrator { - - private final JsonObject formatDescription; - - public CsvFormatMigrator(JsonObject formatDescription) { - this.formatDescription = formatDescription; - } - - @Override - public void migrate(JsonObject newFormatProperties) { - - // read value for delimter & header information - var delimiter = this.formatDescription.getAsJsonObject() - .get("config").getAsJsonArray() - .get(0).getAsJsonObject() - .get("properties").getAsJsonObject() - .get("value").getAsString(); - var selectedHeader = this.formatDescription.getAsJsonObject() - .get("config").getAsJsonArray() - .get(1).getAsJsonObject() - .getAsJsonObject("properties") - .getAsJsonArray("options") - .get(0) - .getAsJsonObject() - .get("selected") - .getAsBoolean(); - - // write values - newFormatProperties - .getAsJsonObject("properties") - .getAsJsonArray("staticProperties") - .get(0) - .getAsJsonObject() - .get("properties") - .getAsJsonObject() - .addProperty("value", delimiter); - - newFormatProperties - .getAsJsonObject("properties") - .getAsJsonArray("staticProperties") - .get(1).getAsJsonObject() - .getAsJsonObject("properties") - .getAsJsonArray("options") - .get(0) - .getAsJsonObject() - .addProperty("selected", selectedHeader); - } -} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/EmptyFormatMigrator.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/EmptyFormatMigrator.java deleted file mode 100644 index 477e07312b..0000000000 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/EmptyFormatMigrator.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.model.connect.adapter.migration.format; - -import com.google.gson.JsonObject; - -public class EmptyFormatMigrator implements FormatMigrator { - - @Override - public void migrate(JsonObject newFormatProperties) { - - } -} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/FormatMigrator.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/FormatMigrator.java deleted file mode 100644 index e6bf46cdc3..0000000000 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/FormatMigrator.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.model.connect.adapter.migration.format; - -import com.google.gson.JsonObject; - -public interface FormatMigrator { - - void migrate(JsonObject newFormatProperties); -} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/JsonFormatMigrator.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/JsonFormatMigrator.java deleted file mode 100644 index aefa2f471a..0000000000 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/JsonFormatMigrator.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.model.connect.adapter.migration.format; - -import org.apache.streampipes.model.connect.adapter.migration.MigrationHelpers; - -import com.google.gson.JsonObject; -import com.google.gson.JsonPrimitive; - -import static org.apache.streampipes.model.connect.adapter.migration.utils.DocumentKeys.ALTERNATIVES; -import static org.apache.streampipes.model.connect.adapter.migration.utils.DocumentKeys.INTERNAL_NAME; -import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON_ARRAY_KEY_NEW_KEY; - -public class JsonFormatMigrator implements FormatMigrator { - - private final String jsonFormatId; - - private final JsonObject existingFormat; - - public JsonFormatMigrator(String jsonFormatId, - JsonObject existingFormat) { - this.jsonFormatId = jsonFormatId; - this.existingFormat = existingFormat; - } - - @Override - public void migrate(JsonObject newFormatProperties) { - newFormatProperties.get(MigrationHelpers.PROPERTIES).getAsJsonObject() - .get("staticProperties").getAsJsonArray() - .get(0).getAsJsonObject() - .get(MigrationHelpers.PROPERTIES).getAsJsonObject() - .get(ALTERNATIVES).getAsJsonArray() - .forEach(al -> { - if (al.getAsJsonObject().get(INTERNAL_NAME).getAsString().equals(jsonFormatId)) { - al.getAsJsonObject().add("selected", new JsonPrimitive(true)); - - // If the type is JSON_ARRAY set the value for the key in the configuration - if (this.jsonFormatId.equals(JSON_ARRAY_KEY_NEW_KEY)) { - var keyValue = this.existingFormat.getAsJsonObject() - .get("config").getAsJsonArray() - .get(0).getAsJsonObject() - .get("properties").getAsJsonObject() - .get("value").getAsString(); - al.getAsJsonObject() - .get("staticProperty").getAsJsonObject() - .get("properties").getAsJsonObject() - .get("staticProperties").getAsJsonArray() - .get(0).getAsJsonObject() - .get("properties").getAsJsonObject() - .addProperty("value", keyValue); - } - } - }); - } -} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/XmlFormatMigrator.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/XmlFormatMigrator.java deleted file mode 100644 index c6a44a8d80..0000000000 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/XmlFormatMigrator.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.model.connect.adapter.migration.format; - -import com.google.gson.JsonObject; - -public class XmlFormatMigrator implements FormatMigrator { - private final JsonObject formatDescription; - - public XmlFormatMigrator(JsonObject formatDescription) { - this.formatDescription = formatDescription; - } - - @Override - public void migrate(JsonObject newFormatProperties) { - var tagValue = this.formatDescription.getAsJsonObject() - .get("config").getAsJsonArray() - .get(0).getAsJsonObject() - .get("properties").getAsJsonObject() - .get("value").getAsString(); - newFormatProperties - .getAsJsonObject("properties") - .get("staticProperties") - .getAsJsonArray() - .get(0) - .getAsJsonObject() - .get("properties") - .getAsJsonObject() - .addProperty("value", tagValue); - - - } -} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/AdapterModels.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/AdapterModels.java deleted file mode 100644 index 07833dc0bd..0000000000 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/AdapterModels.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.model.connect.adapter.migration.utils; - -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -public class AdapterModels { - - public static final String GENERIC_SET = - "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription"; - public static final String SPECIFIC_SET = - "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription"; - public static final String GENERIC_STREAM = - "org.apache.streampipes.model.connect.adapter.GenericAdapterStreamDescription"; - public static final String SPECIFIC_STREAM = - "org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription"; - - public static final String NEW_MODEL = - "org.apache.streampipes.model.connect.adapter.AdapterDescription"; - public static List<String> deprecatedAdapterSetClasses = List.of( - GENERIC_SET, - SPECIFIC_SET - ); - - public static List<String> deprecatedAdapterStreamClasses = List.of( - GENERIC_STREAM, - SPECIFIC_STREAM - ); - - public static List<String> deprecatedAdapterClasses = - Stream.concat( - deprecatedAdapterSetClasses.stream(), - deprecatedAdapterStreamClasses.stream()) - .collect(Collectors.toList()); - - public static boolean shouldMigrate(String adapterClassName) { - return deprecatedAdapterClasses.contains(adapterClassName); - } - - public static boolean isSetAdapter(String adapterClassName) { - return AdapterModels.deprecatedAdapterSetClasses.contains(adapterClassName); - } -} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/DocumentKeys.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/DocumentKeys.java deleted file mode 100644 index 757db62991..0000000000 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/DocumentKeys.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.model.connect.adapter.migration.utils; - -public class DocumentKeys { - - public static final String INTERNAL_NAME = "internalName"; - public static final String ALTERNATIVES = "alternatives"; -} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/FormatIds.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/FormatIds.java deleted file mode 100644 index 29bce717f4..0000000000 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/FormatIds.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.model.connect.adapter.migration.utils; - -public class FormatIds { - public static final String JSON = "Json"; - public static final String IMAGE = "Image"; - public static final String XML = "XML"; - public static final String CSV = "CSV"; - public static final String JSON_OBJECT_FORMAT_ID = "https://streampipes.org/vocabulary/v1/format/json/object"; - public static final String JSON_OBJECT_NEW_KEY = "object"; - public static final String JSON_ARRAY_KEY_FORMAT_ID = "https://streampipes.org/vocabulary/v1/format/json/arraykey"; - public static final String JSON_ARRAY_KEY_NEW_KEY = "arrayField"; - public static final String JSON_ARRAY_NO_KEY_FORMAT_ID = - "https://streampipes.org/vocabulary/v1/format/json/arraynokey"; - public static final String JSON_ARRAY_NO_KEY_NEW_KEY = "array"; - public static final String IMAGE_FORMAT_ID = "https://streampipes.org/vocabulary/v1/format/image"; - public static final String GEOJSON_FORMAT_ID = "https://streampipes.org/vocabulary/v1/format/geojson"; - public static final String GEOJSON_NEW_KEY = "geojson"; - public static final String CSV_FORMAT_ID = "https://streampipes.org/vocabulary/v1/format/csv"; - public static final String XML_FORMAT_ID = "https://streampipes.org/vocabulary/v1/format/xml"; -} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/GenericAdapterUtils.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/GenericAdapterUtils.java deleted file mode 100644 index b4aed57cd4..0000000000 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/GenericAdapterUtils.java +++ /dev/null @@ -1,313 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.model.connect.adapter.migration.utils; - -import org.apache.streampipes.model.connect.adapter.migration.MigrationHelpers; -import org.apache.streampipes.model.connect.adapter.migration.format.FormatMigrator; - -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.JsonPrimitive; - -import static org.apache.streampipes.model.connect.adapter.migration.utils.DocumentKeys.INTERNAL_NAME; - -public class GenericAdapterUtils { - - public static void applyFormat(String formatType, - JsonObject formatConfig, - FormatMigrator migrator) { - - formatConfig - .get(MigrationHelpers.PROPERTIES).getAsJsonObject() - .get(DocumentKeys.ALTERNATIVES).getAsJsonArray() - .forEach(el -> { - var alternative = el.getAsJsonObject(); - if (alternative.get(INTERNAL_NAME).getAsString().equals(formatType)) { - alternative.add("selected", new JsonPrimitive(true)); - migrator.migrate(alternative.get("staticProperty").getAsJsonObject()); - } - }); - } - - public static JsonObject getFormatTemplate() { - return JsonParser.parseString(""" - { - "type": "org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives", - "properties": { - "alternatives": [ - { - "elementId": "sp:staticpropertyalternative:NhVXLH", - "selected": false, - "staticProperty": { - "type": "org.apache.streampipes.model.staticproperty.StaticPropertyGroup", - "properties": { - "staticProperties": [ - { - "type": "org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives", - "properties": { - "alternatives": [ - { - "elementId": "sp:staticpropertyalternative:XbWYtr", - "selected": false, - "valueRequired": false, - "staticPropertyType": "StaticPropertyAlternative", - "index": 0, - "label": "Single Object", - "description": "object", - "internalName": "object", - "predefined": false - }, - { - "elementId": "sp:staticpropertyalternative:yFmmae", - "selected": false, - "valueRequired": false, - "staticPropertyType": "StaticPropertyAlternative", - "index": 1, - "label": "Array", - "description": "array", - "internalName": "array", - "predefined": false - }, - { - "elementId": "sp:staticpropertyalternative:XDFNhI", - "selected": false, - "staticProperty": { - "type": "org.apache.streampipes.model.staticproperty.StaticPropertyGroup", - "properties": { - "staticProperties": [ - { - "type": "org.apache.streampipes.model.staticproperty.FreeTextStaticProperty", - "properties": { - "requiredDatatype": "http://www.w3.org/2001/XMLSchema#string", - "multiLine": false, - "htmlAllowed": false, - "htmlFontFormat": false, - "placeholdersSupported": false, - "valueRequired": false, - "staticPropertyType": "FreeTextStaticProperty", - "index": 0, - "label": "Key", - "description": "Key of the array within the Json object", - "internalName": "key", - "predefined": false - } - } - ], - "horizontalRendering": false, - "valueRequired": false, - "staticPropertyType": "StaticPropertyGroup", - "index": 0, - "label": "Delimiter", - "description": "", - "internalName": "arrayFieldConfig", - "predefined": false - } - }, - "valueRequired": false, - "staticPropertyType": "StaticPropertyAlternative", - "index": 2, - "label": "Array Field", - "description": "arrayField", - "internalName": "arrayField", - "predefined": false - }, - { - "elementId": "sp:staticpropertyalternative:wVjlmK", - "selected": false, - "valueRequired": false, - "staticPropertyType": "StaticPropertyAlternative", - "index": 3, - "label": "GeoJSON", - "description": "geojson", - "internalName": "geojson", - "predefined": false - } - ], - "valueRequired": false, - "staticPropertyType": "StaticPropertyAlternatives", - "index": 0, - "label": "", - "description": "", - "internalName": "json_options", - "predefined": false - } - } - ], - "horizontalRendering": false, - "valueRequired": false, - "staticPropertyType": "StaticPropertyGroup", - "index": 0, - "label": "Json", - "description": "", - "internalName": "org.apache.streampipes.extensions.management.connect.adapter.parser.json", - "predefined": false - } - }, - "valueRequired": false, - "staticPropertyType": "StaticPropertyAlternative", - "index": 0, - "label": "Json", - "description": "", - "internalName": "Json", - "predefined": false - }, - { - "elementId": "sp:staticpropertyalternative:HzpRNt", - "selected": false, - "staticProperty": { - "type": "org.apache.streampipes.model.staticproperty.StaticPropertyGroup", - "properties": { - "staticProperties": [ - { - "type": "org.apache.streampipes.model.staticproperty.FreeTextStaticProperty", - "properties": { - "requiredDomainProperty": "http://www.w3.org/2001/XMLSchema#string", - "multiLine": false, - "htmlAllowed": false, - "htmlFontFormat": false, - "placeholdersSupported": false, - "valueRequired": false, - "staticPropertyType": "FreeTextStaticProperty", - "index": 0, - "label": "Delimiter", - "description": "The delimiter for json. Mostly either , or ;", - "internalName": "delimiter", - "predefined": false - } - }, - { - "type": "org.apache.streampipes.model.staticproperty.AnyStaticProperty", - "properties": { - "options": [ - { - "elementId": "sp:option:fAGcpO", - "name": "Header", - "selected": false, - "internalName": "Header" - } - ], - "horizontalRendering": false, - "valueRequired": false, - "staticPropertyType": "AnyStaticProperty", - "index": 1, - "label": "Header", - "description": "Does the CSV file include a header or not", - "internalName": "header", - "predefined": false - } - } - ], - "horizontalRendering": false, - "valueRequired": false, - "staticPropertyType": "StaticPropertyGroup", - "index": 0, - "label": "CSV", - "description": "Can be used to read CSV", - "internalName": "org.apache.streampipes.extensions.management.connect.adapter.parser.csv", - "predefined": false - } - }, - "valueRequired": false, - "staticPropertyType": "StaticPropertyAlternative", - "index": 1, - "label": "CSV", - "description": "Can be used to read CSV", - "internalName": "CSV", - "predefined": false - }, - { - "elementId": "sp:staticpropertyalternative:eSsRuI", - "selected": false, - "staticProperty": { - "type": "org.apache.streampipes.model.staticproperty.StaticPropertyGroup", - "properties": { - "staticProperties": [ - { - "type": "org.apache.streampipes.model.staticproperty.FreeTextStaticProperty", - "properties": { - "requiredDomainProperty": "http://www.w3.org/2001/XMLSchema#string", - "multiLine": false, - "htmlAllowed": false, - "htmlFontFormat": false, - "placeholdersSupported": false, - "valueRequired": false, - "staticPropertyType": "FreeTextStaticProperty", - "index": 0, - "label": "Tag", - "description": "Information in the tag is transformed into an event", - "internalName": "tag", - "predefined": false - } - } - ], - "horizontalRendering": false, - "valueRequired": false, - "staticPropertyType": "StaticPropertyGroup", - "index": 0, - "label": "XML", - "description": "Can be used to read XML data", - "internalName": "org.apache.streampipes.extensions.management.connect.adapter.parser.xml", - "predefined": false - } - }, - "valueRequired": false, - "staticPropertyType": "StaticPropertyAlternative", - "index": 2, - "label": "XML", - "description": "Can be used to read XML data", - "internalName": "XML", - "predefined": false - }, - { - "elementId": "sp:staticpropertyalternative:kjRhJe", - "selected": false, - "staticProperty": { - "type": "org.apache.streampipes.model.staticproperty.StaticPropertyGroup", - "properties": { - "staticProperties": [], - "horizontalRendering": false, - "valueRequired": false, - "staticPropertyType": "StaticPropertyGroup", - "index": 0, - "label": "Image", - "description": "Processes images and transforms them into events", - "internalName": "org.apache.streampipes.extensions.management.connect.adapter.parser.image", - "predefined": false - } - }, - "valueRequired": false, - "staticPropertyType": "StaticPropertyAlternative", - "index": 3, - "label": "Image", - "description": "Processes images and transforms them into events", - "internalName": "Image", - "predefined": false - } - ], - "valueRequired": false, - "staticPropertyType": "StaticPropertyAlternatives", - "index": 0, - "label": "Format", - "description": "Select the format that is used to parse the events", - "internalName": "format", - "predefined": false - } - }""").getAsJsonObject(); - } -} diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/AvailableMigrations.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/AvailableMigrations.java index 1df07fd3df..e5a0976678 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/AvailableMigrations.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/AvailableMigrations.java @@ -22,7 +22,6 @@ package org.apache.streampipes.service.core.migrations; import org.apache.streampipes.service.core.migrations.v070.CreateAssetLinkTypeMigration; import org.apache.streampipes.service.core.migrations.v070.CreateFileAssetTypeMigration; import org.apache.streampipes.service.core.migrations.v090.UpdateUsernameViewMigration; -import org.apache.streampipes.service.core.migrations.v093.AdapterMigration; import org.apache.streampipes.service.core.migrations.v093.StoreEmailTemplatesMigration; import org.apache.streampipes.service.core.migrations.v095.MergeFilenamesAndRenameDuplicatesMigration; import org.apache.streampipes.service.core.migrations.v0980.AddDataLakeMeasureViewMigration; @@ -56,7 +55,6 @@ public class AvailableMigrations { new CreateAssetLinkTypeMigration(), new CreateFileAssetTypeMigration(), new UpdateUsernameViewMigration(), - new AdapterMigration(), new StoreEmailTemplatesMigration(), new MergeFilenamesAndRenameDuplicatesMigration(), new AddLinkSettingsMigration(), diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/AdapterBackupWriter.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/AdapterBackupWriter.java deleted file mode 100644 index a03d479a8a..0000000000 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/AdapterBackupWriter.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.service.core.migrations.v093; - -import org.apache.streampipes.model.connect.adapter.migration.MigrationHelpers; - -import com.google.gson.JsonObject; -import org.lightcouch.CouchDbClient; -import org.lightcouch.NoDocumentException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AdapterBackupWriter { - - private static final Logger LOG = LoggerFactory.getLogger(AdapterBackupWriter.class); - private static final String REV = "_rev"; - - private final MigrationHelpers helpers; - - private final CouchDbClient couchDbClient; - - public AdapterBackupWriter(CouchDbClient couchDbClient, - MigrationHelpers migrationHelpers) { - this.couchDbClient = couchDbClient; - this.helpers = migrationHelpers; - } - - public void writeBackup(JsonObject jsonObject) { - var docId = helpers.getDocId(jsonObject); - if (!isPresent(docId)) { - jsonObject.add("rev_backup", jsonObject.get(REV)); - jsonObject.remove(REV); - this.couchDbClient.save(jsonObject); - } else { - LOG.warn("Skipping backup of document with id {} since it already exists", docId); - } - } - - private boolean isPresent(String docId) { - try { - couchDbClient.find(JsonObject.class, docId); - return true; - } catch (NoDocumentException e) { - return false; - } - } -} diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/AdapterMigration.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/AdapterMigration.java deleted file mode 100644 index 0e488db656..0000000000 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/AdapterMigration.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.service.core.migrations.v093; - -import org.apache.streampipes.manager.migration.AdapterDescriptionMigration093Provider; -import org.apache.streampipes.model.connect.adapter.migration.MigrationHelpers; -import org.apache.streampipes.model.connect.adapter.migration.utils.AdapterModels; -import org.apache.streampipes.service.core.migrations.Migration; -import org.apache.streampipes.service.core.migrations.v093.migrator.AdapterMigrator; -import org.apache.streampipes.service.core.migrations.v093.migrator.GenericAdapterMigrator; -import org.apache.streampipes.service.core.migrations.v093.migrator.SpecificAdapterMigrator; -import org.apache.streampipes.storage.couchdb.utils.Utils; - -import com.google.gson.JsonObject; -import org.lightcouch.CouchDbClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; - -import static org.apache.streampipes.model.connect.adapter.migration.utils.AdapterModels.GENERIC_STREAM; -import static org.apache.streampipes.model.connect.adapter.migration.utils.AdapterModels.isSetAdapter; - -public class AdapterMigration implements Migration { - - private static final Logger LOG = LoggerFactory.getLogger(AdapterMigration.class); - - private static final String ROWS = "rows"; - private final CouchDbClient adapterInstanceClient; - private final CouchDbClient adapterDescriptionClient; - private final List<JsonObject> adaptersToMigrate; - private final List<JsonObject> adapterDescriptionsToRemove; - - private final MigrationHelpers helpers; - - - public AdapterMigration() { - this.adapterInstanceClient = Utils.getCouchDbAdapterInstanceClient(); - this.adapterDescriptionClient = Utils.getCouchDbAdapterDescriptionClient(); - this.adaptersToMigrate = new ArrayList<>(); - this.adapterDescriptionsToRemove = new ArrayList<>(); - this.helpers = new MigrationHelpers(); - } - - @Override - public boolean shouldExecute() { - var adapterInstanceUri = getAllDocsUri(adapterInstanceClient); - var adapterDescriptionUri = getAllDocsUri(adapterDescriptionClient); - - findDocsToMigrate(adapterInstanceClient, adapterInstanceUri, adaptersToMigrate); - findDocsToMigrate(adapterDescriptionClient, adapterDescriptionUri, adapterDescriptionsToRemove); - - return !adaptersToMigrate.isEmpty() || !adapterDescriptionsToRemove.isEmpty(); - } - - private void findDocsToMigrate(CouchDbClient adapterClient, - String uri, - List<JsonObject> collector) { - var existingAdapters = adapterClient.findAny(JsonObject.class, uri); - if (existingAdapters.size() > 0 && existingAdapters.has(ROWS)) { - var rows = existingAdapters.get(ROWS); - rows.getAsJsonArray().forEach(row -> { - var doc = row.getAsJsonObject().get("doc").getAsJsonObject(); - var docType = doc.get("type").getAsString(); - if (AdapterModels.shouldMigrate(docType)) { - collector.add(doc); - } - }); - } - } - - @Override - public void executeMigration() { - var adapterInstanceBackupClient = Utils.getCouchDbAdapterInstanceBackupClient(); - - LOG.info("Deleting {} adapter descriptions, which will be regenerated after migration", - adapterDescriptionsToRemove.size()); - - adapterDescriptionsToRemove.forEach(ad -> { - String docId = helpers.getDocId(ad); - var adapterType = ad.get("type").getAsString(); - String rev = helpers.getRev(ad); - String appId = helpers.getAppId(ad); - if (!isSetAdapter(adapterType)) { - AdapterDescriptionMigration093Provider.INSTANCE.addAppId(appId); - } - if (docId != null && rev != null) { - adapterDescriptionClient.remove(docId, rev); - } - }); - - LOG.info("Migrating {} adapter models", adaptersToMigrate.size()); - - LOG.info("Performing backup of old models to database adapterinstance_backup"); - - adaptersToMigrate.forEach(adapter -> { - // Is required to keep the _rev field for the original object. This field must be removed for the backup - var copyAdapter = adapter.deepCopy(); - new AdapterBackupWriter(adapterInstanceBackupClient, new MigrationHelpers()).writeBackup(copyAdapter); - }); - - LOG.info("Performing migration of adapters"); - - adaptersToMigrate.forEach(adapter -> { - var adapterType = adapter.get("type").getAsString(); - if (AdapterModels.isSetAdapter(adapterType)) { - LOG.warn("Data Set adapters are no longer supported and can't be migrated - consult docs for an alternative"); - } else { - getAdapterMigrator(adapterType).migrate(adapterInstanceClient, adapter); - } - }); - - LOG.info("Adapter migration finished"); - } - - @Override - public String getDescription() { - return "Migrate all adapters to new data model"; - } - - private String getAllDocsUri(CouchDbClient client) { - return client.getDBUri().toString() + "_all_docs" + "?include_docs=true"; - } - - private AdapterMigrator getAdapterMigrator(String adapterType) { - if (adapterType.equals(GENERIC_STREAM)) { - return new GenericAdapterMigrator(new MigrationHelpers()); - } else { - return new SpecificAdapterMigrator(new MigrationHelpers()); - } - } -} diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/migrator/AdapterMigrator.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/migrator/AdapterMigrator.java deleted file mode 100644 index c1be902536..0000000000 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/migrator/AdapterMigrator.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.service.core.migrations.v093.migrator; - -import com.google.gson.JsonObject; -import org.lightcouch.CouchDbClient; - -public interface AdapterMigrator { - - void migrate(CouchDbClient couchDbClient, JsonObject adapter); -} diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/migrator/GenericAdapterMigrator.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/migrator/GenericAdapterMigrator.java deleted file mode 100644 index a5be002fa3..0000000000 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/migrator/GenericAdapterMigrator.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.service.core.migrations.v093.migrator; - -import org.apache.streampipes.model.connect.adapter.migration.GenericAdapterConverter; -import org.apache.streampipes.model.connect.adapter.migration.IAdapterConverter; -import org.apache.streampipes.model.connect.adapter.migration.MigrationHelpers; - -import com.google.gson.JsonObject; -import org.lightcouch.CouchDbClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class GenericAdapterMigrator implements AdapterMigrator { - - private static final Logger LOG = LoggerFactory.getLogger(GenericAdapterMigrator.class); - - private final MigrationHelpers helpers; - private final IAdapterConverter converter; - - public GenericAdapterMigrator(MigrationHelpers helpers) { - this.helpers = helpers; - this.converter = new GenericAdapterConverter(false); - } - - @Override - public void migrate(CouchDbClient couchDbClient, JsonObject adapter) { - var adapterName = helpers.getAdapterName(adapter); - var convertedAdapter = converter.convert(adapter); - - couchDbClient.update(convertedAdapter); - - LOG.info("Successfully migrated adapter {}", adapterName); - } -} diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/migrator/SpecificAdapterMigrator.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/migrator/SpecificAdapterMigrator.java deleted file mode 100644 index 7c77c2f253..0000000000 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/migrator/SpecificAdapterMigrator.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.service.core.migrations.v093.migrator; - -import org.apache.streampipes.model.connect.adapter.migration.IAdapterConverter; -import org.apache.streampipes.model.connect.adapter.migration.MigrationHelpers; -import org.apache.streampipes.model.connect.adapter.migration.SpecificAdapterConverter; - -import com.google.gson.JsonObject; -import org.lightcouch.CouchDbClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SpecificAdapterMigrator implements AdapterMigrator { - - private static final Logger LOG = LoggerFactory.getLogger(SpecificAdapterMigrator.class); - private final MigrationHelpers helpers; - private final IAdapterConverter converter; - - public SpecificAdapterMigrator(MigrationHelpers helpers) { - this.helpers = helpers; - this.converter = new SpecificAdapterConverter(false); - } - - @Override - public void migrate(CouchDbClient couchDbClient, - JsonObject adapter) { - var adapterName = helpers.getAdapterName(adapter); - var convertedAdapter = converter.convert(adapter); - - LOG.info("Start migrating adapter {}", adapterName); - - couchDbClient.update(convertedAdapter); - - LOG.info("Successfully migrated adapter {}", adapterName); - } -} diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/connect/MigrateAdaptersToUseScript.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/connect/MigrateAdaptersToUseScript.java index 273075d7ac..d71ce4752f 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/connect/MigrateAdaptersToUseScript.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/connect/MigrateAdaptersToUseScript.java @@ -16,29 +16,15 @@ * */ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - package org.apache.streampipes.service.core.migrations.v099.connect; import org.apache.streampipes.model.connect.TransformationConfig; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.connect.rules.TransformationRuleDescription; +import org.apache.streampipes.model.staticproperty.StaticProperty; +import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative; +import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives; +import org.apache.streampipes.model.staticproperty.StaticPropertyGroup; import org.apache.streampipes.service.core.migrations.Migration; import org.apache.streampipes.storage.api.IAdapterStorage; import org.apache.streampipes.storage.management.StorageDispatcher; @@ -51,6 +37,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; import java.util.List; +import java.util.Optional; public class MigrateAdaptersToUseScript implements Migration { @@ -71,13 +58,12 @@ public class MigrateAdaptersToUseScript implements Migration { } @Override - // Execute if there is at least one adapter with rules defined + // Execute if there is an adapter with no transformation config or script public boolean shouldExecute() { List<AdapterDescription> adapters = adapterStorage.findAll(); - return adapters != null - && adapters.stream() - .anyMatch(adapter -> adapter.getTransformationConfig() == null - || adapter.getTransformationConfig().getScript() == null); + return adapters != null && adapters.stream() + .anyMatch(adapter -> adapter.getTransformationConfig() == null || adapter.getTransformationConfig() + .getScript() == null); } @Override @@ -94,11 +80,12 @@ public class MigrateAdaptersToUseScript implements Migration { private void migrateAndUpdateAdapter(AdapterDescription adapterDescription) { LOG.info("Migrating adapter to script preprosessing: {}", adapterDescription.getName()); - migrateAdapter(adapterDescription); + removeDeprecatedParserConfig(adapterDescription); + migrateAdapterToUseTransformationScript(adapterDescription); updateAdapter(adapterDescription); } - private void migrateAdapter(AdapterDescription adapter) { + private void migrateAdapterToUseTransformationScript(AdapterDescription adapter) { removeAdditionalMetadata(adapter); @@ -146,6 +133,73 @@ public class MigrateAdaptersToUseScript implements Migration { return config; } + /** + * Removes deprecated parser configuration options from the adapter's "format" -> "Json" configuration. + * <p> + * Behavior: + * - If config / format / Json option is missing: returns null + * - Renames the "object" alternative label to "Object" + * - Removes the "geojson" alternative + */ + private void removeDeprecatedParserConfig(AdapterDescription adapter) { + if (adapter == null || adapter.getConfig() == null || adapter.getConfig() + .isEmpty()) { + return; + } + + var formatOption = findFormatOption(adapter); + if (formatOption.isEmpty()) { + return; + } + + var jsonAlternative = findJsonAlternative(formatOption.get()); + if (jsonAlternative.isEmpty()) { + return; + } + + var alternatives = extractJsonAlternatives(jsonAlternative.get()); + + renameObjectLabel(alternatives); + removeByInternalName(alternatives, "geojson"); + } + + private Optional<StaticPropertyAlternatives> findFormatOption(AdapterDescription adapter) { + return adapter.getConfig() + .stream() + .filter(c -> "format".equals(c.getInternalName())) + .map(StaticPropertyAlternatives.class::cast) + .findFirst(); + } + + private Optional<StaticPropertyAlternative> findJsonAlternative(StaticPropertyAlternatives formatOption) { + return formatOption.getAlternatives() + .stream() + .filter(a -> "Json".equals(a.getInternalName())) + .findFirst(); + } + + private List<StaticPropertyAlternative> extractJsonAlternatives(StaticPropertyAlternative jsonAlternative) { + StaticPropertyGroup group = (StaticPropertyGroup) jsonAlternative.getStaticProperty(); + + // Preserve original behavior: assume index 0 exists and is StaticPropertyAlternatives + StaticProperty first = group.getStaticProperties() + .get(0); + + return ((StaticPropertyAlternatives) first).getAlternatives(); + } + + private void renameObjectLabel(List<StaticPropertyAlternative> alternatives) { + alternatives.stream() + .filter(a -> "object".equals(a.getInternalName())) + .forEach(a -> a.setLabel("Object")); + } + + private void removeByInternalName(List<StaticPropertyAlternative> alternatives, String internalName) { + // Avoid ConcurrentModification: original used stream().toList() then remove; removeIf is clearer. + alternatives.removeIf(a -> internalName.equals(a.getInternalName())); + } + + private void updateAdapter(AdapterDescription adapterDescription) { adapterStorage.updateElement(adapterDescription); } diff --git a/ui/cypress/fixtures/connect/format/geoJson.json b/ui/cypress/fixtures/connect/format/geoJson.json deleted file mode 100644 index 97e202ba70..0000000000 --- a/ui/cypress/fixtures/connect/format/geoJson.json +++ /dev/null @@ -1,11 +0,0 @@ -{ - "type": "Feature", - "geometry": { - "type": "Point", - "coordinates": [125.6, 10.1] - }, - "properties": { - "timestamp": 1667904471000, - "v1": 4.1 - } -} diff --git a/ui/cypress/support/builder/AdapterBuilder.ts b/ui/cypress/support/builder/AdapterBuilder.ts index bba9e303bd..0f9fd9f645 100644 --- a/ui/cypress/support/builder/AdapterBuilder.ts +++ b/ui/cypress/support/builder/AdapterBuilder.ts @@ -73,6 +73,11 @@ export class AdapterBuilder { return this; } + public setScript(script: string) { + this.adapterInput.script = script; + return this; + } + public addTreeNode(treeNode: TreeNodeUserInputBuilder) { const userInput = new UserInput(); userInput.type = 'tree'; diff --git a/ui/cypress/support/model/AdapterInput.ts b/ui/cypress/support/model/AdapterInput.ts index 0340758802..e687c4aea6 100644 --- a/ui/cypress/support/model/AdapterInput.ts +++ b/ui/cypress/support/model/AdapterInput.ts @@ -35,4 +35,6 @@ export class AdapterInput { formatConfiguration: UserInput[]; dataTypeChanges: PropertyDataTypeChange[] = []; + + script: string; } diff --git a/ui/cypress/support/utils/connect/ConnectBtns.ts b/ui/cypress/support/utils/connect/ConnectBtns.ts index 8c18efed29..9caf3610e6 100644 --- a/ui/cypress/support/utils/connect/ConnectBtns.ts +++ b/ui/cypress/support/utils/connect/ConnectBtns.ts @@ -272,6 +272,12 @@ export class ConnectBtns { }); } + public static configureFieldsEventPreviewResult() { + return cy.dataCy('configure-fields-event-preview-result', { + timeout: 10000, + }); + } + public static configureFieldsNextBtn() { return cy.dataCy('configure-fields-next-button'); } diff --git a/ui/cypress/tests/connect/formats/format.spec.ts b/ui/cypress/tests/connect/formats/format.spec.ts index 9e0de6905b..d60f6f7d91 100644 --- a/ui/cypress/tests/connect/formats/format.spec.ts +++ b/ui/cypress/tests/connect/formats/format.spec.ts @@ -44,7 +44,7 @@ describe('Test adapter formats', () => { template .setFormat('json') - .addFormatInput('radio', 'json_options-single_object', ''); + .addFormatInput('radio', 'json_options-object', ''); createAdapterUntilEventSchemaConfiguration(template.build()); @@ -83,27 +83,6 @@ describe('Test adapter formats', () => { validateResult(expected); }); - it('Test geo json format', () => { - // Set up test - const geoJsonResultEvent = { - latitude: 10.1, - longitude: 125.6, - timestamp: 1667904471000, - v1: 4.1, - }; - FileManagementUtils.addFile(baseDir + 'geoJson.json'); - const template = makeAdapterInputTemplate(); - - template - .setFormat('json') - .addFormatInput('radio', 'json_options-geojson', ''); - - createAdapterUntilEventSchemaConfiguration(template.build()); - - // Validate result - validateResult(geoJsonResultEvent); - }); - it('Test xml format', () => { // Set up test FileManagementUtils.addFile(baseDir + 'xmlObject.xml'); diff --git a/ui/cypress/tests/thirdparty/Kafka.smoke.spec.ts b/ui/cypress/tests/thirdparty/Kafka.smoke.spec.ts index 51d149369b..401be131f7 100644 --- a/ui/cypress/tests/thirdparty/Kafka.smoke.spec.ts +++ b/ui/cypress/tests/thirdparty/Kafka.smoke.spec.ts @@ -54,7 +54,7 @@ describe('Test Kafka Integration', () => { .addProtocolInput('click', 'sp-reload', '') .addProtocolInput('radio', topicName, '') .setFormat('json') - .addFormatInput('radio', 'json_options-single_object', '') + .addFormatInput('radio', 'json_options-object', '') .build(); ThirdPartyIntegrationUtils.runTest(sink, adapter); diff --git a/ui/src/app/connect/components/adapter-configuration/adapter-configuration-state-service/adapter-configuration-state.service.ts b/ui/src/app/connect/components/adapter-configuration/adapter-configuration-state-service/adapter-configuration-state.service.ts index 5ce1dbcb39..2f73ddae3d 100644 --- a/ui/src/app/connect/components/adapter-configuration/adapter-configuration-state-service/adapter-configuration-state.service.ts +++ b/ui/src/app/connect/components/adapter-configuration/adapter-configuration-state-service/adapter-configuration-state.service.ts @@ -68,7 +68,7 @@ export class AdapterConfigurationStateService { isGettingEventSchema: false, getEventSchemaError: null, isPreviewLoading: false, - resultPreview: {}, + resultPreview: null, }; private _state = signal<AdapterConfigurationState>(this.initialState);
