This is an automated email from the ASF dual-hosted git repository. zehnder pushed a commit to branch SP-1085 in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 02059a87d314191368147d2d4d38d19d70abb599 Author: Philipp Zehnder <[email protected]> AuthorDate: Fri Jan 13 14:12:23 2023 +0100 [#1085] Extract find timestamp property from FileStreamProtocol to EventSchemaUtils --- streampipes-extensions-management/pom.xml | 6 ++++ .../connect/adapter/AdapterPipelineGenerator.java | 9 ++++-- .../adapter/model/generic/GenericAdapter.java | 4 --- .../pipeline/AdapterEventPreviewPipeline.java | 6 ++++ .../adapter/model/pipeline/AdapterPipeline.java | 8 +++-- .../management/util/EventSchemaUtils.java | 29 ++---------------- .../management/util/EventSchemaUtilsTest.java | 18 ++++-------- .../iiot/protocol/stream/FileStreamProtocol.java | 34 ++++------------------ streampipes-model/pom.xml | 5 ---- .../apache/streampipes/model/util/SchemaUtils.java | 34 ---------------------- 10 files changed, 36 insertions(+), 117 deletions(-) diff --git a/streampipes-extensions-management/pom.xml b/streampipes-extensions-management/pom.xml index 883d30fb4..0b6c9e15d 100644 --- a/streampipes-extensions-management/pom.xml +++ b/streampipes-extensions-management/pom.xml @@ -96,6 +96,12 @@ <artifactId>streampipes-service-discovery</artifactId> <version>0.91.0-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.streampipes</groupId> + <artifactId>streampipes-test-utils</artifactId> + <version>0.91.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.streampipes</groupId> <artifactId>streampipes-vocabulary</artifactId> diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java index c344466bc..32a913f99 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java @@ -74,12 +74,15 @@ public class AdapterPipelineGenerator { if (adapterDescription.getEventGrounding() != null && adapterDescription.getEventGrounding().getTransportProtocol() != null && adapterDescription.getEventGrounding().getTransportProtocol().getBrokerHostname() != null) { - return new AdapterPipeline(pipelineElements, getAdapterSink(adapterDescription)); + return new AdapterPipeline( + pipelineElements, + getAdapterSink(adapterDescription), + adapterDescription.getEventSchema()); } DebugSinkRuleDescription debugSinkRuleDescription = getDebugRule(adapterDescription.getRules()); if (debugSinkRuleDescription != null) { - return new AdapterPipeline(pipelineElements, new DebugAdapterSink()); + return new AdapterPipeline(pipelineElements, new DebugAdapterSink(), adapterDescription.getEventSchema()); } return new AdapterPipeline(pipelineElements, adapterDescription.getEventSchema()); @@ -176,7 +179,7 @@ public class AdapterPipelineGenerator { } private boolean isPrioritized(SpProtocol prioritizedProtocol, - Class<?> protocolClass) { + Class<?> protocolClass) { return prioritizedProtocol.getProtocolClass().equals(protocolClass.getCanonicalName()); } } diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/GenericAdapter.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/GenericAdapter.java index 54f16fac4..fa8f864f0 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/GenericAdapter.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/generic/GenericAdapter.java @@ -67,10 +67,6 @@ public abstract class GenericAdapter<T extends AdapterDescription> extends Adapt IProtocol protocolInstance = this.protocol.getInstance(protocolDescription, parser, format); this.protocol = protocolInstance; - //TODO remove -// EventSchema eventSchema = adapterDescription.getEventSchema(); -// this.protocol.setEventSchema(eventSchema); - logger.debug("Start adatper with format: " + format.getId() + " and " + protocol.getId()); protocolInstance.run(adapterPipeline); diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterEventPreviewPipeline.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterEventPreviewPipeline.java index 3367d2155..17a70bfcd 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterEventPreviewPipeline.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterEventPreviewPipeline.java @@ -24,6 +24,7 @@ import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement; import org.apache.streampipes.extensions.management.connect.adapter.AdapterPipelineGenerator; import org.apache.streampipes.model.connect.guess.AdapterEventPreview; import org.apache.streampipes.model.connect.guess.GuessTypeInfo; +import org.apache.streampipes.model.schema.EventSchema; import java.util.List; import java.util.Map; @@ -79,4 +80,9 @@ public class AdapterEventPreviewPipeline implements IAdapterPipeline { .collect(Collectors.toMap(Map.Entry::getKey, e -> new GuessTypeInfo(e.getValue().getClass().getCanonicalName(), e.getValue()))); } + + @Override + public EventSchema getResultingEventSchema() { + return null; + } } diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java index d1a0bc1c6..7edd585ec 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java @@ -32,14 +32,18 @@ public class AdapterPipeline implements IAdapterPipeline { private EventSchema resultingEventSchema; - public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements, EventSchema resultingEventSchema) { + public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements, + EventSchema resultingEventSchema) { this.pipelineElements = pipelineElements; this.resultingEventSchema = resultingEventSchema; } - public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements, IAdapterPipelineElement pipelineSink) { + public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements, + IAdapterPipelineElement pipelineSink, + EventSchema resultingEventSchema) { this.pipelineElements = pipelineElements; this.pipelineSink = pipelineSink; + this.resultingEventSchema = resultingEventSchema; } @Override diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/util/EventSchemaUtils.java similarity index 74% copy from streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java copy to streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/util/EventSchemaUtils.java index ba8a15587..f0a702044 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/util/EventSchemaUtils.java @@ -16,7 +16,7 @@ * */ -package org.apache.streampipes.model.util; +package org.apache.streampipes.extensions.management.util; import org.apache.streampipes.model.schema.EventProperty; import org.apache.streampipes.model.schema.EventPropertyNested; @@ -25,35 +25,10 @@ import org.apache.streampipes.model.schema.EventSchema; import org.apache.streampipes.vocabulary.SO; import java.net.URI; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Optional; -public class SchemaUtils { - - public static Map<String, Object> toRuntimeMap(List<EventProperty> eps) { - return toUntypedRuntimeMap(eps); - } - - public static Map<String, Object> toUntypedRuntimeMap(List<EventProperty> eps) { - Map<String, Object> propertyMap = new HashMap<>(); - - for (EventProperty p : eps) { - propertyMap.putAll(PropertyUtils.getUntypedRuntimeFormat(p)); - } - return propertyMap; - } - - public static List<String> toPropertyList(List<EventProperty> eps) { - List<String> properties = new ArrayList<>(); - - for (EventProperty p : eps) { - properties.addAll(PropertyUtils.getFullPropertyName(p, "")); - } - return properties; - } +public class EventSchemaUtils { /** * Returns the timestamp property of an event schema as an {@code Optional}. diff --git a/streampipes-model/src/test/java/org/apache/streampipes/model/util/SchemaUtilsTest.java b/streampipes-extensions-management/src/test/java/org/apache/streampipes/extensions/management/util/EventSchemaUtilsTest.java similarity index 81% rename from streampipes-model/src/test/java/org/apache/streampipes/model/util/SchemaUtilsTest.java rename to streampipes-extensions-management/src/test/java/org/apache/streampipes/extensions/management/util/EventSchemaUtilsTest.java index 4dd3bb5bc..58797f97f 100644 --- a/streampipes-model/src/test/java/org/apache/streampipes/model/util/SchemaUtilsTest.java +++ b/streampipes-extensions-management/src/test/java/org/apache/streampipes/extensions/management/util/EventSchemaUtilsTest.java @@ -16,7 +16,7 @@ * */ -package org.apache.streampipes.model.util; +package org.apache.streampipes.extensions.management.util; import org.apache.streampipes.model.schema.EventProperty; import org.apache.streampipes.test.generator.EventPropertyNestedTestBuilder; @@ -30,8 +30,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; - -public class SchemaUtilsTest { +public class EventSchemaUtilsTest { EventProperty timestampProperty = EventPropertyPrimitiveTestBuilder.create() .withSemanticType(SO.DATE_TIME) @@ -46,7 +45,7 @@ public class SchemaUtilsTest { EventPropertyPrimitiveTestBuilder.create().build()) .build(); - var result = SchemaUtils.getTimestampProperty(eventSchema); + var result = EventSchemaUtils.getTimestampProperty(eventSchema); assertFalse(result.isPresent()); } @@ -57,12 +56,7 @@ public class SchemaUtilsTest { .withEventProperty(timestampProperty) .build(); -// var timestampProperty = new EventPropertyPrimitive(); -// timestampProperty.setDomainProperties(List.of(URI.create(SO.DATE_TIME))); -// timestampProperty.setRuntimeName("timestamp"); -// eventSchema.addEventProperty(timestampProperty); - - var result = SchemaUtils.getTimestampProperty(eventSchema); + var result = EventSchemaUtils.getTimestampProperty(eventSchema); assertTrue(result.isPresent()); assertEquals(result.get(), timestampProperty); @@ -78,11 +72,9 @@ public class SchemaUtilsTest { .build()) .build(); - var result = SchemaUtils.getTimestampProperty(eventSchema); + var result = EventSchemaUtils.getTimestampProperty(eventSchema); assertTrue(result.isPresent()); assertEquals(result.get(), timestampProperty); } - - } \ No newline at end of file diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java index 974265bc0..a07813dbb 100644 --- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java +++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java @@ -30,13 +30,10 @@ import org.apache.streampipes.extensions.management.connect.adapter.preprocessin import org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.SendToJmsAdapterSink; import org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.SendToKafkaAdapterSink; import org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.SendToMqttAdapterSink; +import org.apache.streampipes.extensions.management.util.EventSchemaUtils; import org.apache.streampipes.model.AdapterType; import org.apache.streampipes.model.connect.grounding.ProtocolDescription; import org.apache.streampipes.model.connect.guess.GuessSchema; -import org.apache.streampipes.model.schema.EventProperty; -import org.apache.streampipes.model.schema.EventPropertyList; -import org.apache.streampipes.model.schema.EventPropertyNested; -import org.apache.streampipes.model.schema.EventPropertyPrimitive; import org.apache.streampipes.model.schema.EventSchema; import org.apache.streampipes.sdk.builder.adapter.ProtocolDescriptionBuilder; import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor; @@ -53,7 +50,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; public class FileStreamProtocol extends Protocol { @@ -85,7 +81,7 @@ public class FileStreamProtocol extends Protocol { @Override public void run(IAdapterPipeline adapterPipeline) { - String timestampKey = getTimestampKey(adapterPipeline.getResultingEventSchema().getEventProperties(), ""); + String timestampKey = getTimestampKey(adapterPipeline.getResultingEventSchema()); // exchange adapter pipeline sink with special purpose replay sink for file replay if (adapterPipeline.getPipelineSink() instanceof SendToKafkaAdapterSink) { @@ -172,29 +168,9 @@ public class FileStreamProtocol extends Protocol { return new FileStreamProtocol(parser, format, fileName, replaceTimestamp, speedUp, timeBetweenReplay); } - private String getTimestampKey(List<EventProperty> eventProperties, String prefixKey) { - String result = null; - for (EventProperty eventProperty : eventProperties) { - if (eventProperty instanceof EventPropertyPrimitive && eventProperty.getDomainProperties() != null) { - for (int i = eventProperty.getDomainProperties().size() - 1; i >= 0; i--) { - if (eventProperty.getDomainProperties().get(0).toString().equals("http://schema.org/DateTime")) { - result = prefixKey + eventProperty.getRuntimeName(); - } - } - } else if (eventProperty instanceof EventPropertyNested - && ((EventPropertyNested) eventProperty).getEventProperties() != null) { - result = getTimestampKey(((EventPropertyNested) eventProperty).getEventProperties(), - prefixKey + eventProperty.getRuntimeName() + "."); - } else if (eventProperty instanceof EventPropertyList - && ((EventPropertyList) eventProperty).getEventProperty() != null) { - result = getTimestampKey(Arrays.asList(((EventPropertyList) eventProperty).getEventProperty()), - prefixKey + eventProperty.getRuntimeName() + "."); - } - if (result != null) { - return result; - } - } - return result; + private String getTimestampKey(EventSchema eventSchema) { + var timestampProperty = EventSchemaUtils.getTimestampProperty(eventSchema); + return timestampProperty.get().getRuntimeName(); } @Override diff --git a/streampipes-model/pom.xml b/streampipes-model/pom.xml index 994507656..40bb1ca79 100644 --- a/streampipes-model/pom.xml +++ b/streampipes-model/pom.xml @@ -38,11 +38,6 @@ <artifactId>streampipes-model-shared</artifactId> <version>0.91.0-SNAPSHOT</version> </dependency> - <dependency> - <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-test-utils</artifactId> - <version>0.91.0-SNAPSHOT</version> - </dependency> <dependency> <groupId>org.apache.streampipes</groupId> <artifactId>streampipes-logging</artifactId> diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java b/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java index ba8a15587..48738c143 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/util/SchemaUtils.java @@ -19,17 +19,11 @@ package org.apache.streampipes.model.util; import org.apache.streampipes.model.schema.EventProperty; -import org.apache.streampipes.model.schema.EventPropertyNested; -import org.apache.streampipes.model.schema.EventPropertyPrimitive; -import org.apache.streampipes.model.schema.EventSchema; -import org.apache.streampipes.vocabulary.SO; -import java.net.URI; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; public class SchemaUtils { @@ -55,32 +49,4 @@ public class SchemaUtils { return properties; } - /** - * Returns the timestamp property of an event schema as an {@code Optional}. - * - * <p> The method checks all properties if they are of type {@code EventPropertyPrimitive} and if their domain - * properties contains the uri http://schema.org/DateTime </p> - * - * @param eventSchema the event schema for which the timestamp property is to be returned - * @return an {@code Optional} containing the timestamp property, or an empty {@code Optional} if - * no such property was found - */ - public static Optional<EventPropertyPrimitive> getTimestampProperty(EventSchema eventSchema) { - return getTimstampProperty(eventSchema.getEventProperties()); - } - - - private static Optional<EventPropertyPrimitive> getTimstampProperty(List<EventProperty> eventProperties) { - for (EventProperty ep : eventProperties) { - if (ep instanceof EventPropertyPrimitive && ep.getDomainProperties().contains(URI.create(SO.DATE_TIME))) { - return Optional.of((EventPropertyPrimitive) ep); - } - - if (ep instanceof EventPropertyNested) { - return getTimstampProperty(((EventPropertyNested) ep).getEventProperties()); - } - } - - return Optional.empty(); - } }
