This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new f6265ea420 chore: Provide adapter and pipeline preview as stream 
(#3101)
f6265ea420 is described below

commit f6265ea420af8f5e4e9ffa5c370416058a9abc4f
Author: Dominik Riemer <[email protected]>
AuthorDate: Thu Aug 8 07:57:00 2024 +0200

    chore: Provide adapter and pipeline preview as stream (#3101)
    
    * chore: Provide adapter and pipeline preview as stream
    
    * Cleanup
    
    * Fix checkstyle
    
    * Fix adapter preview
    
    * Add timeout to test
    
    * Fix timeout
    
    * Increase interval
    
    * Load pipeline preview in a single request
    
    * Cleanup
    
    * Add response header to deactivate proxy buffering
---
 .../messaging/kafka/SpKafkaConsumer.java           |   1 -
 .../streampipes/manager/operations/Operations.java |   6 --
 .../manager/preview/ActivePipelinePreviews.java    |  17 +--
 .../manager/preview/PipelinePreview.java           |  59 ++++++-----
 .../runtime/DataStreamRuntimeInfoProvider.java     |  94 ++++++++++++++++
 .../runtime/PipelineElementRuntimeInfoFetcher.java | 118 ---------------------
 .../runtime/RateLimitedRuntimeInfoProvider.java    |  64 +++++++++++
 .../manager/runtime/SpDataFormatConverter.java     |  15 +--
 .../rest/impl/PipelineElementPreview.java          |  25 +++--
 .../rest/impl/PipelineElementRuntimeInfo.java      |  26 +++--
 ui/cypress/support/utils/connect/ConnectUtils.ts   |   9 +-
 .../adapter-started-preview.component.html         |   2 +-
 .../adapter-started-preview.component.ts           |   9 +-
 ui/src/app/connect/services/rest.service.ts        |   9 +-
 .../pipeline-element-runtime-info.component.ts     |  62 +++++------
 .../pipeline-element-preview.component.html        |   5 +-
 .../pipeline-element-preview.component.scss        |  10 +-
 .../pipeline-element-preview.component.ts          |  54 +++++-----
 .../components/pipeline/pipeline.component.html    |   4 +-
 .../components/pipeline/pipeline.component.ts      |  21 +++-
 ui/src/app/editor/services/editor.service.ts       |  21 ++--
 .../live-preview.service.ts}                       |  35 +++---
 22 files changed, 348 insertions(+), 318 deletions(-)

diff --git 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
index e999cfbc4a..fcbd38224f 100644
--- 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
+++ 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
@@ -104,7 +104,6 @@ public class SpKafkaConsumer implements EventConsumer, 
Runnable,
       ConsumerRecords<byte[], byte[]> records = consumer.poll(duration);
       records.forEach(record -> eventProcessor.onEvent(record.value()));
     }
-    LOG.info("Closing Kafka Consumer.");
     consumer.close();
   }
 
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
index 81d6215c88..f2addd2b20 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
@@ -20,12 +20,10 @@ package org.apache.streampipes.manager.operations;
 
 import 
org.apache.streampipes.commons.exceptions.NoSuitableSepasAvailableException;
 import org.apache.streampipes.commons.exceptions.SepaParseException;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.manager.execution.PipelineExecutor;
 import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
 import org.apache.streampipes.manager.recommender.ElementRecommender;
 import org.apache.streampipes.manager.remote.ContainerProvidedOptionsHandler;
-import 
org.apache.streampipes.manager.runtime.PipelineElementRuntimeInfoFetcher;
 import org.apache.streampipes.manager.storage.PipelineStorageService;
 import org.apache.streampipes.manager.template.PipelineTemplateGenerator;
 import 
org.apache.streampipes.manager.template.PipelineTemplateInvocationGenerator;
@@ -116,10 +114,6 @@ public class Operations {
     return new ContainerProvidedOptionsHandler().fetchRemoteOptions(request);
   }
 
-  public static String getRuntimeInfo(SpDataStream spDataStream) throws 
SpRuntimeException {
-    return 
PipelineElementRuntimeInfoFetcher.INSTANCE.getCurrentData(spDataStream);
-  }
-
   public static List<PipelineTemplateDescription> getAllPipelineTemplates() {
     return new PipelineTemplateGenerator().getAllPipelineTemplates();
   }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/ActivePipelinePreviews.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/ActivePipelinePreviews.java
