This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new f6265ea420 chore: Provide adapter and pipeline preview as stream
(#3101)
f6265ea420 is described below
commit f6265ea420af8f5e4e9ffa5c370416058a9abc4f
Author: Dominik Riemer <[email protected]>
AuthorDate: Thu Aug 8 07:57:00 2024 +0200
chore: Provide adapter and pipeline preview as stream (#3101)
* chore: Provide adapter and pipeline preview as stream
* Cleanup
* Fix checkstyle
* Fix adapter preview
* Add timeout to test
* Fix timeout
* Increase interval
* Load pipeline preview in a single request
* Cleanup
* Add response header to deactivate proxy buffering
---
.../messaging/kafka/SpKafkaConsumer.java | 1 -
.../streampipes/manager/operations/Operations.java | 6 --
.../manager/preview/ActivePipelinePreviews.java | 17 +--
.../manager/preview/PipelinePreview.java | 59 ++++++-----
.../runtime/DataStreamRuntimeInfoProvider.java | 94 ++++++++++++++++
.../runtime/PipelineElementRuntimeInfoFetcher.java | 118 ---------------------
.../runtime/RateLimitedRuntimeInfoProvider.java | 64 +++++++++++
.../manager/runtime/SpDataFormatConverter.java | 15 +--
.../rest/impl/PipelineElementPreview.java | 25 +++--
.../rest/impl/PipelineElementRuntimeInfo.java | 26 +++--
ui/cypress/support/utils/connect/ConnectUtils.ts | 9 +-
.../adapter-started-preview.component.html | 2 +-
.../adapter-started-preview.component.ts | 9 +-
ui/src/app/connect/services/rest.service.ts | 9 +-
.../pipeline-element-runtime-info.component.ts | 62 +++++------
.../pipeline-element-preview.component.html | 5 +-
.../pipeline-element-preview.component.scss | 10 +-
.../pipeline-element-preview.component.ts | 54 +++++-----
.../components/pipeline/pipeline.component.html | 4 +-
.../components/pipeline/pipeline.component.ts | 21 +++-
ui/src/app/editor/services/editor.service.ts | 21 ++--
.../live-preview.service.ts} | 35 +++---
22 files changed, 348 insertions(+), 318 deletions(-)
diff --git
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
index e999cfbc4a..fcbd38224f 100644
---
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
+++
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
@@ -104,7 +104,6 @@ public class SpKafkaConsumer implements EventConsumer,
Runnable,
ConsumerRecords<byte[], byte[]> records = consumer.poll(duration);
records.forEach(record -> eventProcessor.onEvent(record.value()));
}
- LOG.info("Closing Kafka Consumer.");
consumer.close();
}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
index 81d6215c88..f2addd2b20 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
@@ -20,12 +20,10 @@ package org.apache.streampipes.manager.operations;
import
org.apache.streampipes.commons.exceptions.NoSuitableSepasAvailableException;
import org.apache.streampipes.commons.exceptions.SepaParseException;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.manager.execution.PipelineExecutor;
import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
import org.apache.streampipes.manager.recommender.ElementRecommender;
import org.apache.streampipes.manager.remote.ContainerProvidedOptionsHandler;
-import
org.apache.streampipes.manager.runtime.PipelineElementRuntimeInfoFetcher;
import org.apache.streampipes.manager.storage.PipelineStorageService;
import org.apache.streampipes.manager.template.PipelineTemplateGenerator;
import
org.apache.streampipes.manager.template.PipelineTemplateInvocationGenerator;
@@ -116,10 +114,6 @@ public class Operations {
return new ContainerProvidedOptionsHandler().fetchRemoteOptions(request);
}
- public static String getRuntimeInfo(SpDataStream spDataStream) throws
SpRuntimeException {
- return
PipelineElementRuntimeInfoFetcher.INSTANCE.getCurrentData(spDataStream);
- }
-
public static List<PipelineTemplateDescription> getAllPipelineTemplates() {
return new PipelineTemplateGenerator().getAllPipelineTemplates();
}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/ActivePipelinePreviews.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/ActivePipelinePreviews.java
index 9f7922b357..d32af21c6e 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/ActivePipelinePreviews.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/ActivePipelinePreviews.java
@@ -22,13 +22,12 @@ import
org.apache.streampipes.model.base.NamedStreamPipesEntity;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
public enum ActivePipelinePreviews {
INSTANCE;
- private Map<String, List<NamedStreamPipesEntity>> activePreviews;
+ private final Map<String, List<NamedStreamPipesEntity>> activePreviews;
ActivePipelinePreviews() {
this.activePreviews = new HashMap<>();
@@ -46,18 +45,4 @@ public enum ActivePipelinePreviews {
public List<NamedStreamPipesEntity> getInvocationGraphs(String previewId) {
return this.activePreviews.get(previewId);
}
-
- public Optional<NamedStreamPipesEntity>
getInvocationGraphForPipelineELement(String previewId,
-
String pipelineElementDomId) {
- List<NamedStreamPipesEntity> graphs = this.activePreviews.get(previewId);
-
- if (graphs == null || graphs.size() == 0) {
- return Optional.empty();
- } else {
- return graphs
- .stream()
- .filter(g -> g.getDom().equals(pipelineElementDomId))
- .findFirst();
- }
- }
}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
index 903ecbeb75..6795d6f67c 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
@@ -23,7 +23,6 @@ import
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpoi
import org.apache.streampipes.manager.execution.http.DetachHttpRequest;
import org.apache.streampipes.manager.execution.http.InvokeHttpRequest;
import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
-import org.apache.streampipes.manager.operations.Operations;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
@@ -31,25 +30,32 @@ import
org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.preview.PipelinePreviewModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
+import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
public class PipelinePreview {
+ private static final Logger LOG =
LoggerFactory.getLogger(PipelinePreview.class);
+
public PipelinePreviewModel initiatePreview(Pipeline pipeline) {
String previewId = generatePreviewId();
pipeline.setActions(new ArrayList<>());
- List<NamedStreamPipesEntity> pipelineElements = new
PipelineVerificationHandlerV2(pipeline)
- .verifyAndBuildGraphs(true)
- .stream()
- .collect(Collectors.toList());
+ List<NamedStreamPipesEntity> pipelineElements = new ArrayList<>(
+ new PipelineVerificationHandlerV2(pipeline)
+ .verifyAndBuildGraphs(true)
+ );
invokeGraphs(filter(pipelineElements));
storeGraphs(previewId, pipelineElements);
+ LOG.info("Preview pipeline {} started", previewId);
+
return makePreviewModel(previewId, pipelineElements);
}
@@ -57,26 +63,25 @@ public class PipelinePreview {
List<NamedStreamPipesEntity> graphs =
ActivePipelinePreviews.INSTANCE.getInvocationGraphs(previewId);
detachGraphs(filter(graphs));
deleteGraphs(previewId);
+ LOG.info("Preview pipeline {} stopped", previewId);
}
- public String getPipelineElementPreview(String previewId,
- String pipelineElementDomId) throws
IllegalArgumentException {
- Optional<NamedStreamPipesEntity> graphOpt = ActivePipelinePreviews
+ public Map<String, SpDataStream> getPipelineElementPreviewStreams(String
previewId) throws IllegalArgumentException {
+ return ActivePipelinePreviews
.INSTANCE
- .getInvocationGraphForPipelineELement(previewId, pipelineElementDomId);
-
- if (graphOpt.isPresent()) {
- NamedStreamPipesEntity graph = graphOpt.get();
- if (graph instanceof DataProcessorInvocation) {
- return Operations.getRuntimeInfo(((DataProcessorInvocation)
graph).getOutputStream());
- } else if (graph instanceof SpDataStream) {
- return Operations.getRuntimeInfo((SpDataStream) graph);
- } else {
- throw new IllegalArgumentException("Requested pipeline element is not
a data processor");
- }
- } else {
- throw new IllegalArgumentException("Could not find pipeline element");
- }
+ .getInvocationGraphs(previewId)
+ .stream()
+ .filter(graph -> graph instanceof DataProcessorInvocation || graph
instanceof SpDataStream)
+ .collect(Collectors.toMap(
+ NamedStreamPipesEntity::getElementId,
+ graph -> {
+ if (graph instanceof DataProcessorInvocation) {
+ return ((DataProcessorInvocation) graph).getOutputStream();
+ } else {
+ return (SpDataStream) graph;
+ }
+ }
+ ));
}
private String findSelectedEndpoint(InvocableStreamPipesEntity g) throws
NoServiceEndpointsAvailableException {
@@ -93,7 +98,7 @@ public class PipelinePreview {
g.setSelectedEndpointUrl(findSelectedEndpoint(g));
new InvokeHttpRequest().execute(g, g.getSelectedEndpointUrl(), null);
} catch (NoServiceEndpointsAvailableException e) {
- e.printStackTrace();
+ LOG.warn("No endpoint found for pipeline element {}", g.getAppId());
}
});
}
@@ -122,15 +127,15 @@ public class PipelinePreview {
List<NamedStreamPipesEntity>
graphs) {
PipelinePreviewModel previewModel = new PipelinePreviewModel();
previewModel.setPreviewId(previewId);
- previewModel.setSupportedPipelineElementDomIds(collectDomIds(graphs));
+ previewModel.setSupportedPipelineElementDomIds(collectElementIds(graphs));
return previewModel;
}
- private List<String> collectDomIds(List<NamedStreamPipesEntity> graphs) {
+ private List<String> collectElementIds(List<NamedStreamPipesEntity> graphs) {
return graphs
.stream()
- .map(NamedStreamPipesEntity::getDom)
+ .map(NamedStreamPipesEntity::getElementId)
.collect(Collectors.toList());
}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/DataStreamRuntimeInfoProvider.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/DataStreamRuntimeInfoProvider.java
new file mode 100644
index 0000000000..7221339d13
--- /dev/null
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/DataStreamRuntimeInfoProvider.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.runtime;
+
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.messaging.EventConsumer;
+import org.apache.streampipes.messaging.SpProtocolManager;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.TransportFormat;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DataStreamRuntimeInfoProvider {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DataStreamRuntimeInfoProvider.class);
+
+ private final Environment env;
+ private final Map<String, SpDataStream> dataStreams;
+ private final List<EventConsumer> consumers;
+ private final Map<String, Map<String, Object>> latestEvents;
+
+ public DataStreamRuntimeInfoProvider(Map<String, SpDataStream> dataStreams) {
+ this.dataStreams = dataStreams;
+ this.consumers = new ArrayList<>();
+ this.env = Environments.getEnvironment();
+ this.latestEvents = new HashMap<>();
+ }
+
+ public void startConsuming() throws SpRuntimeException {
+ dataStreams.forEach((id, dataStream) -> {
+ var protocol = dataStream.getEventGrounding().getTransportProtocol();
+ if (env.getSpDebug().getValueOrDefault()) {
+ protocol.setBrokerHostname("localhost");
+ if (protocol instanceof KafkaTransportProtocol) {
+ ((KafkaTransportProtocol) protocol).setKafkaPort(9094);
+ }
+ }
+
+ var converter = new
SpDataFormatConverterGenerator(getTransportFormat(dataStream)).makeConverter();
+ var protocolDefinitionOpt = SpProtocolManager
+ .INSTANCE
+
.findDefinition(dataStream.getEventGrounding().getTransportProtocol());
+
+ if (protocolDefinitionOpt.isPresent()) {
+ var consumer = protocolDefinitionOpt.get().getConsumer(protocol);
+ consumer.connect(event -> {
+ var deserializedEvent = converter.convert(event);
+ this.latestEvents.put(id, deserializedEvent);
+ });
+ consumers.add(consumer);
+ } else {
+ LOG.error("Error while fetching data for preview - protocol {} not
found - did you register the protocol? ",
+ protocol.getClass().getCanonicalName());
+ throw new SpRuntimeException("Protocol not found");
+ }
+ });
+ }
+
+ private TransportFormat getTransportFormat(SpDataStream spDataStream) {
+ return spDataStream.getEventGrounding().getTransportFormats().get(0);
+ }
+
+ public Map<String, Map<String, Object>> getLatestEvents() {
+ return latestEvents;
+ }
+
+ public void close() {
+ consumers.forEach(EventConsumer::disconnect);
+ }
+}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
deleted file mode 100644
index 8c522477ac..0000000000
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.runtime;
-
-import org.apache.streampipes.commons.environment.Environment;
-import org.apache.streampipes.commons.environment.Environments;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.messaging.EventConsumer;
-import org.apache.streampipes.messaging.SpProtocolManager;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.model.grounding.TransportFormat;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public enum PipelineElementRuntimeInfoFetcher {
- INSTANCE;
-
- private static final Logger LOG =
LoggerFactory.getLogger(PipelineElementRuntimeInfoFetcher.class);
-
- private static final int FETCH_INTERVAL_MS = 300;
- private final Map<String, SpDataFormatConverter> converterMap;
- private final Environment env;
-
- PipelineElementRuntimeInfoFetcher() {
- this.converterMap = new HashMap<>();
- this.env = Environments.getEnvironment();
- }
-
- public String getCurrentData(SpDataStream spDataStream) throws
SpRuntimeException {
- var topic = getOutputTopic(spDataStream);
- var protocol = spDataStream.getEventGrounding().getTransportProtocol();
- if (env.getSpDebug().getValueOrDefault()) {
- protocol.setBrokerHostname("localhost");
- if (protocol instanceof KafkaTransportProtocol) {
- ((KafkaTransportProtocol) protocol).setKafkaPort(9094);
- }
- }
-
- if (!converterMap.containsKey(topic)) {
- this.converterMap.put(topic,
- new
SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
- }
-
- var converter = converterMap.get(topic);
-
- var protocolDefinitionOpt = SpProtocolManager
- .INSTANCE
-
.findDefinition(spDataStream.getEventGrounding().getTransportProtocol());
-
- if (protocolDefinitionOpt.isPresent()) {
- var consumer = protocolDefinitionOpt.get().getConsumer(protocol);
- return getLatestEvent(consumer, converter);
-
- } else {
- LOG.error("Error while fetching data for preview - protocol {} not found
- did you register the protocol? ",
- protocol.getClass().getCanonicalName());
- throw new SpRuntimeException("Protocol not found");
- }
- }
-
- private TransportFormat getTransportFormat(SpDataStream spDataStream) {
- return spDataStream.getEventGrounding().getTransportFormats().get(0);
- }
-
- private String getOutputTopic(SpDataStream spDataStream) {
- return spDataStream
- .getEventGrounding()
- .getTransportProtocol()
- .getTopicDefinition()
- .getActualTopicName();
- }
-
- private void waitForEvent(String[] result) {
- long timeout = 0;
- while (result[0] == null && timeout < 6000) {
- try {
- TimeUnit.MILLISECONDS.sleep(FETCH_INTERVAL_MS);
- timeout = timeout + 300;
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- private String getLatestEvent(EventConsumer consumer,
- SpDataFormatConverter converter) {
- final String[] result = {null};
- consumer.connect(event -> {
- result[0] = converter.convert(event);
- consumer.disconnect();
- });
-
- waitForEvent(result);
-
- return result[0];
- }
-}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/RateLimitedRuntimeInfoProvider.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/RateLimitedRuntimeInfoProvider.java
new file mode 100644
index 0000000000..ee763a40f6
--- /dev/null
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/RateLimitedRuntimeInfoProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.runtime;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.TimeUnit;
+
+public class RateLimitedRuntimeInfoProvider {
+
+ private static final int MAX_PREVIEW_TIME_CYCLES = 360;
+ private static final int MAX_FREQUENCY = 500;
+
+ private final DataStreamRuntimeInfoProvider runtimeInfoProvider;
+ private final ObjectMapper objectMapper;
+
+ public RateLimitedRuntimeInfoProvider(DataStreamRuntimeInfoProvider
runtimeInfoProvider) {
+ this.runtimeInfoProvider = runtimeInfoProvider;
+ this.objectMapper = new ObjectMapper();
+ }
+
+ public void streamOutput(OutputStream outputStream) {
+ runtimeInfoProvider.startConsuming();
+ try {
+ for (int i = 0; i < MAX_PREVIEW_TIME_CYCLES; i++) {
+ var messages = runtimeInfoProvider.getLatestEvents();
+ if (!messages.isEmpty()) {
+ try {
+ outputStream.write((objectMapper.writeValueAsString(messages) +
"\n").getBytes());
+ outputStream.flush();
+ } catch (IOException ignored) {
+ }
+ }
+ TimeUnit.MILLISECONDS.sleep(MAX_FREQUENCY);
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ closeResources(runtimeInfoProvider);
+ }
+ }
+
+ private void closeResources(DataStreamRuntimeInfoProvider fetcher) {
+ fetcher.close();
+ }
+}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/SpDataFormatConverter.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/SpDataFormatConverter.java
index aab21ea5c8..1e02d261d2 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/SpDataFormatConverter.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/SpDataFormatConverter.java
@@ -19,27 +19,18 @@ package org.apache.streampipes.manager.runtime;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.SpDataFormatDefinition;
-import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
import java.util.Map;
public class SpDataFormatConverter {
- private SpDataFormatDefinition spDataFormatDefinition;
- private JsonDataFormatDefinition jsonDataFormatDefinition;
+ private final SpDataFormatDefinition spDataFormatDefinition;
public SpDataFormatConverter(SpDataFormatDefinition spDataFormatDefinition) {
this.spDataFormatDefinition = spDataFormatDefinition;
- this.jsonDataFormatDefinition = new JsonDataFormatDefinition();
}
- public String convert(byte[] message) throws SpRuntimeException {
- Map<String, Object> event = spDataFormatDefinition.toMap(message);
- return toJson(event);
+ public Map<String, Object> convert(byte[] message) throws SpRuntimeException
{
+ return spDataFormatDefinition.toMap(message);
}
-
- private String toJson(Map<String, Object> message) throws SpRuntimeException
{
- return new String(jsonDataFormatDefinition.fromMap(message));
- }
-
}
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementPreview.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementPreview.java
index 47105afc24..59fb2aa053 100644
---
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementPreview.java
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementPreview.java
@@ -18,9 +18,12 @@
package org.apache.streampipes.rest.impl;
import org.apache.streampipes.manager.preview.PipelinePreview;
+import org.apache.streampipes.manager.runtime.DataStreamRuntimeInfoProvider;
+import org.apache.streampipes.manager.runtime.RateLimitedRuntimeInfoProvider;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.preview.PipelinePreviewModel;
import
org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
+import org.apache.streampipes.rest.shared.exception.BadRequestException;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
@@ -31,6 +34,9 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
+import
org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
+
+import jakarta.servlet.http.HttpServletResponse;
@RestController
@RequestMapping("/api/v2/pipeline-element-preview")
@@ -47,15 +53,20 @@ public class PipelineElementPreview extends
AbstractAuthGuardedRestResource {
return ok(previewModel);
}
- @GetMapping(path = "{previewId}/{pipelineElementDomId}",
- produces = MediaType.APPLICATION_JSON_VALUE)
- public ResponseEntity<?> getPipelinePreviewResult(@PathVariable("previewId")
String previewId,
-
@PathVariable("pipelineElementDomId") String pipelineElementDomId) {
+ @GetMapping(path = "{previewId}",
+ produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
+ public StreamingResponseBody getPipelinePreviewResult(
+ HttpServletResponse response,
+ @PathVariable("previewId") String previewId) {
try {
- String runtimeInfo = new
PipelinePreview().getPipelineElementPreview(previewId, pipelineElementDomId);
- return ok(runtimeInfo);
+ // deactivate nginx proxy buffering for better performance of streaming
output
+ response.addHeader("X-Accel-Buffering", "no");
+ var spDataStreams = new
PipelinePreview().getPipelineElementPreviewStreams(previewId);
+ var runtimeInfoFetcher = new
DataStreamRuntimeInfoProvider(spDataStreams);
+ var runtimeInfoProvider = new
RateLimitedRuntimeInfoProvider(runtimeInfoFetcher);
+ return runtimeInfoProvider::streamOutput;
} catch (IllegalArgumentException e) {
- return badRequest();
+ throw new BadRequestException("Could not generate preview", e);
}
}
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementRuntimeInfo.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementRuntimeInfo.java
index 806bbbfce3..17a11bfbfc 100644
---
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementRuntimeInfo.java
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementRuntimeInfo.java
@@ -17,32 +17,36 @@
*/
package org.apache.streampipes.rest.impl;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.manager.runtime.DataStreamRuntimeInfoProvider;
+import org.apache.streampipes.manager.runtime.RateLimitedRuntimeInfoProvider;
import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.message.Notifications;
import org.apache.streampipes.rest.core.base.impl.AbstractRestResource;
import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
+import
org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
+
+import jakarta.servlet.http.HttpServletResponse;
+
+import java.util.Map;
@RestController
@RequestMapping("/api/v2/pipeline-element/runtime")
public class PipelineElementRuntimeInfo extends AbstractRestResource {
@PostMapping(
- produces = MediaType.APPLICATION_JSON_VALUE,
+ produces = MediaType.APPLICATION_OCTET_STREAM_VALUE,
consumes = MediaType.APPLICATION_JSON_VALUE
)
- public ResponseEntity<?> getRuntimeInfo(@RequestBody SpDataStream
spDataStream) {
- try {
- return ok(Operations.getRuntimeInfo(spDataStream));
- } catch (SpRuntimeException e) {
- return statusMessage(Notifications.error("Could not get runtime data"));
- }
+ public StreamingResponseBody getRuntimeInfo(HttpServletResponse response,
+ @RequestBody SpDataStream
spDataStream) {
+ // deactivate nginx proxy buffering for better performance of streaming
output
+ response.addHeader("X-Accel-Buffering", "no");
+ var runtimeInfoFetcher = new
DataStreamRuntimeInfoProvider(Map.of("adapter", spDataStream));
+ var runtimeInfoProvider = new
RateLimitedRuntimeInfoProvider(runtimeInfoFetcher);
+ return runtimeInfoProvider::streamOutput;
}
}
diff --git a/ui/cypress/support/utils/connect/ConnectUtils.ts
b/ui/cypress/support/utils/connect/ConnectUtils.ts
index 034cb0c0ad..ca19602dfb 100644
--- a/ui/cypress/support/utils/connect/ConnectUtils.ts
+++ b/ui/cypress/support/utils/connect/ConnectUtils.ts
@@ -164,7 +164,12 @@ export class ConnectUtils {
cy.dataCy('sp-adapter-name').type(adapterInput.adapterName);
if (adapterInput.storeInDataLake) {
- cy.dataCy('sp-store-in-datalake').children().click();
+ cy.dataCy('sp-store-in-datalake', {
+ timeout: 5000,
+ })
+ .should('be.visible')
+ .children()
+ .click();
cy.dataCy('sp-store-in-datalake-timestamp')
.click()
.get('mat-option')
@@ -348,7 +353,7 @@ export class ConnectUtils {
// Validate resulting event
cy.dataCy('sp-connect-adapter-success-live-preview', {
- timeout: 10000,
+ timeout: 20000,
}).should('be.visible');
// validate that X event properties. The +1 is for the header row
diff --git
a/ui/src/app/connect/dialog/adapter-started/adapter-started-preview/adapter-started-preview.component.html
b/ui/src/app/connect/dialog/adapter-started/adapter-started-preview/adapter-started-preview.component.html
index 26ed16172e..57f39299bf 100644
---
a/ui/src/app/connect/dialog/adapter-started/adapter-started-preview/adapter-started-preview.component.html
+++
b/ui/src/app/connect/dialog/adapter-started/adapter-started-preview/adapter-started-preview.component.html
@@ -18,7 +18,7 @@
<div fxFlex="100" fxLayout="column">
<sp-pipeline-element-runtime-info
+ *ngIf="streamDescription"
[streamDescription]="streamDescription"
- [pollingActive]="pollingActive"
></sp-pipeline-element-runtime-info>
</div>
diff --git
a/ui/src/app/connect/dialog/adapter-started/adapter-started-preview/adapter-started-preview.component.ts
b/ui/src/app/connect/dialog/adapter-started/adapter-started-preview/adapter-started-preview.component.ts
index 2b8fd3b27a..5ea85c950a 100644
---
a/ui/src/app/connect/dialog/adapter-started/adapter-started-preview/adapter-started-preview.component.ts
+++
b/ui/src/app/connect/dialog/adapter-started/adapter-started-preview/adapter-started-preview.component.ts
@@ -24,15 +24,13 @@ import { RestService } from
'../../../services/rest.service';
selector: 'sp-adapter-started-preview',
templateUrl: './adapter-started-preview.component.html',
})
-export class SpAdapterStartedPreviewComponent implements OnInit, OnDestroy {
+export class SpAdapterStartedPreviewComponent implements OnInit {
@Input()
streamDescription: SpDataStream;
@Input()
adapterElementId: string;
- pollingActive = false;
-
constructor(
private adapterService: AdapterService,
private restService: RestService,
@@ -50,12 +48,7 @@ export class SpAdapterStartedPreviewComponent implements
OnInit, OnDestroy {
.getSourceDetails(adapter.correspondingDataStreamElementId)
.subscribe(st => {
this.streamDescription = st;
- this.pollingActive = true;
});
});
}
-
- ngOnDestroy(): void {
- this.pollingActive = false;
- }
}
diff --git a/ui/src/app/connect/services/rest.service.ts
b/ui/src/app/connect/services/rest.service.ts
index cede76c54c..8b7c46f426 100644
--- a/ui/src/app/connect/services/rest.service.ts
+++ b/ui/src/app/connect/services/rest.service.ts
@@ -18,7 +18,7 @@
import { Injectable } from '@angular/core';
-import { HttpClient, HttpContext } from '@angular/common/http';
+import { HttpClient, HttpContext, HttpEvent } from '@angular/common/http';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
@@ -78,11 +78,16 @@ export class RestService {
);
}
- getRuntimeInfo(sourceDescription): Observable<Record<string, any>> {
+ getRuntimeInfo(
+ sourceDescription: SpDataStream,
+ ): Observable<HttpEvent<string>> {
return this.http.post(
`${this.platformServicesCommons.apiBasePath}/pipeline-element/runtime`,
sourceDescription,
{
+ responseType: 'text',
+ observe: 'events',
+ reportProgress: true,
context: new HttpContext().set(NGX_LOADING_BAR_IGNORED, true),
},
);
diff --git
a/ui/src/app/core-ui/pipeline-element-runtime-info/pipeline-element-runtime-info.component.ts
b/ui/src/app/core-ui/pipeline-element-runtime-info/pipeline-element-runtime-info.component.ts
index 3fd5977784..2094530ff8 100644
---
a/ui/src/app/core-ui/pipeline-element-runtime-info/pipeline-element-runtime-info.component.ts
+++
b/ui/src/app/core-ui/pipeline-element-runtime-info/pipeline-element-runtime-info.component.ts
@@ -19,6 +19,9 @@
import { Component, Input, OnDestroy, OnInit } from '@angular/core';
import { SpDataStream } from '@streampipes/platform-services';
import { RestService } from '../../connect/services/rest.service';
+import { Subscription } from 'rxjs';
+import { HttpDownloadProgressEvent, HttpEventType } from
'@angular/common/http';
+import { LivePreviewService } from '../../services/live-preview.service';
@Component({
selector: 'sp-pipeline-element-runtime-info',
@@ -29,55 +32,44 @@ export class PipelineElementRuntimeInfoComponent implements
OnInit, OnDestroy {
@Input()
streamDescription: SpDataStream;
- _pollingActive: boolean;
-
runtimeData: { runtimeName: string; value: any }[];
timer: any;
runtimeDataError = false;
+ runtimeSub: Subscription;
- constructor(private restService: RestService) {}
+ constructor(
+ private restService: RestService,
+ private livePreviewService: LivePreviewService,
+ ) {}
ngOnInit(): void {
- this.checkPollingStart();
- }
-
- checkPollingStart() {
- if (this._pollingActive) {
- this.getLatestRuntimeInfo();
- }
+ this.getLatestRuntimeInfo();
}
getLatestRuntimeInfo() {
- this.restService
+ this.runtimeSub = this.restService
.getRuntimeInfo(this.streamDescription)
- .subscribe(data => {
- this.runtimeDataError = !data;
-
- this.runtimeData = Object.entries(data).map(
- ([runtimeName, value]) => ({ runtimeName, value }),
- );
-
- if (this._pollingActive) {
- this.timer = setTimeout(
- () => this.getLatestRuntimeInfo(),
- 1000,
- );
+ .subscribe(event => {
+ if (event.type === HttpEventType.DownloadProgress) {
+ try {
+ const responseJson = this.livePreviewService.convert(
+ event as HttpDownloadProgressEvent,
+ );
+ const [firstKey] = Object.keys(responseJson);
+ const json = responseJson[firstKey];
+ this.runtimeDataError = !json;
+ this.runtimeData = Object.entries(json).map(
+ ([runtimeName, value]) => ({ runtimeName, value }),
+ );
+ } catch (error) {
+ this.runtimeDataError = true;
+ this.runtimeData = [];
+ }
}
});
}
- @Input()
- set pollingActive(pollingActive: boolean) {
- this._pollingActive = pollingActive;
- this.checkPollingStart();
- }
-
- get pollingActive(): boolean {
- return this._pollingActive;
- }
-
ngOnDestroy(): void {
- this.pollingActive = false;
- clearTimeout(this.timer);
+ this.runtimeSub?.unsubscribe();
}
}
diff --git
a/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.html
b/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.html
index 306b9f5829..a58235feaa 100644
---
a/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.html
+++
b/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.html
@@ -28,7 +28,10 @@
</div>
<table class="row-border hover preview-table" *ngIf="runtimeData">
<tbody id="preview-data-rows-id">
- <tr *ngFor="let item of runtimeData | keyvalue"
class="preview-row">
+ <tr
+ *ngFor="let item of runtimeData | keyvalue: keyValueCompareFn"
+ class="preview-row"
+ >
<td>
<b>{{ item.key }}</b>
</td>
diff --git
a/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.scss
b/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.scss
index 3734f07db9..ae8a6fc729 100644
---
a/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.scss
+++
b/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.scss
@@ -18,7 +18,7 @@
.data-preview {
position: relative;
- left: 0px;
+ left: 0;
top: 120px;
width: 250px;
height: 120px;
@@ -29,12 +29,12 @@
z-index: 50;
}
-.preview-table {
- font-size: 9pt;
+.data-preview:hover {
+ z-index: 1000;
}
-.mt-10 {
- margin-top: 10px;
+.preview-table {
+ font-size: 9pt;
}
.preview-row,
diff --git
a/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.ts
b/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.ts
index 8e57fc405e..1e0d949ff8 100644
---
a/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.ts
+++
b/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.ts
@@ -16,53 +16,51 @@
*
*/
-import { Component, Input, OnInit } from '@angular/core';
-import { EditorService } from '../../services/editor.service';
+import { Component, Input, OnDestroy, OnInit } from '@angular/core';
+import { Subscription } from 'rxjs';
+import { KeyValue } from '@angular/common';
+import { LivePreviewService } from '../../../services/live-preview.service';
@Component({
selector: 'sp-pipeline-element-preview',
templateUrl: './pipeline-element-preview.component.html',
styleUrls: ['./pipeline-element-preview.component.scss'],
})
-export class PipelineElementPreviewComponent implements OnInit {
+export class PipelineElementPreviewComponent implements OnInit, OnDestroy {
@Input()
previewId: string;
@Input()
- pipelineElementDomId: string;
-
- runtimeData: ReadonlyMap<string, unknown>;
+ elementId: string;
+ runtimeData: Record<string, any>;
runtimeDataError = false;
- timer: any;
+ previewSub: Subscription;
- constructor(private editorService: EditorService) {}
+ constructor(private livePreviewService: LivePreviewService) {}
ngOnInit(): void {
this.getLatestRuntimeInfo();
}
+ keyValueCompareFn = (
+ a: KeyValue<string, any>,
+ b: KeyValue<string, any>,
+ ): number => {
+ return a.key.localeCompare(b.key);
+ };
+
getLatestRuntimeInfo() {
- this.editorService
- .getPipelinePreviewResult(this.previewId,
this.pipelineElementDomId)
- .subscribe(data => {
- if (data) {
- this.runtimeDataError = false;
- if (
- !(
- Object.keys(data).length === 0 &&
- data.constructor === Object
- )
- ) {
- this.runtimeData = data;
- }
+ this.previewSub = this.livePreviewService.eventSub.subscribe(event => {
+ if (event) {
+ this.runtimeData = event[this.elementId];
+ } else {
+ this.runtimeDataError = true;
+ }
+ });
+ }
- this.timer = setTimeout(() => {
- this.getLatestRuntimeInfo();
- }, 1000);
- } else {
- this.runtimeDataError = true;
- }
- });
+ ngOnDestroy() {
+ this.previewSub?.unsubscribe();
}
}
diff --git a/ui/src/app/editor/components/pipeline/pipeline.component.html
b/ui/src/app/editor/components/pipeline/pipeline.component.html
index 7cd3a3206b..d1015b310f 100644
--- a/ui/src/app/editor/components/pipeline/pipeline.component.html
+++ b/ui/src/app/editor/components/pipeline/pipeline.component.html
@@ -96,11 +96,11 @@
*ngIf="
previewModeActive &&
pipelinePreview.supportedPipelineElementDomIds.indexOf(
- pipelineElement.payload.dom
+ pipelineElement.payload.elementId
) > -1
"
[previewId]="pipelinePreview.previewId"
- [pipelineElementDomId]="pipelineElement.payload.dom"
+ [elementId]="pipelineElement.payload.elementId"
>
</sp-pipeline-element-preview>
</div>
diff --git a/ui/src/app/editor/components/pipeline/pipeline.component.ts
b/ui/src/app/editor/components/pipeline/pipeline.component.ts
index 40044192e9..6838fff3cd 100644
--- a/ui/src/app/editor/components/pipeline/pipeline.component.ts
+++ b/ui/src/app/editor/components/pipeline/pipeline.component.ts
@@ -24,6 +24,7 @@ import { ShepherdService } from
'../../../services/tour/shepherd.service';
import {
Component,
EventEmitter,
+ HostListener,
Input,
NgZone,
OnDestroy,
@@ -58,7 +59,7 @@ import {
import { EditorService } from '../../services/editor.service';
import { MatchingErrorComponent } from
'../../dialog/matching-error/matching-error.component';
import { MatDialog } from '@angular/material/dialog';
-import { forkJoin } from 'rxjs';
+import { forkJoin, Subscription } from 'rxjs';
import { JsplumbFactoryService } from '../../services/jsplumb-factory.service';
import { PipelinePositioningService } from
'../../services/pipeline-positioning.service';
import {
@@ -70,6 +71,8 @@ import {
} from '@jsplumb/browser-ui';
import { PipelineStyleService } from '../../services/pipeline-style.service';
import { IdGeneratorService } from
'../../../core-services/id-generator/id-generator.service';
+import { LivePreviewService } from '../../../services/live-preview.service';
+import { HttpDownloadProgressEvent } from '@angular/common/http';
@Component({
selector: 'sp-pipeline',
@@ -117,6 +120,7 @@ export class PipelineComponent implements OnInit, OnDestroy
{
previewModeActive = false;
pipelinePreview: PipelinePreviewModel;
+ pipelinePreviewSub: Subscription;
shouldOpenCustomizeSettings = false;
@@ -134,6 +138,7 @@ export class PipelineComponent implements OnInit, OnDestroy
{
private dialogService: DialogService,
private dialog: MatDialog,
private ngZone: NgZone,
+ private livePreviewService: LivePreviewService,
) {
this.currentMouseOverElement = '';
this.currentPipelineModel = new Pipeline();
@@ -170,6 +175,11 @@ export class PipelineComponent implements OnInit,
OnDestroy {
this.jsplumbFactoryService.destroy(this.preview);
}
+ @HostListener('window:beforeunload')
+ onWindowClose() {
+ this.ngOnDestroy();
+ }
+
updateMouseover(elementId: string) {
this.currentMouseOverElement = elementId;
}
@@ -656,6 +666,14 @@ export class PipelineComponent implements OnInit,
OnDestroy {
.subscribe(response => {
this.pipelinePreview = response;
this.previewModeActive = true;
+ this.pipelinePreviewSub = this.editorService
+ .getPipelinePreviewResult(response.previewId)
+ .subscribe(res => {
+ const data = this.livePreviewService.convert(
+ res as HttpDownloadProgressEvent,
+ );
+ this.livePreviewService.eventSub.next(data);
+ });
});
} else {
this.deletePipelineElementPreview(false);
@@ -664,6 +682,7 @@ export class PipelineComponent implements OnInit, OnDestroy
{
deletePipelineElementPreview(resume: boolean) {
if (this.previewModeActive) {
+ this.pipelinePreviewSub?.unsubscribe();
this.editorService
.deletePipelinePreviewRequest(this.pipelinePreview.previewId)
.subscribe(() => {
diff --git a/ui/src/app/editor/services/editor.service.ts
b/ui/src/app/editor/services/editor.service.ts
index a07f6bbee9..45ba82218d 100644
--- a/ui/src/app/editor/services/editor.service.ts
+++ b/ui/src/app/editor/services/editor.service.ts
@@ -17,7 +17,7 @@
*/
import { Injectable } from '@angular/core';
-import { HttpClient, HttpContext } from '@angular/common/http';
+import { HttpClient, HttpContext, HttpEvent } from '@angular/common/http';
import {
DataProcessorInvocation,
DataSinkInvocation,
@@ -189,18 +189,13 @@ export class EditorService {
return this.http.delete(this.pipelinePreviewBasePath + '/' +
previewId);
}
- getPipelinePreviewResult(
- previewId: string,
- pipelineElementDomId: string,
- ): Observable<any> {
- return this.http.get(
- this.pipelinePreviewBasePath +
- '/' +
- previewId +
- '/' +
- pipelineElementDomId,
- { context: new HttpContext().set(NGX_LOADING_BAR_IGNORED, true) },
- );
+ getPipelinePreviewResult(previewId: string): Observable<HttpEvent<string>>
{
+ return this.http.get(`${this.pipelinePreviewBasePath}/${previewId}`, {
+ responseType: 'text',
+ observe: 'events',
+ reportProgress: true,
+ context: new HttpContext().set(NGX_LOADING_BAR_IGNORED, true),
+ });
}
get pipelinePreviewBasePath() {
diff --git
a/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.scss
b/ui/src/app/services/live-preview.service.ts
similarity index 61%
copy from
ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.scss
copy to ui/src/app/services/live-preview.service.ts
index 3734f07db9..8f6ccb39fd 100644
---
a/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.scss
+++ b/ui/src/app/services/live-preview.service.ts
@@ -16,28 +16,19 @@
*
*/
-.data-preview {
- position: relative;
- left: 0px;
- top: 120px;
- width: 250px;
- height: 120px;
- background: var(--color-bg-dialog);
- overflow: auto;
- border: 1px solid gray;
- box-shadow: 0.175em 0.175em 0 0 rgba(15, 28, 63, 0.125);
- z-index: 50;
-}
-
-.preview-table {
- font-size: 9pt;
-}
+import { Injectable } from '@angular/core';
+import { HttpDownloadProgressEvent } from '@angular/common/http';
+import { Subject } from 'rxjs';
-.mt-10 {
- margin-top: 10px;
-}
+@Injectable({ providedIn: 'root' })
+export class LivePreviewService {
+ public eventSub: Subject<Record<string, any>> = new Subject<
+ Record<string, any>
+ >();
-.preview-row,
-.preview-table {
- background: var(--color-bg-dialog);
+ convert(event: HttpDownloadProgressEvent) {
+ const { partialText } = event;
+ const chunks = partialText.split('\n');
+ return JSON.parse(chunks[chunks.length - 2]);
+ }
}