This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch cleanup-pipeline-management
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit f775b2a5bcd81d2ca4f27ea399d1703ed1dd34ec
Author: Dominik Riemer <[email protected]>
AuthorDate: Sun Aug 11 13:58:09 2024 +0200

    chore: Code cleanup of pipeline management
---
 .../exceptions/NoMatchingFormatException.java      |  28 -----
 .../exceptions/NoMatchingJsonSchemaException.java  |  22 ----
 .../exceptions/NoMatchingProtocolException.java    |  23 ----
 .../exceptions/NoMatchingSchemaException.java      |  28 -----
 .../exceptions/NoSepaInPipelineException.java      |  28 -----
 .../exceptions/NoValidConnectionException.java     |  23 ----
 .../exceptions/NoValidSecTypeException.java        |  23 ----
 .../exceptions/NoValidSepTypeException.java        |  23 ----
 .../exceptions/NoValidSepaStructureException.java  |  23 ----
 .../exceptions/NoValidSepaTypeException.java       |  23 ----
 .../RemoteServerNotAccessibleException.java        |  47 --------
 .../commons/exceptions/TooManyEdgesException.java  |  23 ----
 .../management/AdapterUpdateManagement.java        |  10 +-
 .../manager/data/PipelineGraphHelpers.java         |   5 -
 .../manager/execution/PipelineExecutor.java        |   7 +-
 .../manager/extensions/ExtensionItemInstaller.java |   6 +-
 .../manager/matching/ConnectionValidator.java      |  76 ------------
 .../manager/matching/v2/utils/MatchingUtils.java   |   2 +-
 .../migration/AbstractMigrationManager.java        |   4 +-
 .../migration/PipelineElementMigrationManager.java |   4 +-
 .../pipeline/ExtensionsServiceLogExecutor.java     |   6 -
 .../streampipes/manager/operations/Operations.java | 132 ---------------------
 .../manager/pipeline/PipelineManager.java          |  23 +++-
 .../template/PipelineElementTemplateVisitor.java   |   4 -
 .../template/PipelineTemplateGenerator.java        |  64 +---------
 .../PipelineTemplateInvocationHandler.java         |  21 ++--
 .../manager/util/PipelineVerificationUtils.java    |  56 ---------
 .../apache/streampipes/manager/util/TreeUtils.java |  68 -----------
 .../rest/impl/ContainerProvidedOptions.java        |   4 +-
 .../apache/streampipes/rest/impl/DataStream.java   |   4 +-
 .../streampipes/rest/impl/PipelineResource.java    |  27 ++---
 .../streampipes/rest/impl/PipelineTemplate.java    |  18 ++-
 .../streampipes/service/core/PostStartupTask.java  |   4 +-
 .../service/core/StreamPipesCoreApplication.java   |   5 +-
 34 files changed, 70 insertions(+), 794 deletions(-)

diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingFormatException.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingFormatException.java
deleted file mode 100644
index 75e024b3d3..0000000000
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingFormatException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class NoMatchingFormatException extends Exception {
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = -3381149054836186412L;
-
-}
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingJsonSchemaException.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingJsonSchemaException.java
deleted file mode 100644
index b79ead019b..0000000000
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingJsonSchemaException.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class NoMatchingJsonSchemaException extends Exception {
-}
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingProtocolException.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingProtocolException.java
deleted file mode 100644
index b72747ebea..0000000000
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingProtocolException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class NoMatchingProtocolException extends Exception {
-
-}
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingSchemaException.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingSchemaException.java
deleted file mode 100644
index 05c1a961f3..0000000000
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingSchemaException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class NoMatchingSchemaException extends Exception {
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = 1L;
-
-}
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoSepaInPipelineException.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoSepaInPipelineException.java
deleted file mode 100644
index e844968d3e..0000000000
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoSepaInPipelineException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class NoSepaInPipelineException extends Exception {
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = 1L;
-
-}
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidConnectionException.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidConnectionException.java
deleted file mode 100644
index b2fb57c4fb..0000000000
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidConnectionException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class NoValidConnectionException extends Exception {
-
-}
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSecTypeException.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSecTypeException.java
deleted file mode 100644
index 8bffbbf8e0..0000000000
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSecTypeException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class NoValidSecTypeException {
-
-}
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepTypeException.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepTypeException.java
deleted file mode 100644
index f20417ee10..0000000000
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepTypeException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class NoValidSepTypeException {
-
-}
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepaStructureException.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepaStructureException.java
deleted file mode 100644
index 7a9d7c5dd2..0000000000
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepaStructureException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class NoValidSepaStructureException {
-
-}
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepaTypeException.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepaTypeException.java
deleted file mode 100644
index d3712af38c..0000000000
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepaTypeException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class NoValidSepaTypeException {
-
-}
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/RemoteServerNotAccessibleException.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/RemoteServerNotAccessibleException.java
deleted file mode 100644
index 93c587f450..0000000000
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/RemoteServerNotAccessibleException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class RemoteServerNotAccessibleException extends Exception {
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = 1L;
-
-  private String serverUrl;
-
-  public RemoteServerNotAccessibleException(String message, String serverUrl) {
-    super(message);
-    this.serverUrl = serverUrl;
-  }
-
-  public RemoteServerNotAccessibleException(RemoteServerNotAccessibleException 
e) {
-    super(e.getMessage());
-    this.serverUrl = e.getServerUrl();
-  }
-
-  public String getServerUrl() {
-    return serverUrl;
-  }
-
-  public void setServerUrl(String serverUrl) {
-    this.serverUrl = serverUrl;
-  }
-}
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/TooManyEdgesException.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/TooManyEdgesException.java
deleted file mode 100644
index 5b406f0b1a..0000000000
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/TooManyEdgesException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class TooManyEdgesException extends Exception {
-
-}
diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
index 43af954fa2..f3457be595 100644
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
+++ 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
@@ -19,8 +19,8 @@
 package org.apache.streampipes.connect.management.management;
 
 import org.apache.streampipes.commons.exceptions.connect.AdapterException;
