bossenti commented on code in PR #2559:
URL: https://github.com/apache/streampipes/pull/2559#discussion_r1525888862
##########
streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapter.java:
##########
@@ -133,66 +155,113 @@ public void onAdapterStarted(IAdapterParameterExtractor
extractor,
.getEventProperties()
.stream()
.filter(eventProperty ->
-
eventProperty.getDomainProperties().contains(URI.create(SO.DATE_TIME)))
+ eventProperty.getDomainProperties()
+ .contains(URI.create(SO.DATE_TIME)))
.findFirst();
if (timestampField.isEmpty()) {
throw new AdapterException("Could not find a timestamp field in event
schema");
} else {
- timestampRuntimeName = timestampField.get().getRuntimeName();
+ timestampRuntimeName = timestampField.get()
+ .getRuntimeName();
+ }
+
+ // check for renaming rules that affect the timestamp property
+ // if there is one, we need to use the original field name when extracting
the timestamp from the raw data
+ var renamingRulesTimestamp = getRenamingRulesTimestamp(extractor);
+ if (!renamingRulesTimestamp.isEmpty()) {
+ if (renamingRulesTimestamp.size() == 1) {
+ timestampSourceFieldName = renamingRulesTimestamp.get(0)
+ .getOldRuntimeKey();
+ } else {
+ throw new AdapterException("Invalid configuration - multiple renaming
rules detected which affect the "
+ + "timestamp property.");
+ }
+ } else {
+ // no renaming rules can be found, timestamp name does not change
+ timestampSourceFieldName = timestampRuntimeName;
}
// start replay adapter
if (replayOnce) {
- executor.schedule(() -> parseFile(extractor, collector,
adapterRuntimeContext),
+ executor.schedule(
+ () -> parseFile(extractor, collector, adapterRuntimeContext),
0,
- TimeUnit.SECONDS);
+ TimeUnit.SECONDS
+ );
} else {
- executor.scheduleAtFixedRate(() -> parseFile(extractor, collector,
adapterRuntimeContext),
+ executor.scheduleAtFixedRate(
+ () -> parseFile(extractor, collector, adapterRuntimeContext),
0,
1,
- TimeUnit.SECONDS);
+ TimeUnit.SECONDS
+ );
}
}
- private void parseFile(IAdapterParameterExtractor extractor,
- IEventCollector collector,
- IAdapterRuntimeContext adapterRuntimeContext) {
+ /**
+ * Retrieves a list of {@link RenameRuleDescription} affecting the timestamp
property.
+ *
+ * @param extractor An instance of IAdapterParameterExtractor used to
extract adapter description and rules.
+ * @return A list of {@link RenameRuleDescription} containing rules
affecting timestamp property.
+ */
+ private List<RenameRuleDescription>
getRenamingRulesTimestamp(IAdapterParameterExtractor extractor) {
+ var renamingRules =
+ extractor.getAdapterDescription()
+ .getRules()
+ .stream()
+ .filter(rule -> rule instanceof RenameRuleDescription)
+ .map(rule -> (RenameRuleDescription) rule)
+ .toList();
+
+ return renamingRules.stream()
+ .filter(rule -> rule.getNewRuntimeKey()
+ .equals(timestampRuntimeName))
+ .toList();
+ }
+
+ private void parseFile(
+ IAdapterParameterExtractor extractor,
+ IEventCollector collector,
+ IAdapterRuntimeContext adapterRuntimeContext
+ ) {
try {
InputStream inputStream = getDataFromEndpoint(extractor
.getStaticPropertyExtractor()
.selectedFilename(FILE_PATH));
- extractor.selectedParser().parse(inputStream, (event) -> {
-
- long actualEventTimestamp = -1;
- if (event.get(timestampRuntimeName) instanceof Long) {
- actualEventTimestamp = (long) event.get(timestampRuntimeName);
- } else {
- LOG.error(
- "The timestamp field is not a unix timestamp in ms. Value: %s"
- .formatted(event.get(timestampRuntimeName)));
- }
-
- if (timestampLastEvent != -1) {
- long sleepTime = (long) ((actualEventTimestamp - timestampLastEvent)
/ speedUp);
- // speed up is set to Float.MAX_VALUE when user selected fastest
option
- if (sleepTime > 0 && speedUp != Float.MAX_VALUE) {
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- LOG.info("File stream adapter was stopped, the current replay is
interuppted", e);
+ extractor.selectedParser()
+ .parse(inputStream, (event) -> {
+
+ long actualEventTimestamp = -1;
+ if (event.get(timestampSourceFieldName) instanceof Long) {
+ actualEventTimestamp = (long)
event.get(timestampSourceFieldName);
+ } else {
+ LOG.error(
+ "The timestamp field is not a unix timestamp in ms. Value:
%s"
+ .formatted(event.get(timestampSourceFieldName)));
}
- }
- }
- timestampLastEvent = actualEventTimestamp;
- if (replaceTimestamp) {
- event.put(timestampRuntimeName, System.currentTimeMillis());
- }
+ if (timestampLastEvent != -1) {
+ long sleepTime = (long) ((actualEventTimestamp -
timestampLastEvent) / speedUp);
+ // speed up is set to Float.MAX_VALUE when user selected fastest
option
+ if (sleepTime > 0 && speedUp != Float.MAX_VALUE) {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ LOG.info("File stream adapter was stopped, the current
replay is interuppted", e);
+ }
+ }
+ }
+
+ timestampLastEvent = actualEventTimestamp;
+ if (replaceTimestamp) {
+ // we can directly use the desired runtime name for the
timestamp instead of the original here
+ event.put(timestampRuntimeName, System.currentTimeMillis());
Review Comment:
I think it won't be `null` but it will probably be overwritten with the old
value.
I'll change it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]