bossenti commented on code in PR #2559:
URL: https://github.com/apache/streampipes/pull/2559#discussion_r1525881441


##########
streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapter.java:
##########
@@ -133,66 +155,113 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor,
         .getEventProperties()
         .stream()
         .filter(eventProperty ->
-            eventProperty.getDomainProperties().contains(URI.create(SO.DATE_TIME)))
+            eventProperty.getDomainProperties()
+                .contains(URI.create(SO.DATE_TIME)))
         .findFirst();
 
     if (timestampField.isEmpty()) {
       throw new AdapterException("Could not find a timestamp field in event schema");
     } else {
-      timestampRuntimeName = timestampField.get().getRuntimeName();
+      timestampRuntimeName = timestampField.get()
+          .getRuntimeName();
+    }
+
+    // check for renaming rules that affect the timestamp property
+    // if there is one, we need to use the original field name when extracting the timestamp from the raw data
+    var renamingRulesTimestamp = getRenamingRulesTimestamp(extractor);
+    if (!renamingRulesTimestamp.isEmpty()) {
+      if (renamingRulesTimestamp.size() == 1) {
+        timestampSourceFieldName = renamingRulesTimestamp.get(0)
+            .getOldRuntimeKey();
+      } else {
+        throw new AdapterException("Invalid configuration - multiple renaming rules detected which affect the "
+            + "timestamp property.");
+      }
+    } else {
+      // no renaming rules can be found, timestamp name does not change
+      timestampSourceFieldName = timestampRuntimeName;
     }
 
     // start replay adapter
     if (replayOnce) {
-      executor.schedule(() -> parseFile(extractor, collector, adapterRuntimeContext),
+      executor.schedule(
+          () -> parseFile(extractor, collector, adapterRuntimeContext),
           0,
-          TimeUnit.SECONDS);
+          TimeUnit.SECONDS
+      );
     } else {
-      executor.scheduleAtFixedRate(() -> parseFile(extractor, collector, adapterRuntimeContext),
+      executor.scheduleAtFixedRate(
+          () -> parseFile(extractor, collector, adapterRuntimeContext),
           0,
           1,
-          TimeUnit.SECONDS);
+          TimeUnit.SECONDS
+      );
     }
   }
 
-  private void parseFile(IAdapterParameterExtractor extractor,
-                         IEventCollector collector,
-                         IAdapterRuntimeContext adapterRuntimeContext) {
+  /**
+   * Retrieves a list of {@link RenameRuleDescription} affecting the timestamp property.
+   *
+   * @param extractor An instance of IAdapterParameterExtractor used to extract adapter description and rules.
+   * @return A list of {@link RenameRuleDescription} containing rules affecting timestamp property.
+   */
+  private List<RenameRuleDescription> getRenamingRulesTimestamp(IAdapterParameterExtractor extractor) {
+    var renamingRules =
+        extractor.getAdapterDescription()
+            .getRules()
+            .stream()
+            .filter(rule -> rule instanceof RenameRuleDescription)
+            .map(rule -> (RenameRuleDescription) rule)
+            .toList();
+
+    return renamingRules.stream()
+        .filter(rule -> rule.getNewRuntimeKey()
+            .equals(timestampRuntimeName))
+        .toList();
+  }
+
+  private void parseFile(
+      IAdapterParameterExtractor extractor,
+      IEventCollector collector,
+      IAdapterRuntimeContext adapterRuntimeContext
+  ) {
     try {
       InputStream inputStream = getDataFromEndpoint(extractor
           .getStaticPropertyExtractor()
           .selectedFilename(FILE_PATH));
 
-      extractor.selectedParser().parse(inputStream, (event) -> {
-
-        long actualEventTimestamp = -1;
-        if (event.get(timestampRuntimeName) instanceof Long) {
-          actualEventTimestamp = (long) event.get(timestampRuntimeName);
-        } else {
-          LOG.error(
-              "The timestamp field is not a unix timestamp in ms. Value: %s"
-                  .formatted(event.get(timestampRuntimeName)));
-        }
-
-        if (timestampLastEvent != -1) {
-          long sleepTime = (long) ((actualEventTimestamp - timestampLastEvent) / speedUp);
-          // speed up is set to Float.MAX_VALUE when user selected fastest option
-          if (sleepTime > 0 && speedUp != Float.MAX_VALUE) {
-            try {
-              Thread.sleep(sleepTime);
-            } catch (InterruptedException e) {
-              LOG.info("File stream adapter was stopped, the current replay is interuppted", e);
+      extractor.selectedParser()
+          .parse(inputStream, (event) -> {
+
+            long actualEventTimestamp = -1;
+            if (event.get(timestampSourceFieldName) instanceof Long) {
+              actualEventTimestamp = (long) event.get(timestampSourceFieldName);
+            } else {
+              LOG.error(
+                  "The timestamp field is not a unix timestamp in ms. Value: %s"
+                      .formatted(event.get(timestampSourceFieldName)));
             }
-          }
-        }
 
-        timestampLastEvent = actualEventTimestamp;
-        if (replaceTimestamp) {
-          event.put(timestampRuntimeName, System.currentTimeMillis());
-        }
+            if (timestampLastEvent != -1) {
+              long sleepTime = (long) ((actualEventTimestamp - timestampLastEvent) / speedUp);
+              // speed up is set to Float.MAX_VALUE when user selected fastest option
+              if (sleepTime > 0 && speedUp != Float.MAX_VALUE) {
+                try {
+                  Thread.sleep(sleepTime);
+                } catch (InterruptedException e) {
+                  LOG.info("File stream adapter was stopped, the current replay is interuppted", e);
+                }
+              }
+            }
+
+            timestampLastEvent = actualEventTimestamp;
+            if (replaceTimestamp) {
+              // we can directly use the desired runtime name for the timestamp instead of the original here
+              event.put(timestampRuntimeName, System.currentTimeMillis());

Review Comment:
   I'll have a look



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to