This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 56edbce467 feat(#4111): Implement event schema diff service to apply
user configuration changes (#4128)
56edbce467 is described below
commit 56edbce46784c5b709c1fe552aa0a8cbb3ed46a8
Author: Philipp Zehnder <[email protected]>
AuthorDate: Tue Jan 27 15:42:13 2026 +0100
feat(#4111): Implement event schema diff service to apply user
configuration changes (#4128)
---
ui/cypress/tests/connect/editAdapter.smoke.spec.ts | 3 +-
.../editAdapterSettingsAndPipeline.smoke.spec.ts | 2 -
.../connect/editAdapterValuesAndFields.spec.ts | 10 +-
.../opcua/startAndEditOpcAdapters.smoke.spec.ts | 2 -
.../adapter-configuration-state.service.ts | 10 +
.../connect/services/event-schema-diff.service.ts | 207 +++++++++++++++++++++
6 files changed, 226 insertions(+), 8 deletions(-)
diff --git a/ui/cypress/tests/connect/editAdapter.smoke.spec.ts
b/ui/cypress/tests/connect/editAdapter.smoke.spec.ts
index 5ed3c44c19..00193f3a83 100644
--- a/ui/cypress/tests/connect/editAdapter.smoke.spec.ts
+++ b/ui/cypress/tests/connect/editAdapter.smoke.spec.ts
@@ -19,11 +19,11 @@
import { ConnectUtils } from '../../support/utils/connect/ConnectUtils';
import { ConnectBtns } from '../../support/utils/connect/ConnectBtns';
import { AdapterBuilder } from '../../support/builder/AdapterBuilder';
-import { ConnectEventSchemaUtils } from
'../../support/utils/connect/ConnectEventSchemaUtils';
import { DataExplorerUtils } from
'../../support/utils/dataExplorer/DataExplorerUtils';
import { DataExplorerBtns } from
'../../support/utils/dataExplorer/DataExplorerBtns';
import { SharedUtils } from '../../support/utils/shared/SharedUtils';
import { SharedBtns } from '../../support/utils/shared/SharedBtns';
+import { ConnectEventSchemaUtils } from
'../../support/utils/connect/ConnectEventSchemaUtils';
describe('Test Edit Adapter', () => {
beforeEach('Setup Test', () => {
@@ -67,7 +67,6 @@ describe('Test Edit Adapter', () => {
cy.wait(1000);
ConnectBtns.refreshSchemaBtn().click();
- ConnectEventSchemaUtils.markPropertyAsTimestamp('timestamp');
ConnectUtils.finishConfigureFieldsConfiguration();
ConnectBtns.adapterNameInput().clear().type(newAdapterName);
diff --git
a/ui/cypress/tests/connect/editAdapterSettingsAndPipeline.smoke.spec.ts
b/ui/cypress/tests/connect/editAdapterSettingsAndPipeline.smoke.spec.ts
index 4ab10beba1..e1f64d18e7 100644
--- a/ui/cypress/tests/connect/editAdapterSettingsAndPipeline.smoke.spec.ts
+++ b/ui/cypress/tests/connect/editAdapterSettingsAndPipeline.smoke.spec.ts
@@ -25,7 +25,6 @@ import { AdapterBuilder } from
'../../support/builder/AdapterBuilder';
import { DataExplorerUtils } from
'../../support/utils/dataExplorer/DataExplorerUtils';
import { GeneralUtils } from '../../support/utils/GeneralUtils';
import { PipelineBtns } from '../../support/utils/pipeline/PipelineBtns';
-import { ConnectEventSchemaUtils } from
'../../support/utils/connect/ConnectEventSchemaUtils';
import { SharedUtils } from '../../support/utils/shared/SharedUtils';
import { SharedBtns } from '../../support/utils/shared/SharedBtns';
@@ -76,7 +75,6 @@ describe('Test Edit Adapter and Pipeline', () => {
SharedBtns.confirmDialogConfirmBtn().click();
cy.wait(1000);
ConnectBtns.refreshSchemaBtn().click();
- ConnectEventSchemaUtils.markPropertyAsTimestamp('timestamp');
ConnectUtils.finishConfigureFieldsConfiguration();
// Update event schema
ConnectBtns.storeEditAdapter().click();
diff --git a/ui/cypress/tests/connect/editAdapterValuesAndFields.spec.ts
b/ui/cypress/tests/connect/editAdapterValuesAndFields.spec.ts
index 69bf8b5c4d..8f29961867 100644
--- a/ui/cypress/tests/connect/editAdapterValuesAndFields.spec.ts
+++ b/ui/cypress/tests/connect/editAdapterValuesAndFields.spec.ts
@@ -20,6 +20,8 @@ import { ConnectUtils } from
'../../support/utils/connect/ConnectUtils';
import { ConnectBtns } from '../../support/utils/connect/ConnectBtns';
import { AdapterBuilder } from '../../support/builder/AdapterBuilder';
import { ConnectEventSchemaUtils } from
'../../support/utils/connect/ConnectEventSchemaUtils';
+import { SharedUtils } from '../../support/utils/shared/SharedUtils';
+import { SharedBtns } from '../../support/utils/shared/SharedBtns';
describe('Test Edit Adapter', () => {
const adapterName = 'Test Adapter';
@@ -70,7 +72,7 @@ describe('Test Edit Adapter', () => {
}
function validateSavedAdapterEdits() {
- // Edit adapter and check if given values and added property still
provided
+ // Edit the adapter and check if given values and added property still
provided
ConnectBtns.openActionsMenu(adapterName);
ConnectBtns.editAdapter().should('not.be.disabled');
ConnectBtns.editAdapter().click();
@@ -95,7 +97,7 @@ describe('Test Edit Adapter', () => {
}
function reconfigureAdapterToUsePressureSensorInsteadOfFlowRate() {
- // Configure adapter with pressure instead of flowrate
+ // Configure the adapter with pressure instead of flowrate
ConnectBtns.openActionsMenu(adapterName);
ConnectBtns.editAdapter().click();
@@ -104,7 +106,11 @@ describe('Test Edit Adapter', () => {
.addInput('radio', 'selected', 'simulator-option-pressure')
.build();
ConnectUtils.configureAdapter(adapterInput);
+ SharedUtils.confirmDialogVisible();
+ SharedBtns.confirmDialogConfirmBtn().click();
ConnectBtns.configureSchemaNextBtn().click();
+ SharedUtils.confirmDialogVisible();
+ SharedBtns.confirmDialogConfirmBtn().click();
storeAndCloseAdapterPreview();
}
diff --git
a/ui/cypress/tests/connect/opcua/startAndEditOpcAdapters.smoke.spec.ts
b/ui/cypress/tests/connect/opcua/startAndEditOpcAdapters.smoke.spec.ts
index dbf387acd8..d8383ff7d4 100644
--- a/ui/cypress/tests/connect/opcua/startAndEditOpcAdapters.smoke.spec.ts
+++ b/ui/cypress/tests/connect/opcua/startAndEditOpcAdapters.smoke.spec.ts
@@ -20,7 +20,6 @@ import { ConnectUtils } from
'../../../support/utils/connect/ConnectUtils';
import { TreeNodeUserInputBuilder } from
'../../../support/builder/TreeNodeUserInputBuilder';
import { ConnectBtns } from '../../../support/utils/connect/ConnectBtns';
import { TreeStaticPropertyUtils } from
'../../../support/utils/userInput/TreeStaticPropertyUtils';
-import { ConnectEventSchemaUtils } from
'../../../support/utils/connect/ConnectEventSchemaUtils';
import { AdapterInput } from '../../../support/model/AdapterInput';
import { OpcUaUtils } from '../../../support/utils/connect/OpcUaUtils';
import { GeneralUtils } from '../../../support/utils/GeneralUtils';
@@ -99,7 +98,6 @@ const editAdapterTest = (adapterInput: AdapterInput) => {
// Same as for new sample, once automated, this can be removed
cy.wait(1000);
ConnectBtns.refreshSchemaBtn().click();
- ConnectEventSchemaUtils.markPropertyAsTimestamp('timestamp');
ConnectUtils.finishConfigureFieldsConfiguration();
ConnectBtns.storeEditAdapter().click();
diff --git
a/ui/src/app/connect/components/adapter-configuration/adapter-configuration-state-service/adapter-configuration-state.service.ts
b/ui/src/app/connect/components/adapter-configuration/adapter-configuration-state-service/adapter-configuration-state.service.ts
index 5917dfe4fe..0d94319861 100644
---
a/ui/src/app/connect/components/adapter-configuration/adapter-configuration-state-service/adapter-configuration-state.service.ts
+++
b/ui/src/app/connect/components/adapter-configuration/adapter-configuration-state-service/adapter-configuration-state.service.ts
@@ -31,6 +31,7 @@ import { Observable } from 'rxjs';
import { MatDialog } from '@angular/material/dialog';
import { TranslateService } from '@ngx-translate/core';
import { ConfirmDialogComponent } from '@streampipes/shared-ui';
+import { EventSchemaDiffService } from
'../../../services/event-schema-diff.service';
@Injectable({
providedIn: 'root',
@@ -40,6 +41,7 @@ export class AdapterConfigurationStateService {
private translateService = inject(TranslateService);
private restService = inject(RestService);
private scriptLanguagesService = inject(ConnectScriptLanguagesService);
+ private eventSchemaDiffService = inject(EventSchemaDiffService);
private initialState: AdapterConfigurationState = {
adapterSettingsChanged: false,
@@ -350,6 +352,14 @@ export class AdapterConfigurationStateService {
this.restService.getEventSchema(adapter).subscribe({
next: schema => {
+ const previousEventProperties =
+ this.state().adapterDescription?.dataStream?.eventSchema
+ ?.eventProperties;
+ this.eventSchemaDiffService.applyUserConfiguration(
+ previousEventProperties,
+ schema.eventProperties,
+ );
+
this.sortEventPropertiesAlphabetically(schema);
const updatedAdapter = { ...adapter };
diff --git a/ui/src/app/connect/services/event-schema-diff.service.ts
b/ui/src/app/connect/services/event-schema-diff.service.ts
new file mode 100644
index 0000000000..0802280f21
--- /dev/null
+++ b/ui/src/app/connect/services/event-schema-diff.service.ts
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import { Injectable } from '@angular/core';
+import {
+ EventProperty,
+ EventPropertyList,
+ EventPropertyNested,
+ EventPropertyPrimitive,
+ EventPropertyUnion,
+} from '@streampipes/platform-services';
+
+@Injectable({ providedIn: 'root' })
+export class EventSchemaDiffService {
+ // Copies user-configurable fields from old to new for matching properties.
+ public applyUserConfiguration(
+ oldEventProperties: EventPropertyUnion[] | undefined,
+ newEventProperties: EventPropertyUnion[] | undefined,
+ ): void {
+ const pairs = this.getMatchingRuntimeTypePairs(
+ oldEventProperties,
+ newEventProperties,
+ );
+
+ for (const { oldProperty, newProperty } of pairs) {
+ newProperty.semanticType = oldProperty.semanticType;
+ newProperty.additionalMetadata = oldProperty.additionalMetadata;
+ newProperty.description = oldProperty.description;
+ newProperty.label = oldProperty.label;
+ newProperty.propertyScope = oldProperty.propertyScope;
+
+ if (
+ this.isPrimitiveProperty(newProperty) &&
+ this.isPrimitiveProperty(oldProperty)
+ ) {
+ const newPrimitive = newProperty as EventPropertyPrimitive;
+ const oldPrimitive = oldProperty as EventPropertyPrimitive;
+ newPrimitive.measurementUnit = oldPrimitive.measurementUnit;
+ newPrimitive.runtimeType = oldPrimitive.runtimeType;
+ }
+ }
+ }
+
+ // Returns only matching old/new pairs where runtimeType is unchanged.
+ private getMatchingRuntimeTypePairs(
+ oldEventProperties: EventPropertyUnion[] | undefined,
+ newEventProperties: EventPropertyUnion[] | undefined,
+ ): Array<{
+ oldProperty: EventProperty;
+ newProperty: EventProperty;
+ }> {
+ const diff = this.compareEventSchemas(
+ oldEventProperties,
+ newEventProperties,
+ );
+
+ return diff
+ .filter(entry => {
+ if (!entry.oldProperty || !entry.newProperty) {
+ return false;
+ }
+ const oldDataType = this.getDataType(entry.oldProperty);
+ const newDataType = this.getDataType(entry.newProperty);
+ if (oldDataType !== undefined && newDataType !== undefined) {
+ return oldDataType === newDataType;
+ }
+
+ const oldIsPrimitive = this.isPrimitiveProperty(
+ entry.oldProperty,
+ );
+ const newIsPrimitive = this.isPrimitiveProperty(
+ entry.newProperty,
+ );
+ return !oldIsPrimitive && !newIsPrimitive;
+ })
+ .map(entry => ({
+ oldProperty: entry.oldProperty as EventProperty,
+ newProperty: entry.newProperty as EventProperty,
+ }));
+ }
+
+ // Returns a path-based comparison of old vs. new event properties for
diffing or merge logic.
+ private compareEventSchemas(
+ oldEventProperties: EventPropertyUnion[] | undefined,
+ newEventProperties: EventPropertyUnion[] | undefined,
+ ): Array<{
+ path: string;
+ oldProperty?: EventPropertyUnion;
+ newProperty?: EventPropertyUnion;
+ }> {
+ const oldMap = this.buildEventPropertyPathMap(oldEventProperties);
+ const newMap = this.buildEventPropertyPathMap(newEventProperties);
+
+ const paths = new Set<string>();
+ oldMap.forEach((_value, key) => paths.add(key));
+ newMap.forEach((_value, key) => paths.add(key));
+
+ return Array.from(paths)
+ .sort()
+ .map(path => ({
+ path,
+ oldProperty: oldMap.get(path),
+ newProperty: newMap.get(path),
+ }));
+ }
+
+ private buildEventPropertyPathMap(
+ eventProperties: EventPropertyUnion[] | undefined,
+ basePath: string = '',
+ ): Map<string, EventPropertyUnion> {
+ const map = new Map<string, EventPropertyUnion>();
+
+ if (!eventProperties) {
+ return map;
+ }
+
+ for (const eventProperty of eventProperties) {
+ const path = this.buildEventPropertyPath(
+ basePath,
+ eventProperty?.runtimeName,
+ );
+ if (path) {
+ map.set(path, eventProperty);
+ }
+
+ if (this.isNestedProperty(eventProperty)) {
+ const childMap = this.buildEventPropertyPathMap(
+ eventProperty.eventProperties,
+ path,
+ );
+ childMap.forEach((value, key) => map.set(key, value));
+ } else if (this.isListProperty(eventProperty)) {
+ const listPath = path ? `${path}[]` : '[]';
+ const childMap = this.buildEventPropertyPathMap(
+ [eventProperty.eventProperty],
+ listPath,
+ );
+ childMap.forEach((value, key) => map.set(key, value));
+ }
+ }
+
+ return map;
+ }
+
+ private buildEventPropertyPath(
+ basePath: string,
+ runtimeName?: string,
+ ): string {
+ if (!runtimeName) {
+ return basePath;
+ }
+ return basePath ? `${basePath}.${runtimeName}` : runtimeName;
+ }
+
+ private isNestedProperty(
+ eventProperty: EventPropertyUnion,
+ ): eventProperty is EventPropertyNested {
+ return (
+ eventProperty?.['@class'] ===
+ 'org.apache.streampipes.model.schema.EventPropertyNested'
+ );
+ }
+
+ private isListProperty(
+ eventProperty: EventPropertyUnion,
+ ): eventProperty is EventPropertyList {
+ return (
+ eventProperty?.['@class'] ===
+ 'org.apache.streampipes.model.schema.EventPropertyList'
+ );
+ }
+
+ private isPrimitiveProperty(
+ eventProperty: EventProperty,
+ ): eventProperty is EventPropertyPrimitive {
+ return (
+ eventProperty?.['@class'] ===
+ 'org.apache.streampipes.model.schema.EventPropertyPrimitive'
+ );
+ }
+
+ private getDataType(eventProperty: EventProperty): string | undefined {
+ const originType = eventProperty?.additionalMetadata?.originType;
+ if (originType !== undefined) {
+ return originType;
+ }
+ if (this.isPrimitiveProperty(eventProperty)) {
+ return eventProperty.runtimeType;
+ }
+ return undefined;
+ }
+}