This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch SP-1085
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/SP-1085 by this push:
new 9b1d370de [#1085] Remove SendToBrokerReplayAdapterSink and move code
to FileStreamProtocol
9b1d370de is described below
commit 9b1d370def1041e1c2120b0f3c1503d5bcccd8dc
Author: Philipp Zehnder <[email protected]>
AuthorDate: Sat Jan 14 09:08:23 2023 +0100
[#1085] Remove SendToBrokerReplayAdapterSink and move code to
FileStreamProtocol
---
.../elements/SendToBrokerReplayAdapterSink.java | 97 -------------------
.../iiot/protocol/stream/FileStreamProtocol.java | 105 ++++++++++++---------
2 files changed, 61 insertions(+), 141 deletions(-)
diff --git
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerReplayAdapterSink.java
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerReplayAdapterSink.java
deleted file mode 100644
index 812e7d61e..000000000
---
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerReplayAdapterSink.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package
org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements;
-
-import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement;
-import
org.apache.streampipes.extensions.management.connect.adapter.preprocessing.Util;
-
-import java.util.List;
-import java.util.Map;
-
-public class SendToBrokerReplayAdapterSink implements IAdapterPipelineElement {
-
- private final SendToBrokerAdapterSink sendToBrokerAdapterSink;
- private long lastEventTimestamp;
- private final List<String> timestampKeys;
- private final boolean replaceTimestamp;
- private final float speedUp;
-
-
- public SendToBrokerReplayAdapterSink(SendToBrokerAdapterSink
sendToBrokerAdapterSink,
- String timestampKey, boolean
replaceTimestamp, float speedUp) {
- this.sendToBrokerAdapterSink = sendToBrokerAdapterSink;
- this.lastEventTimestamp = -1;
- this.timestampKeys = Util.toKeyArray(timestampKey);
- this.replaceTimestamp = replaceTimestamp;
- this.speedUp = speedUp;
- }
-
- @Override
- public Map<String, Object> process(Map<String, Object> event) {
- if ((event != null) && (lastEventTimestamp != -1)) {
- long actualEventTimestamp = getTimestampInEvent(event);
- try {
- if ((actualEventTimestamp - lastEventTimestamp) > 0) {
- Thread.sleep((long) ((actualEventTimestamp - lastEventTimestamp) /
speedUp));
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- if (replaceTimestamp) {
- setTimestampInEvent(event, System.currentTimeMillis());
- }
- lastEventTimestamp = actualEventTimestamp;
-
- } else if (lastEventTimestamp == -1) {
- lastEventTimestamp = getTimestampInEvent(event);
- if (replaceTimestamp) {
- setTimestampInEvent(event, System.currentTimeMillis());
- }
- }
- return sendToBrokerAdapterSink.process(event);
- }
-
- private long getTimestampInEvent(Map<String, Object> event) {
- if (timestampKeys.size() == 1) {
- try {
- return (long) event.get(timestampKeys.get(0));
- } catch (ClassCastException e) {
- return lastEventTimestamp;
- }
- }
- Map<String, Object> subEvent = event;
- for (int i = 0; i < timestampKeys.size() - 1; i++) {
- subEvent = (Map<String, Object>) subEvent.get(timestampKeys.get(i));
- }
- return (long) subEvent.get(timestampKeys.get(timestampKeys.size() - 1));
-
- }
-
- private void setTimestampInEvent(Map<String, Object> event, long timestamp) {
- if (timestampKeys.size() == 1) {
- event.put(timestampKeys.get(0), timestamp);
- } else {
- Map<String, Object> subEvent = event;
- for (int i = 0; i < timestampKeys.size() - 1; i++) {
- subEvent = (Map<String, Object>) subEvent.get(timestampKeys.get(i));
- }
- subEvent.put(timestampKeys.get(timestampKeys.size() - 1), timestamp);
- }
- }
-}
diff --git
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
index 060a49cab..4b588f882 100644
---
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
+++
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
@@ -19,18 +19,14 @@
package org.apache.streampipes.connect.iiot.protocol.stream;
import org.apache.streampipes.connect.iiot.utils.FileProtocolUtils;
+import org.apache.streampipes.extensions.api.connect.EmitBinaryEvent;
import org.apache.streampipes.extensions.api.connect.IAdapterPipeline;
import org.apache.streampipes.extensions.api.connect.IFormat;
import org.apache.streampipes.extensions.api.connect.IParser;
import
org.apache.streampipes.extensions.api.connect.exception.AdapterException;
import org.apache.streampipes.extensions.api.connect.exception.ParseException;
-import org.apache.streampipes.extensions.management.connect.SendToPipeline;
import
org.apache.streampipes.extensions.management.connect.adapter.guess.SchemaGuesser;
import
org.apache.streampipes.extensions.management.connect.adapter.model.generic.Protocol;
-import
org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.SendToBrokerReplayAdapterSink;
-import
org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.SendToJmsAdapterSink;
-import
org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.SendToKafkaAdapterSink;
-import
org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.SendToMqttAdapterSink;
import org.apache.streampipes.extensions.management.util.EventSchemaUtils;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
@@ -52,8 +48,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
public class FileStreamProtocol extends Protocol {
@@ -66,7 +63,7 @@ public class FileStreamProtocol extends Protocol {
private float speedUp;
private int timeBetweenReplay;
- private ExecutorService executor;
+ private ScheduledExecutorService executor;
public FileStreamProtocol() {
}
@@ -88,53 +85,73 @@ public class FileStreamProtocol extends Protocol {
public void run(IAdapterPipeline adapterPipeline) throws AdapterException {
String timestampKey =
getTimestampKey(adapterPipeline.getResultingEventSchema());
- // exchange adapter pipeline sink with special purpose replay sink for
file replay
- if (adapterPipeline.getPipelineSink() instanceof SendToKafkaAdapterSink) {
- adapterPipeline.changePipelineSink(new SendToBrokerReplayAdapterSink(
- (SendToKafkaAdapterSink) adapterPipeline.getPipelineSink(),
- timestampKey,
- replaceTimestamp,
- speedUp));
-
- } else if (adapterPipeline.getPipelineSink() instanceof
SendToMqttAdapterSink) {
- adapterPipeline.changePipelineSink(new SendToBrokerReplayAdapterSink(
- (SendToMqttAdapterSink) adapterPipeline.getPipelineSink(),
- timestampKey,
- replaceTimestamp,
- speedUp));
-
- } else if (adapterPipeline.getPipelineSink() instanceof
SendToJmsAdapterSink) {
- adapterPipeline.changePipelineSink(new SendToBrokerReplayAdapterSink(
- (SendToJmsAdapterSink) adapterPipeline.getPipelineSink(),
- timestampKey,
- replaceTimestamp,
- speedUp));
- }
- executor = Executors.newSingleThreadExecutor();
+ executor = Executors.newScheduledThreadPool(1);
+ var eventProcessor = new LocalEventProcessor(adapterPipeline,
timestampKey);
- executor.execute(() -> {
- format.reset();
- SendToPipeline stk = new SendToPipeline(format, adapterPipeline);
- InputStream dataInputStream = getDataFromEndpoint();
- try {
- parser.parse(dataInputStream, stk);
- } catch (ParseException e) {
+ executor.scheduleAtFixedRate(() -> {
+ try (InputStream dataInputStream = getDataFromEndpoint()) {
+ format.reset();
+ parser.parse(dataInputStream, eventProcessor);
+ } catch (ParseException | IOException e) {
logger.error("Error while parsing: " + e.getMessage());
}
+ }, 0, timeBetweenReplay, TimeUnit.SECONDS);
+ }
+
+ private class LocalEventProcessor implements EmitBinaryEvent {
+
+ private final IAdapterPipeline adapterPipeline;
+ private final String timestampKey;
+
+ private long lastEventTimestamp = -1;
+
+ /**
+ * This local class is responsible for parsing the events and setting the
timestamp according to the selected replay
+ * configurations
+ * @param adapterPipeline that transforms the events and sends them to the
message broker in the end
+ * @param timestampKey the runtimeName of the timestamp field
+ */
+ public LocalEventProcessor(IAdapterPipeline adapterPipeline,
+ String timestampKey) {
+ this.adapterPipeline = adapterPipeline;
+ this.timestampKey = timestampKey;
+ format.reset();
+ }
+
+ @Override
+ public Boolean emit(byte[] event) {
+
+ var eventMap = format.parse(event);
+ if (eventMap != null) {
+ long actualEventTimestamp = (long) eventMap.get(timestampKey);
+
+ if (lastEventTimestamp != -1) {
+ long sleepTime = (long) ((actualEventTimestamp - lastEventTimestamp)
/ speedUp);
+ if (sleepTime > 0) {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ if (replaceTimestamp) {
+ eventMap.put(timestampKey, System.currentTimeMillis());
+ }
+ lastEventTimestamp = actualEventTimestamp;
+
+ adapterPipeline.process(eventMap);
- try {
- Thread.sleep(timeBetweenReplay * 1000L);
- } catch (InterruptedException e) {
- logger.error("Error while waiting for next replay round" +
e.getMessage());
}
- });
+ return true;
+ }
}
-
@Override
public void stop() {
- executor.shutdown();
+ executor.shutdownNow();
logger.info("Stopped file stream adapter for file " + selectedFileName);
}