+import org.apache.streampipes.manager.execution.PipelineExecutor;
 import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
-import org.apache.streampipes.manager.operations.Operations;
 import org.apache.streampipes.manager.pipeline.PipelineManager;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
@@ -77,12 +77,12 @@ public class AdapterUpdateManagement {
     affectedPipelines.forEach(p -> {
       var shouldRestartPipeline = p.isRunning();
       if (shouldRestartPipeline) {
-        Operations.stopPipeline(p, true);
+        new PipelineExecutor(p).stopPipeline(true);
       }
       var storedPipeline = PipelineManager.getPipeline(p.getPipelineId());
       var pipeline = applyUpdatedDataStream(storedPipeline, ad);
       try {
-        var modificationMessage = Operations.validatePipeline(pipeline);
+        var modificationMessage = new 
PipelineVerificationHandlerV2(pipeline).verifyPipeline();
         var updateInfo = makeUpdateInfo(modificationMessage, pipeline);
         var modifiedPipeline = new 
PipelineVerificationHandlerV2(pipeline).makeModifiedPipeline();
         var canAutoMigrate = canAutoMigrate(modificationMessage);
@@ -93,7 +93,7 @@ public class AdapterUpdateManagement {
         }
         
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updateElement(modifiedPipeline);
         if (shouldRestartPipeline && canAutoMigrate) {
-          
Operations.startPipeline(PipelineManager.getPipeline(p.getPipelineId()));
+          new 
PipelineExecutor(PipelineManager.getPipeline(pipeline.getPipelineId())).startPipeline();
         }
       } catch (Exception e) {
         LOG.error("Could not update pipeline {}", pipeline.getName(), e);
@@ -113,7 +113,7 @@ public class AdapterUpdateManagement {
     affectedPipelines.forEach(pipeline -> {
       var updatedPipeline = applyUpdatedDataStream(pipeline, 
adapterDescription);
       try {
-        var modificationMessage = Operations.validatePipeline(updatedPipeline);
+        var modificationMessage = new 
PipelineVerificationHandlerV2(updatedPipeline).verifyPipeline();
         var updateInfo = makeUpdateInfo(modificationMessage, updatedPipeline);
         updateInfos.add(updateInfo);
       } catch (Exception e) {
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphHelpers.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphHelpers.java
index 0291c92030..e5693317c0 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphHelpers.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphHelpers.java
@@ -19,7 +19,6 @@
 package org.apache.streampipes.manager.data;
 
 import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -30,10 +29,6 @@ public class PipelineGraphHelpers {
     return find(pipelineGraph, SpDataStream.class);
   }
 
-  public static List<InvocableStreamPipesEntity> 
findInvocableElements(PipelineGraph pipelineGraph) {
-    return find(pipelineGraph, InvocableStreamPipesEntity.class);
-  }
-
   private static <T> List<T> find(PipelineGraph pipelineGraph, Class<T> clazz) 
{
     return pipelineGraph
         .vertexSet()
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutor.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutor.java
index bc14e46f58..1371c8b7b3 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutor.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutor.java
@@ -27,19 +27,16 @@ import java.util.List;
 public class PipelineExecutor {
 
   private final Pipeline pipeline;
-  private final boolean forceStop;
 
-  public PipelineExecutor(Pipeline pipeline,
-                          boolean forceStop) {
+  public PipelineExecutor(Pipeline pipeline) {
     this.pipeline = pipeline;
-    this.forceStop = forceStop;
   }
 
   public PipelineOperationStatus startPipeline() {
     return 
executeOperation(PipelineExecutionTaskFactory.makeStartPipelineTasks(pipeline));
   }
 
-  public PipelineOperationStatus stopPipeline() {
+  public PipelineOperationStatus stopPipeline(boolean forceStop) {
     return 
executeOperation(PipelineExecutionTaskFactory.makeStopPipelineTasks(pipeline, 
forceStop));
   }
 
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/extensions/ExtensionItemInstaller.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/extensions/ExtensionItemInstaller.java
index bfa691964d..16fc95a029 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/extensions/ExtensionItemInstaller.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/extensions/ExtensionItemInstaller.java
@@ -20,7 +20,7 @@ package org.apache.streampipes.manager.extensions;
 import org.apache.streampipes.commons.exceptions.SepaParseException;
 import 
org.apache.streampipes.manager.api.extensions.IExtensionsResourceUrlProvider;
 import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
-import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.manager.verification.extractor.TypeExtractor;
 import 
org.apache.streampipes.model.extensions.ExtensionItemInstallationRequest;
 import org.apache.streampipes.model.message.Message;
 
@@ -38,13 +38,13 @@ public class ExtensionItemInstaller {
                                   String principalSid) throws IOException, 
SepaParseException {
     var descriptionUrl = getDescriptionUrl(req);
     var description = fetchDescription(descriptionUrl);
-    return Operations.verifyAndAddElement(description, principalSid, 
req.publicElement());
+    return new 
TypeExtractor(description).getTypeVerifier().verifyAndAdd(principalSid, 
req.publicElement());
   }
 
   public Message updateExtension(ExtensionItemInstallationRequest req) throws 
IOException, SepaParseException {
     var descriptionUrl = getDescriptionUrl(req);
     var description = fetchDescription(descriptionUrl);
-    return Operations.verifyAndUpdateElement(description);
+    return new TypeExtractor(description).getTypeVerifier().verifyAndUpdate();
   }
 
   private String getDescriptionUrl(ExtensionItemInstallationRequest req) {
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ConnectionValidator.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ConnectionValidator.java
deleted file mode 100644
index 4b3ded7949..0000000000
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ConnectionValidator.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.matching;
-
-import org.apache.streampipes.manager.matching.v2.ElementVerification;
-import org.apache.streampipes.manager.util.TreeUtils;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import 
org.apache.streampipes.model.client.exception.InvalidConnectionException;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.pipeline.Pipeline;
-
-import java.util.List;
-
-public class ConnectionValidator {
-
-  private final Pipeline pipeline;
-  private final List<InvocableStreamPipesEntity> invocationGraphs;
-  private final InvocableStreamPipesEntity rootPipelineElement;
-  private final ElementVerification verifier;
-
-  public ConnectionValidator(Pipeline pipeline,
-                             List<InvocableStreamPipesEntity> invocationGraphs,
-                             InvocableStreamPipesEntity rootPipelineElement) {
-    this.pipeline = pipeline;
-    this.invocationGraphs = invocationGraphs;
-    this.rootPipelineElement = rootPipelineElement;
-    this.verifier = new ElementVerification();
-  }
-
-  public List<InvocableStreamPipesEntity> validateConnection() throws 
InvalidConnectionException {
-    boolean verified = true;
-    InvocableStreamPipesEntity rightElement = rootPipelineElement;
-    List<String> connectedTo = rootPipelineElement.getConnectedTo();
-
-    for (String domId : connectedTo) {
-      NamedStreamPipesEntity element = TreeUtils.findSEPAElement(domId, 
pipeline.getSepas(), pipeline.getStreams());
-      if (element instanceof SpDataStream) {
-        SpDataStream leftSpDataStream = (SpDataStream) element;
-        if (!(verifier.verify(leftSpDataStream, rightElement))) {
-          verified = false;
-        }
-      } else {
-        DataProcessorInvocation ancestor = 
findInvocationGraph(invocationGraphs, element.getDom());
-        if (!(verifier.verify(ancestor, rightElement))) {
-          verified = false;
-        }
-      }
-    }
-    if (!verified) {
-      throw new InvalidConnectionException(verifier.getErrorLog());
-    }
-
-    return invocationGraphs;
-  }
-
-  private DataProcessorInvocation 
findInvocationGraph(List<InvocableStreamPipesEntity> graphs, String domId) {
-    return (DataProcessorInvocation) TreeUtils.findByDomId(domId, graphs);
-  }
-}
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/utils/MatchingUtils.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/utils/MatchingUtils.java
index c0f5c8b432..f20a8e7727 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/utils/MatchingUtils.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/utils/MatchingUtils.java
@@ -21,7 +21,7 @@ package org.apache.streampipes.manager.matching.v2.utils;
 public class MatchingUtils {
 
   public static boolean nullCheck(Object offer, Object requirement) {
-    return ((offer == null) && (requirement == null)) || (requirement == null);
+    return (requirement == null);
   }
 
   public static boolean nullCheckRightNullDisallowed(Object offer, Object 
requirement) {
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
index 5493bbf69d..a27b19b400 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
@@ -20,7 +20,7 @@ package org.apache.streampipes.manager.migration;
 
 import org.apache.streampipes.commons.exceptions.SepaParseException;
 import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
-import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.manager.verification.extractor.TypeExtractor;
 import org.apache.streampipes.model.base.VersionedNamedStreamPipesEntity;
 import org.apache.streampipes.model.extensions.migration.MigrationRequest;
 import org.apache.streampipes.model.message.Notification;
@@ -139,7 +139,7 @@ public abstract class AbstractMigrationManager {
           .execute()
           .returnContent()
           .asString();
-      var updateResult = Operations.verifyAndUpdateElement(entityPayload);
+      var updateResult = new 
TypeExtractor(entityPayload).getTypeVerifier().verifyAndUpdate();
       if (!updateResult.isSuccess()) {
         LOG.error(
             "Updating the pipeline element description failed: {}",
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
index 40ff88acf3..5ac2ba0a58 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
@@ -176,8 +176,8 @@ public class PipelineElementMigrationManager extends 
AbstractMigrationManager im
 
 
   public void stopPipeline(Pipeline pipeline) {
-    var pipelineExecutor = new PipelineExecutor(pipeline, true);
-    var pipelineStopResult = pipelineExecutor.stopPipeline();
+    var pipelineExecutor = new PipelineExecutor(pipeline);
+    var pipelineStopResult = pipelineExecutor.stopPipeline(true);
 
     if (pipelineStopResult.isSuccess()) {
       LOG.info("Pipeline successfully stopped.");
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsServiceLogExecutor.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsServiceLogExecutor.java
index 8d76e31252..9f2f430b20 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsServiceLogExecutor.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsServiceLogExecutor.java
@@ -22,12 +22,10 @@ package org.apache.streampipes.manager.monitoring.pipeline;
 import org.apache.streampipes.commons.constants.InstanceIdExtractor;
 import org.apache.streampipes.commons.prometheus.pipelines.PipelineFlowStats;
 import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
-import org.apache.streampipes.model.client.user.Principal;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.graph.DataSinkInvocation;
 import org.apache.streampipes.model.monitoring.SpEndpointMonitoringInfo;
-import org.apache.streampipes.resource.management.SpResourceManager;
 import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
 import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTypes;
@@ -97,10 +95,6 @@ public class ExtensionsServiceLogExecutor implements 
Runnable {
     return 
ExtensionServiceExecutions.extServiceGetRequest(makeLogUrl(serviceEndpointUrl));
   }
 
-  private Principal getServiceAdmin() {
-    return new SpResourceManager().manageUsers().getServiceAdmin();
-  }
-
   private List<String> getActiveExtensionsEndpoints() {
     return SpServiceDiscovery.getServiceDiscovery().getServiceEndpoints(
         DefaultSpServiceTypes.EXT,
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
deleted file mode 100644
index f2addd2b20..0000000000
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.operations;
-
-import 
org.apache.streampipes.commons.exceptions.NoSuitableSepasAvailableException;
-import org.apache.streampipes.commons.exceptions.SepaParseException;
-import org.apache.streampipes.manager.execution.PipelineExecutor;
-import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
-import org.apache.streampipes.manager.recommender.ElementRecommender;
-import org.apache.streampipes.manager.remote.ContainerProvidedOptionsHandler;
-import org.apache.streampipes.manager.storage.PipelineStorageService;
-import org.apache.streampipes.manager.template.PipelineTemplateGenerator;
-import 
org.apache.streampipes.manager.template.PipelineTemplateInvocationGenerator;
-import 
org.apache.streampipes.manager.template.PipelineTemplateInvocationHandler;
-import org.apache.streampipes.manager.topic.WildcardTopicGenerator;
-import org.apache.streampipes.manager.verification.extractor.TypeExtractor;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.message.Message;
-import org.apache.streampipes.model.message.PipelineModificationMessage;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import 
org.apache.streampipes.model.pipeline.PipelineElementRecommendationMessage;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
-import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
-import org.apache.streampipes.model.template.PipelineTemplateDescription;
-import org.apache.streampipes.model.template.PipelineTemplateInvocation;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * class that provides several (partial) pipeline verification methods
- */
-
-public class Operations {
-
-  /**
-   * @param pipeline the pipeline to validate
-   * @return PipelineModificationMessage a message containing desired pipeline 
modifications
-   */
-  public static PipelineModificationMessage validatePipeline(Pipeline 
pipeline) throws Exception {
-    return new PipelineVerificationHandlerV2(pipeline).verifyPipeline();
-  }
-
-  public static Message verifyAndAddElement(String graphData,
-                                            String principalSid,
-                                            boolean publicElement) throws 
SepaParseException {
-    return new 
TypeExtractor(graphData).getTypeVerifier().verifyAndAdd(principalSid, 
publicElement);
-  }
-
-  public static Message verifyAndUpdateElement(String graphData) throws 
SepaParseException {
-    return new TypeExtractor(graphData).getTypeVerifier().verifyAndUpdate();
-  }
-
-  public static PipelineElementRecommendationMessage 
findRecommendedElements(Pipeline partialPipeline,
-                                                                             
String baseRecId)
-      throws NoSuitableSepasAvailableException {
-    return new ElementRecommender(partialPipeline, 
baseRecId).findRecommendedElements();
-  }
-
-  public static void storePipeline(Pipeline pipeline) {
-    new PipelineStorageService(pipeline).addPipeline();
-  }
-
-  public static void updatePipeline(Pipeline pipeline) {
-    new PipelineStorageService(pipeline).updatePipeline();
-  }
-
-  public static PipelineOperationStatus startPipeline(Pipeline pipeline) {
-    return new PipelineExecutor(pipeline, false).startPipeline();
-  }
-
-  public static List<PipelineOperationStatus> stopAllPipelines(boolean 
forceStop) {
-    List<PipelineOperationStatus> status = new ArrayList<>();
-    List<Pipeline> pipelines =
-        
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().findAll();
-
-    pipelines.forEach(p -> {
-      if (p.isRunning()) {
-        status.add(Operations.stopPipeline(p, forceStop));
-      }
-    });
-    return status;
-  }
-
-  public static PipelineOperationStatus stopPipeline(Pipeline pipeline,
-                                                     boolean forceStop) {
-    return new PipelineExecutor(pipeline, forceStop).stopPipeline();
-  }
-
-  public static SpDataStream updateActualTopic(SpDataStream stream) {
-    return new WildcardTopicGenerator(stream).computeActualTopic();
-  }
-
-  public static RuntimeOptionsResponse 
fetchRemoteOptions(RuntimeOptionsRequest request) {
-    return new ContainerProvidedOptionsHandler().fetchRemoteOptions(request);
-  }
-
-  public static List<PipelineTemplateDescription> getAllPipelineTemplates() {
-    return new PipelineTemplateGenerator().getAllPipelineTemplates();
-  }
-
-  public static PipelineOperationStatus handlePipelineTemplateInvocation(
-      String userSid,
-      PipelineTemplateInvocation pipelineTemplateInvocation) {
-    return new PipelineTemplateInvocationHandler(userSid, 
pipelineTemplateInvocation).handlePipelineInvocation();
-  }
-
-  public static PipelineTemplateInvocation getPipelineInvocationTemplate(
-      SpDataStream dataStream,
-      PipelineTemplateDescription pipelineTemplateDescription) {
-    return new PipelineTemplateInvocationGenerator(dataStream, 
pipelineTemplateDescription).generateInvocation();
-  }
-}
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java
index e57675ecba..ca477d73db 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java
@@ -19,8 +19,9 @@
 package org.apache.streampipes.manager.pipeline;
 
 import org.apache.streampipes.commons.random.UUIDGenerator;
-import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.manager.execution.PipelineExecutor;
 import org.apache.streampipes.manager.permission.PermissionManager;
+import org.apache.streampipes.manager.storage.PipelineStorageService;
 import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 import org.apache.streampipes.model.client.user.Permission;
 import org.apache.streampipes.model.pipeline.Pipeline;
@@ -30,6 +31,7 @@ import org.apache.streampipes.storage.api.IPermissionStorage;
 import org.apache.streampipes.storage.api.IPipelineStorage;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Objects;
@@ -71,7 +73,7 @@ public class PipelineManager {
         ? UUIDGenerator.generateUuid()
         : pipeline.getPipelineId();
     preparePipelineBasics(principalSid, pipeline, pipelineId);
-    Operations.storePipeline(pipeline);
+    new PipelineStorageService(pipeline).addPipeline();
 
     Permission permission = new PermissionManager().makePermission(pipeline, 
principalSid);
     getPermissionStorage().persist(permission);
@@ -88,7 +90,7 @@ public class PipelineManager {
    */
   public static PipelineOperationStatus startPipeline(String pipelineId) {
     Pipeline pipeline = getPipeline(pipelineId);
-    return Operations.startPipeline(pipeline);
+    return new PipelineExecutor(pipeline).startPipeline();
   }
 
   /**
@@ -103,7 +105,7 @@ public class PipelineManager {
                                                      boolean forceStop) {
     Pipeline pipeline = getPipeline(pipelineId);
 
-    return Operations.stopPipeline(pipeline, forceStop);
+    return new PipelineExecutor(pipeline).stopPipeline(forceStop);
   }
 
   /**
@@ -119,6 +121,19 @@ public class PipelineManager {
     }
   }
 
+  public static List<PipelineOperationStatus> stopAllPipelines(boolean 
forceStop) {
+    List<PipelineOperationStatus> status = new ArrayList<>();
+    List<Pipeline> pipelines =
+        
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().findAll();
+
+    pipelines.forEach(p -> {
+      if (p.isRunning()) {
+        status.add(new PipelineExecutor(p).stopPipeline(forceStop));
+      }
+    });
+    return status;
+  }
+
 
   /**
    * Checks for the pipelines that contain the processing element
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java
index b301a5243a..a32bd5d6d3 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java
@@ -202,10 +202,6 @@ public class PipelineElementTemplateVisitor implements 
StaticPropertyVisitor {
     // TODO not yet supported
   }
 
-  private Object getValue(StaticProperty sp) {
-    return ((Map<String, Object>) 
configs.get(sp.getInternalName())).get("value");
-  }
-
   private List<String> getValueAsList(StaticProperty sp) {
     return (List<String>) configs.get(sp.getInternalName());
   }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateGenerator.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateGenerator.java
index 4531fc88f2..86c46bc742 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateGenerator.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateGenerator.java
@@ -18,15 +18,9 @@
 package org.apache.streampipes.manager.template;
 
 import org.apache.streampipes.commons.exceptions.ElementNotFoundException;
-import org.apache.streampipes.manager.matching.v2.ElementVerification;
 import 
org.apache.streampipes.manager.template.instances.DataLakePipelineTemplate;
 import org.apache.streampipes.manager.template.instances.PipelineTemplate;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
 import org.apache.streampipes.model.template.PipelineTemplateDescription;
 import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorage;
 import org.apache.streampipes.storage.management.StorageDispatcher;
@@ -43,7 +37,7 @@ public class PipelineTemplateGenerator {
 
   Logger logger = LoggerFactory.getLogger(PipelineTemplateGenerator.class);
 
-  private List<PipelineTemplateDescription> availableDescriptions = new 
ArrayList<>();
+  private final List<PipelineTemplateDescription> availableDescriptions = new 
ArrayList<>();
 
   public List<PipelineTemplateDescription> getAllPipelineTemplates() {
 
@@ -65,62 +59,6 @@ public class PipelineTemplateGenerator {
     return availableDescriptions;
   }
 
-  public List<PipelineTemplateDescription> getCompatibleTemplates(String 
streamId) {
-    List<PipelineTemplateDescription> compatibleTemplates = new ArrayList<>();
-    ElementVerification verifier = new ElementVerification();
-    SpDataStream streamOffer = null;
-
-    try {
-      streamOffer = getStream(streamId);
-      streamOffer = new SpDataStream(streamOffer);
-      if (streamOffer != null) {
-        for (PipelineTemplateDescription pipelineTemplateDescription : 
getAllPipelineTemplates()) {
-          // TODO make this work for 2+ input streams
-          InvocableStreamPipesEntity entity =
-              
cloneInvocation(pipelineTemplateDescription.getBoundTo().get(0).getPipelineElementTemplate());
-          if (verifier.verify(streamOffer, entity)) {
-            compatibleTemplates.add(pipelineTemplateDescription);
-          }
-        }
-      }
-
-    } catch (ElementNotFoundException e) {
-      e.printStackTrace();
-    }
-
-    return compatibleTemplates;
-  }
-
-  private InvocableStreamPipesEntity 
cloneInvocation(InvocableStreamPipesEntity pipelineElementTemplate) {
-    if (pipelineElementTemplate instanceof DataProcessorInvocation) {
-      return new DataProcessorInvocation((DataProcessorInvocation) 
pipelineElementTemplate);
-    } else {
-      return new DataSinkInvocation((DataSinkInvocation) 
pipelineElementTemplate);
-    }
-  }
-
-  protected SpDataStream getStream(String streamId) throws 
ElementNotFoundException {
-    SpDataStream result = getStorage()
-        .getEventStreamById(streamId);
-
-    if (result == null) {
-      throw new ElementNotFoundException("Data stream " + streamId + " is not 
installed!");
-    }
-
-    return result;
-  }
-
-  protected DataProcessorDescription getProcessor(String id) throws 
ElementNotFoundException {
-    DataProcessorDescription result = getStorage()
-        .getDataProcessorByAppId(id);
-
-    if (result == null) {
-      throw new ElementNotFoundException("Data processor " + id + " is not 
installed!");
-    }
-
-    return result;
-  }
-
   protected DataSinkDescription getSink(String id) throws 
ElementNotFoundException {
     try {
       return getStorage()
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateInvocationHandler.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateInvocationHandler.java
index 0848abdac9..7b1748a9ef 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateInvocationHandler.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateInvocationHandler.java
@@ -17,8 +17,9 @@
  */
 package org.apache.streampipes.manager.template;
 
-import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.manager.execution.PipelineExecutor;
 import org.apache.streampipes.manager.permission.PermissionManager;
+import org.apache.streampipes.manager.storage.PipelineStorageService;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.client.user.Permission;
 import org.apache.streampipes.model.pipeline.Pipeline;
@@ -33,9 +34,9 @@ import java.util.List;
 
 public class PipelineTemplateInvocationHandler {
 
-  private PipelineTemplateInvocation pipelineTemplateInvocation;
-  private PipelineTemplateDescription pipelineTemplateDescription;
-  private String username;
+  private final PipelineTemplateInvocation pipelineTemplateInvocation;
+  private final PipelineTemplateDescription pipelineTemplateDescription;
+  private final String username;
 
   public PipelineTemplateInvocationHandler(String username, 
PipelineTemplateInvocation pipelineTemplateInvocation) {
     this.username = username;
@@ -43,26 +44,18 @@ public class PipelineTemplateInvocationHandler {
     this.pipelineTemplateDescription = 
getTemplateById(pipelineTemplateInvocation.getPipelineTemplateId());
   }
 
-  public PipelineTemplateInvocationHandler(String username, 
PipelineTemplateInvocation pipelineTemplateInvocation,
-                                           PipelineTemplateDescription 
pipelineTemplateDescription) {
-    this.username = username;
-    this.pipelineTemplateInvocation = pipelineTemplateInvocation;
-    this.pipelineTemplateDescription = pipelineTemplateDescription;
-  }
-
-
   public PipelineOperationStatus handlePipelineInvocation() {
     Pipeline pipeline = new 
PipelineGenerator(pipelineTemplateInvocation.getDataStreamId(), 
pipelineTemplateDescription,
         pipelineTemplateInvocation.getKviName()).makePipeline();
     pipeline.setCreatedByUser(username);
     pipeline.setCreatedAt(System.currentTimeMillis());
     replaceStaticProperties(pipeline);
-    Operations.storePipeline(pipeline);
+    new PipelineStorageService(pipeline).addPipeline();
     Permission permission = new PermissionManager().makePermission(pipeline, 
username);
     
StorageDispatcher.INSTANCE.getNoSqlStore().getPermissionStorage().persist(permission);
     Pipeline storedPipeline =
         
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getElementById(pipeline.getPipelineId());
-    return Operations.startPipeline(storedPipeline);
+    return new PipelineExecutor(storedPipeline).startPipeline();
   }
 
   private void replaceStaticProperties(Pipeline pipeline) {
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/PipelineVerificationUtils.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/PipelineVerificationUtils.java
deleted file mode 100644
index 404c34f5e4..0000000000
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/PipelineVerificationUtils.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.util;
-
-import org.apache.streampipes.commons.exceptions.NoSepaInPipelineException;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.pipeline.Pipeline;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class PipelineVerificationUtils {
-
-  /**
-   * returns the root node of a partial pipeline (a pipeline without an action)
-   *
-   * @param pipeline
-   * @return {@link 
org.apache.streampipes.model.base.InvocableStreamPipesEntity}
-   */
-
-  public static InvocableStreamPipesEntity getRootNode(Pipeline pipeline) 
throws NoSepaInPipelineException {
-    List<InvocableStreamPipesEntity> elements = new ArrayList<>();
-    elements.addAll(pipeline.getSepas());
-    elements.addAll(pipeline.getActions());
-
-    List<InvocableStreamPipesEntity> unconfiguredElements = elements
-        .stream()
-        .filter(e -> !e.isConfigured())
-        .collect(Collectors.toList());
-
-
-    if (unconfiguredElements.size() != 1) {
-      throw new NoSepaInPipelineException();
-    } else {
-      return unconfiguredElements.get(0);
-    }
-
-  }
-}
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TreeUtils.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TreeUtils.java
deleted file mode 100644
index 4986bba5a2..0000000000
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TreeUtils.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.util;
-
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class TreeUtils {
-
-  /**
-   * @param id      the DOM ID
-   * @param sepas   list of sepas in model-client format
-   * @param streams list of streams in model-client format
-   * @return a SEPA-client element
-   */
-
-  public static NamedStreamPipesEntity findSEPAElement(String id, 
List<DataProcessorInvocation> sepas,
-                                                       List<SpDataStream>
-                                                           streams) {
-    List<NamedStreamPipesEntity> allElements = new ArrayList<>();
-    allElements.addAll(sepas);
-    allElements.addAll(streams);
-
-    for (NamedStreamPipesEntity element : allElements) {
-      if (id.equals(element.getDom())) {
-        return element;
-      }
-    }
-    //TODO
-    return null;
-  }
-
-  /**
-   * @param id     the DOM ID
-   * @param graphs list of invocation graphs
-   * @return an invocation graph with a given DOM Id
-   */
-  public static InvocableStreamPipesEntity findByDomId(String id, 
List<InvocableStreamPipesEntity> graphs) {
-    for (InvocableStreamPipesEntity graph : graphs) {
-      if (graph.getDom().equals(id)) {
-        return graph;
-      }
-    }
-    //TODO
-    return null;
-  }
-}
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ContainerProvidedOptions.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ContainerProvidedOptions.java
index cce62bc080..1d41f47cc4 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ContainerProvidedOptions.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ContainerProvidedOptions.java
@@ -17,7 +17,7 @@
  */
 package org.apache.streampipes.rest.impl;
 
-import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.manager.remote.ContainerProvidedOptionsHandler;
 import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
 import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
 import org.apache.streampipes.rest.core.base.impl.AbstractRestResource;
@@ -38,6 +38,6 @@ public class ContainerProvidedOptions extends 
AbstractRestResource {
       consumes = MediaType.APPLICATION_JSON_VALUE
   )
   public ResponseEntity<RuntimeOptionsResponse> 
fetchRemoteOptions(@RequestBody RuntimeOptionsRequest request) {
-    return ok(Operations.fetchRemoteOptions(request));
+    return ok(new 
ContainerProvidedOptionsHandler().fetchRemoteOptions(request));
   }
 }
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/DataStream.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/DataStream.java
index db72f9085f..be98a833ab 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/DataStream.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/DataStream.java
@@ -17,7 +17,7 @@
  */
 package org.apache.streampipes.rest.impl;
 
-import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.manager.topic.WildcardTopicGenerator;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.rest.core.base.impl.AbstractRestResource;
 
@@ -34,6 +34,6 @@ public class DataStream extends AbstractRestResource {
 
   @PostMapping(path = "/update", produces = MediaType.APPLICATION_JSON_VALUE)
   public ResponseEntity<SpDataStream> getStreamsBySource(@RequestBody 
SpDataStream stream) {
-    return ok(Operations.updateActualTopic(stream));
+    return ok(new WildcardTopicGenerator(stream).computeActualTopic());
   }
 }
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
index 75e636cd6b..65339b9020 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
@@ -18,16 +18,12 @@
 
 package org.apache.streampipes.rest.impl;
 
-import org.apache.streampipes.commons.exceptions.NoMatchingFormatException;
-import org.apache.streampipes.commons.exceptions.NoMatchingJsonSchemaException;
-import org.apache.streampipes.commons.exceptions.NoMatchingProtocolException;
-import org.apache.streampipes.commons.exceptions.NoMatchingSchemaException;
 import 
org.apache.streampipes.commons.exceptions.NoSuitableSepasAvailableException;
-import 
org.apache.streampipes.commons.exceptions.RemoteServerNotAccessibleException;
 import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
-import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
 import org.apache.streampipes.manager.pipeline.PipelineManager;
-import 
org.apache.streampipes.model.client.exception.InvalidConnectionException;
+import org.apache.streampipes.manager.recommender.ElementRecommender;
+import org.apache.streampipes.manager.storage.PipelineStorageService;
 import org.apache.streampipes.model.message.ErrorMessage;
 import org.apache.streampipes.model.message.Message;
 import org.apache.streampipes.model.message.Notification;
@@ -52,6 +48,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
 import io.swagger.v3.oas.annotations.responses.ApiResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
@@ -178,7 +175,7 @@ public class PipelineResource extends 
AbstractAuthGuardedRestResource {
   public PipelineElementRecommendationMessage recommend(@RequestBody Pipeline 
pipeline,
                                                         @PathVariable("recId") 
String baseRecElement) {
     try {
-      return Operations.findRecommendedElements(pipeline, baseRecElement);
+      return new ElementRecommender(pipeline, 
baseRecElement).findRecommendedElements();
     } catch (JsonSyntaxException e) {
       throw new SpNotificationException(
           HttpStatus.BAD_REQUEST,
@@ -207,19 +204,9 @@ public class PipelineResource extends 
AbstractAuthGuardedRestResource {
   @PreAuthorize(AuthConstants.HAS_WRITE_PIPELINE_PRIVILEGE)
   public ResponseEntity<?> validatePipeline(@RequestBody Pipeline pipeline) {
     try {
-      return ok(Operations.validatePipeline(pipeline));
+      return ok(new PipelineVerificationHandlerV2(pipeline).verifyPipeline());
     } catch (JsonSyntaxException e) {
       return badRequest(new Notification(NotificationType.UNKNOWN_ERROR, 
e.getMessage()));
-    } catch (NoMatchingSchemaException e) {
-      return badRequest(new Notification(NotificationType.NO_VALID_CONNECTION, 
e.getMessage()));
-    } catch (NoMatchingFormatException e) {
-      return badRequest(new 
Notification(NotificationType.NO_MATCHING_FORMAT_CONNECTION, e.getMessage()));
-    } catch (NoMatchingProtocolException e) {
-      return badRequest(new 
Notification(NotificationType.NO_MATCHING_PROTOCOL_CONNECTION, e.getMessage()));
-    } catch (RemoteServerNotAccessibleException | 
NoMatchingJsonSchemaException e) {
-      return serverError(new 
Notification(NotificationType.REMOTE_SERVER_NOT_ACCESSIBLE, e.getMessage()));
-    } catch (InvalidConnectionException e) {
-      return badRequest(e.getErrorLog());
     } catch (Exception e) {
       LOG.error(e.getMessage());
       return serverError(new Notification(NotificationType.UNKNOWN_ERROR, 
e.getMessage()));
@@ -244,7 +231,7 @@ public class PipelineResource extends 
AbstractAuthGuardedRestResource {
     storedPipeline.setHealthStatus(pipeline.getHealthStatus());
     
storedPipeline.setPipelineNotifications(pipeline.getPipelineNotifications());
     storedPipeline.setValid(pipeline.isValid());
-    Operations.updatePipeline(storedPipeline);
+    new PipelineStorageService(pipeline).updatePipeline();
     SuccessMessage message = Notifications.success("Pipeline modified");
     message.addNotification(new Notification("id", pipelineId));
     return ok(message);
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java
index f3496c1213..5957fe01e2 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java
@@ -17,7 +17,9 @@
  */
 package org.apache.streampipes.rest.impl;
 
-import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.manager.template.PipelineTemplateGenerator;
+import 
org.apache.streampipes.manager.template.PipelineTemplateInvocationGenerator;
+import 
org.apache.streampipes.manager.template.PipelineTemplateInvocationHandler;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.SpDataStreamContainer;
 import org.apache.streampipes.model.message.Notifications;
@@ -67,7 +69,10 @@ public class PipelineTemplate extends 
AbstractAuthGuardedRestResource {
     var pipelineTemplateDescriptionOpt = 
getPipelineTemplateDescription(pipelineTemplateId);
     if (pipelineTemplateDescriptionOpt.isPresent()) {
       PipelineTemplateInvocation invocation =
-          Operations.getPipelineInvocationTemplate(dataStream, 
pipelineTemplateDescriptionOpt.get());
+          new PipelineTemplateInvocationGenerator(
+              dataStream,
+              pipelineTemplateDescriptionOpt.get()
+          ).generateInvocation();
       PipelineTemplateInvocation clonedInvocation = new 
PipelineTemplateInvocation(invocation);
       return ok(new PipelineTemplateInvocation(clonedInvocation));
     } else {
@@ -85,14 +90,15 @@ public class PipelineTemplate extends 
AbstractAuthGuardedRestResource {
   public ResponseEntity<PipelineOperationStatus> generatePipeline(
       @RequestBody PipelineTemplateInvocation pipelineTemplateInvocation) {
 
-    PipelineOperationStatus status = Operations
-        .handlePipelineTemplateInvocation(getAuthenticatedUserSid(), 
pipelineTemplateInvocation);
-
+    PipelineOperationStatus status = new PipelineTemplateInvocationHandler(
+        getAuthenticatedUserSid(),
+        pipelineTemplateInvocation
+    ).handlePipelineInvocation();
     return ok(status);
   }
 
   private Optional<PipelineTemplateDescription> 
getPipelineTemplateDescription(String pipelineTemplateId) {
-    return Operations
+    return new PipelineTemplateGenerator()
         .getAllPipelineTemplates()
         .stream()
         .filter(pt -> pt.getAppId().equals(pipelineTemplateId))
diff --git 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
index f7f0c32d22..06c7257bf0 100644
--- 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
@@ -20,8 +20,8 @@ package org.apache.streampipes.service.core;
 
 import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
 import 
org.apache.streampipes.connect.management.management.WorkerAdministrationManagement;
+import org.apache.streampipes.manager.execution.PipelineExecutor;
 import org.apache.streampipes.manager.health.ServiceHealthCheck;
-import org.apache.streampipes.manager.operations.Operations;
 import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
@@ -120,7 +120,7 @@ public class PostStartupTask implements Runnable {
   }
 
   private void startPipeline(Pipeline pipeline, boolean restartOnReboot) {
-    PipelineOperationStatus status = Operations.startPipeline(pipeline);
+    PipelineOperationStatus status = new 
PipelineExecutor(pipeline).startPipeline();
     if (status.isSuccess()) {
       LOG.info("Pipeline {} successfully restarted", status.getPipelineName());
       Pipeline storedPipeline = 
getPipelineStorage().getElementById(pipeline.getPipelineId());
diff --git 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
index f9c062b546..9ba8605c2a 100644
--- 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
@@ -26,7 +26,7 @@ import 
org.apache.streampipes.manager.health.CoreServiceStatusManager;
 import org.apache.streampipes.manager.health.PipelineHealthCheck;
 import org.apache.streampipes.manager.health.ServiceHealthCheck;
 import 
org.apache.streampipes.manager.monitoring.pipeline.ExtensionsServiceLogExecutor;
-import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.manager.pipeline.PipelineManager;
 import org.apache.streampipes.manager.setup.AutoInstallation;
 import org.apache.streampipes.manager.setup.StreamPipesEnvChecker;
 import org.apache.streampipes.messaging.SpProtocolManager;
@@ -51,6 +51,7 @@ import 
org.apache.streampipes.storage.management.StorageDispatcher;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.Configuration;
@@ -221,7 +222,7 @@ public class StreamPipesCoreApplication extends 
StreamPipesServiceBase {
     });
 
     LOG.info("Gracefully stopping all running pipelines...");
-    List<PipelineOperationStatus> status = Operations.stopAllPipelines(true);
+    List<PipelineOperationStatus> status = 
PipelineManager.stopAllPipelines(true);
     status.forEach(s -> {
       if (s.isSuccess()) {
         LOG.info("Pipeline {} successfully stopped", s.getPipelineName());

Reply via email to