index 9f7922b357..d32af21c6e 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/ActivePipelinePreviews.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/ActivePipelinePreviews.java
@@ -22,13 +22,12 @@ import 
org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 public enum ActivePipelinePreviews {
 
   INSTANCE;
 
-  private Map<String, List<NamedStreamPipesEntity>> activePreviews;
+  private final Map<String, List<NamedStreamPipesEntity>> activePreviews;
 
   ActivePipelinePreviews() {
     this.activePreviews = new HashMap<>();
@@ -46,18 +45,4 @@ public enum ActivePipelinePreviews {
   public List<NamedStreamPipesEntity> getInvocationGraphs(String previewId) {
     return this.activePreviews.get(previewId);
   }
-
-  public Optional<NamedStreamPipesEntity> 
getInvocationGraphForPipelineELement(String previewId,
-                                                                               
String pipelineElementDomId) {
-    List<NamedStreamPipesEntity> graphs = this.activePreviews.get(previewId);
-
-    if (graphs == null || graphs.size() == 0) {
-      return Optional.empty();
-    } else {
-      return graphs
-          .stream()
-          .filter(g -> g.getDom().equals(pipelineElementDomId))
-          .findFirst();
-    }
-  }
 }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
index 903ecbeb75..6795d6f67c 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
@@ -23,7 +23,6 @@ import 
org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpoi
 import org.apache.streampipes.manager.execution.http.DetachHttpRequest;
 import org.apache.streampipes.manager.execution.http.InvokeHttpRequest;
 import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
-import org.apache.streampipes.manager.operations.Operations;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
@@ -31,25 +30,32 @@ import 
org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.preview.PipelinePreviewModel;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
+import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
 public class PipelinePreview {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(PipelinePreview.class);
+
   public PipelinePreviewModel initiatePreview(Pipeline pipeline) {
     String previewId = generatePreviewId();
     pipeline.setActions(new ArrayList<>());
-    List<NamedStreamPipesEntity> pipelineElements = new 
PipelineVerificationHandlerV2(pipeline)
-        .verifyAndBuildGraphs(true)
-        .stream()
-        .collect(Collectors.toList());
+    List<NamedStreamPipesEntity> pipelineElements = new ArrayList<>(
+        new PipelineVerificationHandlerV2(pipeline)
+            .verifyAndBuildGraphs(true)
+    );
 
     invokeGraphs(filter(pipelineElements));
     storeGraphs(previewId, pipelineElements);
 
+    LOG.info("Preview pipeline {} started", previewId);
+
     return makePreviewModel(previewId, pipelineElements);
   }
 
@@ -57,26 +63,25 @@ public class PipelinePreview {
     List<NamedStreamPipesEntity> graphs = 
ActivePipelinePreviews.INSTANCE.getInvocationGraphs(previewId);
     detachGraphs(filter(graphs));
     deleteGraphs(previewId);
+    LOG.info("Preview pipeline {} stopped", previewId);
   }
 
-  public String getPipelineElementPreview(String previewId,
-                                          String pipelineElementDomId) throws 
IllegalArgumentException {
-    Optional<NamedStreamPipesEntity> graphOpt = ActivePipelinePreviews
+  public Map<String, SpDataStream> getPipelineElementPreviewStreams(String 
previewId) throws IllegalArgumentException {
+    return ActivePipelinePreviews
         .INSTANCE
-        .getInvocationGraphForPipelineELement(previewId, pipelineElementDomId);
-
-    if (graphOpt.isPresent()) {
-      NamedStreamPipesEntity graph = graphOpt.get();
-      if (graph instanceof DataProcessorInvocation) {
-        return Operations.getRuntimeInfo(((DataProcessorInvocation) 
graph).getOutputStream());
-      } else if (graph instanceof SpDataStream) {
-        return Operations.getRuntimeInfo((SpDataStream) graph);
-      } else {
-        throw new IllegalArgumentException("Requested pipeline element is not 
a data processor");
-      }
-    } else {
-      throw new IllegalArgumentException("Could not find pipeline element");
-    }
+        .getInvocationGraphs(previewId)
+        .stream()
+        .filter(graph -> graph instanceof DataProcessorInvocation || graph 
instanceof SpDataStream)
+        .collect(Collectors.toMap(
+            NamedStreamPipesEntity::getElementId,
+            graph -> {
+              if (graph instanceof DataProcessorInvocation) {
+                return ((DataProcessorInvocation) graph).getOutputStream();
+              } else {
+                return (SpDataStream) graph;
+              }
+            }
+        ));
   }
 
   private String findSelectedEndpoint(InvocableStreamPipesEntity g) throws 
NoServiceEndpointsAvailableException {
@@ -93,7 +98,7 @@ public class PipelinePreview {
         g.setSelectedEndpointUrl(findSelectedEndpoint(g));
         new InvokeHttpRequest().execute(g, g.getSelectedEndpointUrl(), null);
       } catch (NoServiceEndpointsAvailableException e) {
-        e.printStackTrace();
+        LOG.warn("No endpoint found for pipeline element {}", g.getAppId());
       }
     });
   }
@@ -122,15 +127,15 @@ public class PipelinePreview {
                                                 List<NamedStreamPipesEntity> 
graphs) {
     PipelinePreviewModel previewModel = new PipelinePreviewModel();
     previewModel.setPreviewId(previewId);
-    previewModel.setSupportedPipelineElementDomIds(collectDomIds(graphs));
+    previewModel.setSupportedPipelineElementDomIds(collectElementIds(graphs));
 
     return previewModel;
   }
 
-  private List<String> collectDomIds(List<NamedStreamPipesEntity> graphs) {
+  private List<String> collectElementIds(List<NamedStreamPipesEntity> graphs) {
     return graphs
         .stream()
-        .map(NamedStreamPipesEntity::getDom)
+        .map(NamedStreamPipesEntity::getElementId)
         .collect(Collectors.toList());
   }
 
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/DataStreamRuntimeInfoProvider.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/DataStreamRuntimeInfoProvider.java
new file mode 100644
index 0000000000..7221339d13
--- /dev/null
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/DataStreamRuntimeInfoProvider.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.runtime;
+
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.messaging.EventConsumer;
+import org.apache.streampipes.messaging.SpProtocolManager;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.TransportFormat;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DataStreamRuntimeInfoProvider {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStreamRuntimeInfoProvider.class);
+
+  private final Environment env;
+  private final Map<String, SpDataStream> dataStreams;
+  private final List<EventConsumer> consumers;
+  private final Map<String, Map<String, Object>> latestEvents;
+
+  public DataStreamRuntimeInfoProvider(Map<String, SpDataStream> dataStreams) {
+    this.dataStreams = dataStreams;
+    this.consumers = new ArrayList<>();
+    this.env = Environments.getEnvironment();
+    this.latestEvents = new HashMap<>();
+  }
+
+  public void startConsuming() throws SpRuntimeException {
+    dataStreams.forEach((id, dataStream) -> {
+      var protocol = dataStream.getEventGrounding().getTransportProtocol();
+      if (env.getSpDebug().getValueOrDefault()) {
+        protocol.setBrokerHostname("localhost");
+        if (protocol instanceof KafkaTransportProtocol) {
+          ((KafkaTransportProtocol) protocol).setKafkaPort(9094);
+        }
+      }
+
+      var converter = new 
SpDataFormatConverterGenerator(getTransportFormat(dataStream)).makeConverter();
+      var protocolDefinitionOpt = SpProtocolManager
+          .INSTANCE
+          
.findDefinition(dataStream.getEventGrounding().getTransportProtocol());
+
+      if (protocolDefinitionOpt.isPresent()) {
+        var consumer = protocolDefinitionOpt.get().getConsumer(protocol);
+        consumer.connect(event -> {
+          var deserializedEvent = converter.convert(event);
+          this.latestEvents.put(id, deserializedEvent);
+        });
+        consumers.add(consumer);
+      } else {
+        LOG.error("Error while fetching data for preview - protocol {} not 
found - did you register the protocol? ",
+            protocol.getClass().getCanonicalName());
+        throw new SpRuntimeException("Protocol not found");
+      }
+    });
+  }
+
+  private TransportFormat getTransportFormat(SpDataStream spDataStream) {
+    return spDataStream.getEventGrounding().getTransportFormats().get(0);
+  }
+
+  public Map<String, Map<String, Object>> getLatestEvents() {
+    return latestEvents;
+  }
+
+  public void close() {
+    consumers.forEach(EventConsumer::disconnect);
+  }
+}
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
deleted file mode 100644
index 8c522477ac..0000000000
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.runtime;
-
-import org.apache.streampipes.commons.environment.Environment;
-import org.apache.streampipes.commons.environment.Environments;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.messaging.EventConsumer;
-import org.apache.streampipes.messaging.SpProtocolManager;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.model.grounding.TransportFormat;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public enum PipelineElementRuntimeInfoFetcher {
-  INSTANCE;
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(PipelineElementRuntimeInfoFetcher.class);
-
-  private static final int FETCH_INTERVAL_MS = 300;
-  private final Map<String, SpDataFormatConverter> converterMap;
-  private final Environment env;
-
-  PipelineElementRuntimeInfoFetcher() {
-    this.converterMap = new HashMap<>();
-    this.env = Environments.getEnvironment();
-  }
-
-  public String getCurrentData(SpDataStream spDataStream) throws 
SpRuntimeException {
-    var topic = getOutputTopic(spDataStream);
-    var protocol = spDataStream.getEventGrounding().getTransportProtocol();
-    if (env.getSpDebug().getValueOrDefault()) {
-      protocol.setBrokerHostname("localhost");
-      if (protocol instanceof KafkaTransportProtocol) {
-        ((KafkaTransportProtocol) protocol).setKafkaPort(9094);
-      }
-    }
-
-    if (!converterMap.containsKey(topic)) {
-      this.converterMap.put(topic,
-          new 
SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
-    }
-
-    var converter = converterMap.get(topic);
-
-    var protocolDefinitionOpt = SpProtocolManager
-        .INSTANCE
-        
.findDefinition(spDataStream.getEventGrounding().getTransportProtocol());
-
-    if (protocolDefinitionOpt.isPresent()) {
-      var consumer = protocolDefinitionOpt.get().getConsumer(protocol);
-      return getLatestEvent(consumer, converter);
-
-    } else {
-      LOG.error("Error while fetching data for preview - protocol {} not found 
- did you register the protocol? ",
-          protocol.getClass().getCanonicalName());
-      throw new SpRuntimeException("Protocol not found");
-    }
-  }
-
-  private TransportFormat getTransportFormat(SpDataStream spDataStream) {
-    return spDataStream.getEventGrounding().getTransportFormats().get(0);
-  }
-
-  private String getOutputTopic(SpDataStream spDataStream) {
-    return spDataStream
-        .getEventGrounding()
-        .getTransportProtocol()
-        .getTopicDefinition()
-        .getActualTopicName();
-  }
-
-  private void waitForEvent(String[] result) {
-    long timeout = 0;
-    while (result[0] == null && timeout < 6000) {
-      try {
-        TimeUnit.MILLISECONDS.sleep(FETCH_INTERVAL_MS);
-        timeout = timeout + 300;
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-  }
-
-  private String getLatestEvent(EventConsumer consumer,
-                                SpDataFormatConverter converter) {
-    final String[] result = {null};
-    consumer.connect(event -> {
-      result[0] = converter.convert(event);
-      consumer.disconnect();
-    });
-
-    waitForEvent(result);
-
-    return result[0];
-  }
-}
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/RateLimitedRuntimeInfoProvider.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/RateLimitedRuntimeInfoProvider.java
new file mode 100644
index 0000000000..ee763a40f6
--- /dev/null
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/RateLimitedRuntimeInfoProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.runtime;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.TimeUnit;
+
+public class RateLimitedRuntimeInfoProvider {
+
+  private static final int MAX_PREVIEW_TIME_CYCLES = 360;
+  private static final int MAX_FREQUENCY = 500;
+
+  private final DataStreamRuntimeInfoProvider runtimeInfoProvider;
+  private final ObjectMapper objectMapper;
+
+  public RateLimitedRuntimeInfoProvider(DataStreamRuntimeInfoProvider 
runtimeInfoProvider) {
+    this.runtimeInfoProvider = runtimeInfoProvider;
+    this.objectMapper = new ObjectMapper();
+  }
+
+  public void streamOutput(OutputStream outputStream) {
+    runtimeInfoProvider.startConsuming();
+    try {
+      for (int i = 0; i < MAX_PREVIEW_TIME_CYCLES; i++) {
+        var messages = runtimeInfoProvider.getLatestEvents();
+        if (!messages.isEmpty()) {
+          try {
+            outputStream.write((objectMapper.writeValueAsString(messages) + 
"\n").getBytes());
+            outputStream.flush();
+          } catch (IOException ignored) {
+          }
+        }
+        TimeUnit.MILLISECONDS.sleep(MAX_FREQUENCY);
+      }
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      closeResources(runtimeInfoProvider);
+    }
+  }
+
+  private void closeResources(DataStreamRuntimeInfoProvider fetcher) {
+    fetcher.close();
+  }
+}
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/SpDataFormatConverter.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/SpDataFormatConverter.java
index aab21ea5c8..1e02d261d2 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/SpDataFormatConverter.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/SpDataFormatConverter.java
@@ -19,27 +19,18 @@ package org.apache.streampipes.manager.runtime;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataformat.SpDataFormatDefinition;
-import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
 
 import java.util.Map;
 
 public class SpDataFormatConverter {
 
-  private SpDataFormatDefinition spDataFormatDefinition;
-  private JsonDataFormatDefinition jsonDataFormatDefinition;
+  private final SpDataFormatDefinition spDataFormatDefinition;
 
   public SpDataFormatConverter(SpDataFormatDefinition spDataFormatDefinition) {
     this.spDataFormatDefinition = spDataFormatDefinition;
-    this.jsonDataFormatDefinition = new JsonDataFormatDefinition();
   }
 
-  public String convert(byte[] message) throws SpRuntimeException {
-    Map<String, Object> event = spDataFormatDefinition.toMap(message);
-    return toJson(event);
+  public Map<String, Object> convert(byte[] message) throws SpRuntimeException 
{
+    return spDataFormatDefinition.toMap(message);
   }
-
-  private String toJson(Map<String, Object> message) throws SpRuntimeException 
{
-    return new String(jsonDataFormatDefinition.fromMap(message));
-  }
-
 }
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementPreview.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementPreview.java
index 47105afc24..59fb2aa053 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementPreview.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementPreview.java
@@ -18,9 +18,12 @@
 package org.apache.streampipes.rest.impl;
 
 import org.apache.streampipes.manager.preview.PipelinePreview;
+import org.apache.streampipes.manager.runtime.DataStreamRuntimeInfoProvider;
+import org.apache.streampipes.manager.runtime.RateLimitedRuntimeInfoProvider;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.preview.PipelinePreviewModel;
 import 
org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
+import org.apache.streampipes.rest.shared.exception.BadRequestException;
 
 import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
@@ -31,6 +34,9 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
+import 
org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
+
+import jakarta.servlet.http.HttpServletResponse;
 
 @RestController
 @RequestMapping("/api/v2/pipeline-element-preview")
@@ -47,15 +53,20 @@ public class PipelineElementPreview extends 
AbstractAuthGuardedRestResource {
     return ok(previewModel);
   }
 
-  @GetMapping(path = "{previewId}/{pipelineElementDomId}",
-      produces = MediaType.APPLICATION_JSON_VALUE)
-  public ResponseEntity<?> getPipelinePreviewResult(@PathVariable("previewId") 
String previewId,
-                                           
@PathVariable("pipelineElementDomId") String pipelineElementDomId) {
+  @GetMapping(path = "{previewId}",
+      produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
+  public StreamingResponseBody getPipelinePreviewResult(
+      HttpServletResponse response,
+      @PathVariable("previewId") String previewId) {
     try {
-      String runtimeInfo = new 
PipelinePreview().getPipelineElementPreview(previewId, pipelineElementDomId);
-      return ok(runtimeInfo);
+      // deactivate nginx proxy buffering for better performance of streaming 
output
+      response.addHeader("X-Accel-Buffering", "no");
+      var spDataStreams = new 
PipelinePreview().getPipelineElementPreviewStreams(previewId);
+      var runtimeInfoFetcher = new 
DataStreamRuntimeInfoProvider(spDataStreams);
+      var runtimeInfoProvider = new 
RateLimitedRuntimeInfoProvider(runtimeInfoFetcher);
+      return runtimeInfoProvider::streamOutput;
     } catch (IllegalArgumentException e) {
-      return badRequest();
+      throw new BadRequestException("Could not generate preview", e);
     }
   }
 
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementRuntimeInfo.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementRuntimeInfo.java
index 806bbbfce3..17a11bfbfc 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementRuntimeInfo.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementRuntimeInfo.java
@@ -17,32 +17,36 @@
  */
 package org.apache.streampipes.rest.impl;
 
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.manager.runtime.DataStreamRuntimeInfoProvider;
+import org.apache.streampipes.manager.runtime.RateLimitedRuntimeInfoProvider;
 import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.message.Notifications;
 import org.apache.streampipes.rest.core.base.impl.AbstractRestResource;
 
 import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
+import 
org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
+
+import jakarta.servlet.http.HttpServletResponse;
+
+import java.util.Map;
 
 @RestController
 @RequestMapping("/api/v2/pipeline-element/runtime")
 public class PipelineElementRuntimeInfo extends AbstractRestResource {
 
   @PostMapping(
-      produces = MediaType.APPLICATION_JSON_VALUE,
+      produces = MediaType.APPLICATION_OCTET_STREAM_VALUE,
       consumes = MediaType.APPLICATION_JSON_VALUE
   )
-  public ResponseEntity<?> getRuntimeInfo(@RequestBody SpDataStream 
spDataStream) {
-    try {
-      return ok(Operations.getRuntimeInfo(spDataStream));
-    } catch (SpRuntimeException e) {
-      return statusMessage(Notifications.error("Could not get runtime data"));
-    }
+  public StreamingResponseBody getRuntimeInfo(HttpServletResponse response,
+                                              @RequestBody SpDataStream 
spDataStream) {
+    // deactivate nginx proxy buffering for better performance of streaming 
output
+    response.addHeader("X-Accel-Buffering", "no");
+    var runtimeInfoFetcher = new 
DataStreamRuntimeInfoProvider(Map.of("adapter", spDataStream));
+    var runtimeInfoProvider = new 
RateLimitedRuntimeInfoProvider(runtimeInfoFetcher);
+    return runtimeInfoProvider::streamOutput;
   }
 }
diff --git a/ui/cypress/support/utils/connect/ConnectUtils.ts 
b/ui/cypress/support/utils/connect/ConnectUtils.ts
index 034cb0c0ad..ca19602dfb 100644
--- a/ui/cypress/support/utils/connect/ConnectUtils.ts
+++ b/ui/cypress/support/utils/connect/ConnectUtils.ts
@@ -164,7 +164,12 @@ export class ConnectUtils {
         cy.dataCy('sp-adapter-name').type(adapterInput.adapterName);
 
         if (adapterInput.storeInDataLake) {
-            cy.dataCy('sp-store-in-datalake').children().click();
+            cy.dataCy('sp-store-in-datalake', {
+                timeout: 5000,
+            })
+                .should('be.visible')
+                .children()
+                .click();
             cy.dataCy('sp-store-in-datalake-timestamp')
                 .click()
                 .get('mat-option')
@@ -348,7 +353,7 @@ export class ConnectUtils {
 
         // Validate resulting event
         cy.dataCy('sp-connect-adapter-success-live-preview', {
-            timeout: 10000,
+            timeout: 20000,
         }).should('be.visible');
 
         // validate that X event properties. The +1 is for the header row
diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-preview/adapter-started-preview.component.html b/ui/src/app/connect/dialog/adapter-started/adapter-started-preview/adapter-started-preview.component.html
index 26ed16172e..57f39299bf 100644
--- a/ui/src/app/connect/dialog/adapter-started/adapter-started-preview/adapter-started-preview.component.html
+++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-preview/adapter-started-preview.component.html
@@ -18,7 +18,7 @@
 
 <div fxFlex="100" fxLayout="column">
     <sp-pipeline-element-runtime-info
+        *ngIf="streamDescription"
         [streamDescription]="streamDescription"
-        [pollingActive]="pollingActive"
     ></sp-pipeline-element-runtime-info>
 </div>
diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-preview/adapter-started-preview.component.ts b/ui/src/app/connect/dialog/adapter-started/adapter-started-preview/adapter-started-preview.component.ts
index 2b8fd3b27a..5ea85c950a 100644
--- a/ui/src/app/connect/dialog/adapter-started/adapter-started-preview/adapter-started-preview.component.ts
+++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-preview/adapter-started-preview.component.ts
@@ -24,15 +24,13 @@ import { RestService } from '../../../services/rest.service';
     selector: 'sp-adapter-started-preview',
     templateUrl: './adapter-started-preview.component.html',
 })
-export class SpAdapterStartedPreviewComponent implements OnInit, OnDestroy {
+export class SpAdapterStartedPreviewComponent implements OnInit {
     @Input()
     streamDescription: SpDataStream;
 
     @Input()
     adapterElementId: string;
 
-    pollingActive = false;
-
     constructor(
         private adapterService: AdapterService,
         private restService: RestService,
@@ -50,12 +48,7 @@ export class SpAdapterStartedPreviewComponent implements OnInit, OnDestroy {
                     .getSourceDetails(adapter.correspondingDataStreamElementId)
                     .subscribe(st => {
                         this.streamDescription = st;
-                        this.pollingActive = true;
                     });
             });
     }
-
-    ngOnDestroy(): void {
-        this.pollingActive = false;
-    }
 }
diff --git a/ui/src/app/connect/services/rest.service.ts b/ui/src/app/connect/services/rest.service.ts
index cede76c54c..8b7c46f426 100644
--- a/ui/src/app/connect/services/rest.service.ts
+++ b/ui/src/app/connect/services/rest.service.ts
@@ -18,7 +18,7 @@
 
 import { Injectable } from '@angular/core';
 
-import { HttpClient, HttpContext } from '@angular/common/http';
+import { HttpClient, HttpContext, HttpEvent } from '@angular/common/http';
 
 import { Observable } from 'rxjs';
 import { map } from 'rxjs/operators';
@@ -78,11 +78,16 @@ export class RestService {
             );
     }
 
-    getRuntimeInfo(sourceDescription): Observable<Record<string, any>> {
+    getRuntimeInfo(
+        sourceDescription: SpDataStream,
+    ): Observable<HttpEvent<string>> {
         return this.http.post(
             `${this.platformServicesCommons.apiBasePath}/pipeline-element/runtime`,
             sourceDescription,
             {
+                responseType: 'text',
+                observe: 'events',
+                reportProgress: true,
                 context: new HttpContext().set(NGX_LOADING_BAR_IGNORED, true),
             },
         );
diff --git a/ui/src/app/core-ui/pipeline-element-runtime-info/pipeline-element-runtime-info.component.ts b/ui/src/app/core-ui/pipeline-element-runtime-info/pipeline-element-runtime-info.component.ts
index 3fd5977784..2094530ff8 100644
--- a/ui/src/app/core-ui/pipeline-element-runtime-info/pipeline-element-runtime-info.component.ts
+++ b/ui/src/app/core-ui/pipeline-element-runtime-info/pipeline-element-runtime-info.component.ts
@@ -19,6 +19,9 @@
 import { Component, Input, OnDestroy, OnInit } from '@angular/core';
 import { SpDataStream } from '@streampipes/platform-services';
 import { RestService } from '../../connect/services/rest.service';
+import { Subscription } from 'rxjs';
+import { HttpDownloadProgressEvent, HttpEventType } from '@angular/common/http';
+import { LivePreviewService } from '../../services/live-preview.service';
 
 @Component({
     selector: 'sp-pipeline-element-runtime-info',
@@ -29,55 +32,44 @@ export class PipelineElementRuntimeInfoComponent implements OnInit, OnDestroy {
     @Input()
     streamDescription: SpDataStream;
 
-    _pollingActive: boolean;
-
     runtimeData: { runtimeName: string; value: any }[];
     timer: any;
     runtimeDataError = false;
+    runtimeSub: Subscription;
 
-    constructor(private restService: RestService) {}
+    constructor(
+        private restService: RestService,
+        private livePreviewService: LivePreviewService,
+    ) {}
 
     ngOnInit(): void {
-        this.checkPollingStart();
-    }
-
-    checkPollingStart() {
-        if (this._pollingActive) {
-            this.getLatestRuntimeInfo();
-        }
+        this.getLatestRuntimeInfo();
     }
 
     getLatestRuntimeInfo() {
-        this.restService
+        this.runtimeSub = this.restService
             .getRuntimeInfo(this.streamDescription)
-            .subscribe(data => {
-                this.runtimeDataError = !data;
-
-                this.runtimeData = Object.entries(data).map(
-                    ([runtimeName, value]) => ({ runtimeName, value }),
-                );
-
-                if (this._pollingActive) {
-                    this.timer = setTimeout(
-                        () => this.getLatestRuntimeInfo(),
-                        1000,
-                    );
+            .subscribe(event => {
+                if (event.type === HttpEventType.DownloadProgress) {
+                    try {
+                        const responseJson = this.livePreviewService.convert(
+                            event as HttpDownloadProgressEvent,
+                        );
+                        const [firstKey] = Object.keys(responseJson);
+                        const json = responseJson[firstKey];
+                        this.runtimeDataError = !json;
+                        this.runtimeData = Object.entries(json).map(
+                            ([runtimeName, value]) => ({ runtimeName, value }),
+                        );
+                    } catch (error) {
+                        this.runtimeDataError = true;
+                        this.runtimeData = [];
+                    }
                 }
             });
     }
 
-    @Input()
-    set pollingActive(pollingActive: boolean) {
-        this._pollingActive = pollingActive;
-        this.checkPollingStart();
-    }
-
-    get pollingActive(): boolean {
-        return this._pollingActive;
-    }
-
     ngOnDestroy(): void {
-        this.pollingActive = false;
-        clearTimeout(this.timer);
+        this.runtimeSub?.unsubscribe();
     }
 }
diff --git a/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.html b/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.html
index 306b9f5829..a58235feaa 100644
--- a/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.html
+++ b/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.html
@@ -28,7 +28,10 @@
     </div>
     <table class="row-border hover preview-table" *ngIf="runtimeData">
         <tbody id="preview-data-rows-id">
-            <tr *ngFor="let item of runtimeData | keyvalue" class="preview-row">
+            <tr
+                *ngFor="let item of runtimeData | keyvalue: keyValueCompareFn"
+                class="preview-row"
+            >
                 <td>
                     <b>{{ item.key }}</b>
                 </td>
diff --git a/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.scss b/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.scss
index 3734f07db9..ae8a6fc729 100644
--- a/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.scss
+++ b/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.scss
@@ -18,7 +18,7 @@
 
 .data-preview {
     position: relative;
-    left: 0px;
+    left: 0;
     top: 120px;
     width: 250px;
     height: 120px;
@@ -29,12 +29,12 @@
     z-index: 50;
 }
 
-.preview-table {
-    font-size: 9pt;
+.data-preview:hover {
+    z-index: 1000;
 }
 
-.mt-10 {
-    margin-top: 10px;
+.preview-table {
+    font-size: 9pt;
 }
 
 .preview-row,
diff --git a/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.ts b/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.ts
index 8e57fc405e..1e0d949ff8 100644
--- a/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.ts
+++ b/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.ts
@@ -16,53 +16,51 @@
  *
  */
 
-import { Component, Input, OnInit } from '@angular/core';
-import { EditorService } from '../../services/editor.service';
+import { Component, Input, OnDestroy, OnInit } from '@angular/core';
+import { Subscription } from 'rxjs';
+import { KeyValue } from '@angular/common';
+import { LivePreviewService } from '../../../services/live-preview.service';
 
 @Component({
     selector: 'sp-pipeline-element-preview',
     templateUrl: './pipeline-element-preview.component.html',
     styleUrls: ['./pipeline-element-preview.component.scss'],
 })
-export class PipelineElementPreviewComponent implements OnInit {
+export class PipelineElementPreviewComponent implements OnInit, OnDestroy {
     @Input()
     previewId: string;
 
     @Input()
-    pipelineElementDomId: string;
-
-    runtimeData: ReadonlyMap<string, unknown>;
+    elementId: string;
 
+    runtimeData: Record<string, any>;
     runtimeDataError = false;
-    timer: any;
+    previewSub: Subscription;
 
-    constructor(private editorService: EditorService) {}
+    constructor(private livePreviewService: LivePreviewService) {}
 
     ngOnInit(): void {
         this.getLatestRuntimeInfo();
     }
 
+    keyValueCompareFn = (
+        a: KeyValue<string, any>,
+        b: KeyValue<string, any>,
+    ): number => {
+        return a.key.localeCompare(b.key);
+    };
+
     getLatestRuntimeInfo() {
-        this.editorService
-            .getPipelinePreviewResult(this.previewId, 
this.pipelineElementDomId)
-            .subscribe(data => {
-                if (data) {
-                    this.runtimeDataError = false;
-                    if (
-                        !(
-                            Object.keys(data).length === 0 &&
-                            data.constructor === Object
-                        )
-                    ) {
-                        this.runtimeData = data;
-                    }
+        this.previewSub = this.livePreviewService.eventSub.subscribe(event => {
+            if (event) {
+                this.runtimeData = event[this.elementId];
+            } else {
+                this.runtimeDataError = true;
+            }
+        });
+    }
 
-                    this.timer = setTimeout(() => {
-                        this.getLatestRuntimeInfo();
-                    }, 1000);
-                } else {
-                    this.runtimeDataError = true;
-                }
-            });
+    ngOnDestroy() {
+        this.previewSub?.unsubscribe();
     }
 }
diff --git a/ui/src/app/editor/components/pipeline/pipeline.component.html b/ui/src/app/editor/components/pipeline/pipeline.component.html
index 7cd3a3206b..d1015b310f 100644
--- a/ui/src/app/editor/components/pipeline/pipeline.component.html
+++ b/ui/src/app/editor/components/pipeline/pipeline.component.html
@@ -96,11 +96,11 @@
         *ngIf="
             previewModeActive &&
             pipelinePreview.supportedPipelineElementDomIds.indexOf(
-                pipelineElement.payload.dom
+                pipelineElement.payload.elementId
             ) > -1
         "
         [previewId]="pipelinePreview.previewId"
-        [pipelineElementDomId]="pipelineElement.payload.dom"
+        [elementId]="pipelineElement.payload.elementId"
     >
     </sp-pipeline-element-preview>
 </div>
diff --git a/ui/src/app/editor/components/pipeline/pipeline.component.ts b/ui/src/app/editor/components/pipeline/pipeline.component.ts
index 40044192e9..6838fff3cd 100644
--- a/ui/src/app/editor/components/pipeline/pipeline.component.ts
+++ b/ui/src/app/editor/components/pipeline/pipeline.component.ts
@@ -24,6 +24,7 @@ import { ShepherdService } from '../../../services/tour/shepherd.service';
 import {
     Component,
     EventEmitter,
+    HostListener,
     Input,
     NgZone,
     OnDestroy,
@@ -58,7 +59,7 @@ import {
 import { EditorService } from '../../services/editor.service';
 import { MatchingErrorComponent } from '../../dialog/matching-error/matching-error.component';
 import { MatDialog } from '@angular/material/dialog';
-import { forkJoin } from 'rxjs';
+import { forkJoin, Subscription } from 'rxjs';
 import { JsplumbFactoryService } from '../../services/jsplumb-factory.service';
 import { PipelinePositioningService } from '../../services/pipeline-positioning.service';
 import {
@@ -70,6 +71,8 @@ import {
 } from '@jsplumb/browser-ui';
 import { PipelineStyleService } from '../../services/pipeline-style.service';
 import { IdGeneratorService } from '../../../core-services/id-generator/id-generator.service';
+import { LivePreviewService } from '../../../services/live-preview.service';
+import { HttpDownloadProgressEvent } from '@angular/common/http';
 
 @Component({
     selector: 'sp-pipeline',
@@ -117,6 +120,7 @@ export class PipelineComponent implements OnInit, OnDestroy {
 
     previewModeActive = false;
     pipelinePreview: PipelinePreviewModel;
+    pipelinePreviewSub: Subscription;
 
     shouldOpenCustomizeSettings = false;
 
@@ -134,6 +138,7 @@ export class PipelineComponent implements OnInit, OnDestroy {
         private dialogService: DialogService,
         private dialog: MatDialog,
         private ngZone: NgZone,
+        private livePreviewService: LivePreviewService,
     ) {
         this.currentMouseOverElement = '';
         this.currentPipelineModel = new Pipeline();
@@ -170,6 +175,11 @@ export class PipelineComponent implements OnInit, OnDestroy {
         this.jsplumbFactoryService.destroy(this.preview);
     }
 
+    @HostListener('window:beforeunload')
+    onWindowClose() {
+        this.ngOnDestroy();
+    }
+
     updateMouseover(elementId: string) {
         this.currentMouseOverElement = elementId;
     }
@@ -656,6 +666,14 @@ export class PipelineComponent implements OnInit, OnDestroy {
                 .subscribe(response => {
                     this.pipelinePreview = response;
                     this.previewModeActive = true;
+                    this.pipelinePreviewSub = this.editorService
+                        .getPipelinePreviewResult(response.previewId)
+                        .subscribe(res => {
+                            const data = this.livePreviewService.convert(
+                                res as HttpDownloadProgressEvent,
+                            );
+                            this.livePreviewService.eventSub.next(data);
+                        });
                 });
         } else {
             this.deletePipelineElementPreview(false);
@@ -664,6 +682,7 @@ export class PipelineComponent implements OnInit, OnDestroy {
 
     deletePipelineElementPreview(resume: boolean) {
         if (this.previewModeActive) {
+            this.pipelinePreviewSub?.unsubscribe();
             this.editorService
                 .deletePipelinePreviewRequest(this.pipelinePreview.previewId)
                 .subscribe(() => {
diff --git a/ui/src/app/editor/services/editor.service.ts b/ui/src/app/editor/services/editor.service.ts
index a07f6bbee9..45ba82218d 100644
--- a/ui/src/app/editor/services/editor.service.ts
+++ b/ui/src/app/editor/services/editor.service.ts
@@ -17,7 +17,7 @@
  */
 
 import { Injectable } from '@angular/core';
-import { HttpClient, HttpContext } from '@angular/common/http';
+import { HttpClient, HttpContext, HttpEvent } from '@angular/common/http';
 import {
     DataProcessorInvocation,
     DataSinkInvocation,
@@ -189,18 +189,13 @@ export class EditorService {
         return this.http.delete(this.pipelinePreviewBasePath + '/' + previewId);
     }
 
-    getPipelinePreviewResult(
-        previewId: string,
-        pipelineElementDomId: string,
-    ): Observable<any> {
-        return this.http.get(
-            this.pipelinePreviewBasePath +
-                '/' +
-                previewId +
-                '/' +
-                pipelineElementDomId,
-            { context: new HttpContext().set(NGX_LOADING_BAR_IGNORED, true) },
-        );
+    getPipelinePreviewResult(previewId: string): Observable<HttpEvent<string>> {
+        return this.http.get(`${this.pipelinePreviewBasePath}/${previewId}`, {
+            responseType: 'text',
+            observe: 'events',
+            reportProgress: true,
+            context: new HttpContext().set(NGX_LOADING_BAR_IGNORED, true),
+        });
     }
 
     get pipelinePreviewBasePath() {
diff --git a/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.scss b/ui/src/app/services/live-preview.service.ts
similarity index 61%
copy from ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.scss
copy to ui/src/app/services/live-preview.service.ts
index 3734f07db9..8f6ccb39fd 100644
--- a/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.scss
+++ b/ui/src/app/services/live-preview.service.ts
@@ -16,28 +16,19 @@
  *
  */
 
-.data-preview {
-    position: relative;
-    left: 0px;
-    top: 120px;
-    width: 250px;
-    height: 120px;
-    background: var(--color-bg-dialog);
-    overflow: auto;
-    border: 1px solid gray;
-    box-shadow: 0.175em 0.175em 0 0 rgba(15, 28, 63, 0.125);
-    z-index: 50;
-}
-
-.preview-table {
-    font-size: 9pt;
-}
+import { Injectable } from '@angular/core';
+import { HttpDownloadProgressEvent } from '@angular/common/http';
+import { Subject } from 'rxjs';
 
-.mt-10 {
-    margin-top: 10px;
-}
+@Injectable({ providedIn: 'root' })
+export class LivePreviewService {
+    public eventSub: Subject<Record<string, any>> = new Subject<
+        Record<string, any>
+    >();
 
-.preview-row,
-.preview-table {
-    background: var(--color-bg-dialog);
+    /**
+     * Returns the newest complete record of a streamed preview response.
+     *
+     * `partialText` is the body received so far; it is split on '\n' and the
+     * second-to-last segment is parsed, because the last segment is either
+     * empty (text ends with a newline) or a record that is still in transit.
+     *
+     * @param event download progress event of a text response
+     * @returns the parsed JSON record
+     * @throws Error if no newline-terminated record has arrived yet
+     * @throws SyntaxError if the selected record is not valid JSON
+     */
+    convert(event: HttpDownloadProgressEvent) {
+        const chunks = (event?.partialText ?? '').split('\n');
+        if (chunks.length < 2) {
+            // Also reached for events that carry no partialText at all
+            // (e.g. when a caller casts a non-progress event).
+            throw new Error('Live preview: no complete event received yet');
+        }
+        return JSON.parse(chunks[chunks.length - 2]);
+    }
 }

Reply via email to