This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch SP-1085
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/SP-1085 by this push:
     new 9b1d370de [#1085] Remove SendToBrokerReplayAdapterSink and move code 
to FileStreamProtocol
9b1d370de is described below

commit 9b1d370def1041e1c2120b0f3c1503d5bcccd8dc
Author: Philipp Zehnder <[email protected]>
AuthorDate: Sat Jan 14 09:08:23 2023 +0100

    [#1085] Remove SendToBrokerReplayAdapterSink and move code to 
FileStreamProtocol
---
 .../elements/SendToBrokerReplayAdapterSink.java    |  97 -------------------
 .../iiot/protocol/stream/FileStreamProtocol.java   | 105 ++++++++++++---------
 2 files changed, 61 insertions(+), 141 deletions(-)

diff --git 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerReplayAdapterSink.java
 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerReplayAdapterSink.java
deleted file mode 100644
index 812e7d61e..000000000
--- 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerReplayAdapterSink.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package 
org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements;
-
-import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement;
-import 
org.apache.streampipes.extensions.management.connect.adapter.preprocessing.Util;
-
-import java.util.List;
-import java.util.Map;
-
-public class SendToBrokerReplayAdapterSink implements IAdapterPipelineElement {
-
-  private final SendToBrokerAdapterSink sendToBrokerAdapterSink;
-  private long lastEventTimestamp;
-  private final List<String> timestampKeys;
-  private final boolean replaceTimestamp;
-  private final float speedUp;
-
-
-  public SendToBrokerReplayAdapterSink(SendToBrokerAdapterSink 
sendToBrokerAdapterSink,
-                                       String timestampKey, boolean 
replaceTimestamp, float speedUp) {
-    this.sendToBrokerAdapterSink = sendToBrokerAdapterSink;
-    this.lastEventTimestamp = -1;
-    this.timestampKeys = Util.toKeyArray(timestampKey);
-    this.replaceTimestamp = replaceTimestamp;
-    this.speedUp = speedUp;
-  }
-
-  @Override
-  public Map<String, Object> process(Map<String, Object> event) {
-    if ((event != null) && (lastEventTimestamp != -1)) {
-      long actualEventTimestamp = getTimestampInEvent(event);
-      try {
-        if ((actualEventTimestamp - lastEventTimestamp) > 0) {
-          Thread.sleep((long) ((actualEventTimestamp - lastEventTimestamp) / 
speedUp));
-        }
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-      if (replaceTimestamp) {
-        setTimestampInEvent(event, System.currentTimeMillis());
-      }
-      lastEventTimestamp = actualEventTimestamp;
-
-    } else if (lastEventTimestamp == -1) {
-      lastEventTimestamp = getTimestampInEvent(event);
-      if (replaceTimestamp) {
-        setTimestampInEvent(event, System.currentTimeMillis());
-      }
-    }
-    return sendToBrokerAdapterSink.process(event);
-  }
-
-  private long getTimestampInEvent(Map<String, Object> event) {
-    if (timestampKeys.size() == 1) {
-      try {
-        return (long) event.get(timestampKeys.get(0));
-      } catch (ClassCastException e) {
-        return lastEventTimestamp;
-      }
-    }
-    Map<String, Object> subEvent = event;
-    for (int i = 0; i < timestampKeys.size() - 1; i++) {
-      subEvent = (Map<String, Object>) subEvent.get(timestampKeys.get(i));
-    }
-    return (long) subEvent.get(timestampKeys.get(timestampKeys.size() - 1));
-
-  }
-
-  private void setTimestampInEvent(Map<String, Object> event, long timestamp) {
-    if (timestampKeys.size() == 1) {
-      event.put(timestampKeys.get(0), timestamp);
-    } else {
-      Map<String, Object> subEvent = event;
-      for (int i = 0; i < timestampKeys.size() - 1; i++) {
-        subEvent = (Map<String, Object>) subEvent.get(timestampKeys.get(i));
-      }
-      subEvent.put(timestampKeys.get(timestampKeys.size() - 1), timestamp);
-    }
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
index 060a49cab..4b588f882 100644
--- 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
+++ 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
@@ -19,18 +19,14 @@
 package org.apache.streampipes.connect.iiot.protocol.stream;
 
 import org.apache.streampipes.connect.iiot.utils.FileProtocolUtils;
+import org.apache.streampipes.extensions.api.connect.EmitBinaryEvent;
 import org.apache.streampipes.extensions.api.connect.IAdapterPipeline;
 import org.apache.streampipes.extensions.api.connect.IFormat;
 import org.apache.streampipes.extensions.api.connect.IParser;
 import 
org.apache.streampipes.extensions.api.connect.exception.AdapterException;
 import org.apache.streampipes.extensions.api.connect.exception.ParseException;
-import org.apache.streampipes.extensions.management.connect.SendToPipeline;
 import 
org.apache.streampipes.extensions.management.connect.adapter.guess.SchemaGuesser;
 import 
org.apache.streampipes.extensions.management.connect.adapter.model.generic.Protocol;
-import 
org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.SendToBrokerReplayAdapterSink;
-import 
org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.SendToJmsAdapterSink;
-import 
org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.SendToKafkaAdapterSink;
-import 
org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.SendToMqttAdapterSink;
 import org.apache.streampipes.extensions.management.util.EventSchemaUtils;
 import org.apache.streampipes.model.AdapterType;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
@@ -52,8 +48,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 public class FileStreamProtocol extends Protocol {
 
@@ -66,7 +63,7 @@ public class FileStreamProtocol extends Protocol {
   private float speedUp;
   private int timeBetweenReplay;
 
-  private ExecutorService executor;
+  private ScheduledExecutorService executor;
 
   public FileStreamProtocol() {
   }
@@ -88,53 +85,73 @@ public class FileStreamProtocol extends Protocol {
   public void run(IAdapterPipeline adapterPipeline) throws AdapterException {
     String timestampKey = 
getTimestampKey(adapterPipeline.getResultingEventSchema());
 
-    // exchange adapter pipeline sink with special purpose replay sink for 
file replay
-    if (adapterPipeline.getPipelineSink() instanceof SendToKafkaAdapterSink) {
-      adapterPipeline.changePipelineSink(new SendToBrokerReplayAdapterSink(
-          (SendToKafkaAdapterSink) adapterPipeline.getPipelineSink(),
-          timestampKey,
-          replaceTimestamp,
-          speedUp));
-
-    } else if (adapterPipeline.getPipelineSink() instanceof 
SendToMqttAdapterSink) {
-      adapterPipeline.changePipelineSink(new SendToBrokerReplayAdapterSink(
-          (SendToMqttAdapterSink) adapterPipeline.getPipelineSink(),
-          timestampKey,
-          replaceTimestamp,
-          speedUp));
-
-    } else if (adapterPipeline.getPipelineSink() instanceof 
SendToJmsAdapterSink) {
-      adapterPipeline.changePipelineSink(new SendToBrokerReplayAdapterSink(
-          (SendToJmsAdapterSink) adapterPipeline.getPipelineSink(),
-          timestampKey,
-          replaceTimestamp,
-          speedUp));
-    }
 
-    executor = Executors.newSingleThreadExecutor();
+    executor = Executors.newScheduledThreadPool(1);
+    var eventProcessor = new LocalEventProcessor(adapterPipeline, 
timestampKey);
 
-    executor.execute(() -> {
-      format.reset();
-      SendToPipeline stk = new SendToPipeline(format, adapterPipeline);
-      InputStream dataInputStream = getDataFromEndpoint();
-      try {
-        parser.parse(dataInputStream, stk);
-      } catch (ParseException e) {
+    executor.scheduleAtFixedRate(() -> {
+      try (InputStream dataInputStream = getDataFromEndpoint()) {
+        format.reset();
+        parser.parse(dataInputStream, eventProcessor);
+      } catch (ParseException | IOException e) {
         logger.error("Error while parsing: " + e.getMessage());
       }
+    }, 0, timeBetweenReplay, TimeUnit.SECONDS);
+  }
+
+  private class LocalEventProcessor implements EmitBinaryEvent {
+
+    private final IAdapterPipeline adapterPipeline;
+    private final String timestampKey;
+
+    private long lastEventTimestamp = -1;
+
+    /**
+     * This local class is responsible to parse the events and set the 
timestamp accordign to the selected replay
+     * configurations
+     * @param adapterPipeline that transforms the events and send them to the 
message broker in the end
+     * @param timestampKey the runtimeName of the timestamp field
+     */
+    public LocalEventProcessor(IAdapterPipeline adapterPipeline,
+                               String timestampKey) {
+      this.adapterPipeline = adapterPipeline;
+      this.timestampKey = timestampKey;
+      format.reset();
+    }
+
+    @Override
+    public Boolean emit(byte[] event) {
+
+      var eventMap = format.parse(event);
+      if (eventMap != null) {
+        long actualEventTimestamp = (long) eventMap.get(timestampKey);
+
+        if (lastEventTimestamp != -1) {
+          long sleepTime = (long) ((actualEventTimestamp - lastEventTimestamp) 
/ speedUp);
+          if (sleepTime > 0) {
+            try {
+              Thread.sleep(sleepTime);
+            } catch (InterruptedException e) {
+              e.printStackTrace();
+            }
+          }
+        }
+
+        if (replaceTimestamp) {
+          eventMap.put(timestampKey, System.currentTimeMillis());
+        }
+        lastEventTimestamp = actualEventTimestamp;
+
+        adapterPipeline.process(eventMap);
 
-      try {
-        Thread.sleep(timeBetweenReplay * 1000L);
-      } catch (InterruptedException e) {
-        logger.error("Error while waiting for next replay round" + 
e.getMessage());
       }
-    });
+      return true;
+    }
   }
 
-
   @Override
   public void stop() {
-    executor.shutdown();
+    executor.shutdownNow();
     logger.info("Stopped file stream adapter for file " + selectedFileName);
   }
 

Reply via email to