This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 3ac47b9813 chore: Code cleanup of pipeline management (#3126)
3ac47b9813 is described below
commit 3ac47b9813db3d825812d27720c8a3ba78c872e7
Author: Dominik Riemer <[email protected]>
AuthorDate: Sun Aug 11 21:53:25 2024 +0200
chore: Code cleanup of pipeline management (#3126)
* chore: Code cleanup of pipeline management
* Fix checkstyle
* Fix checkstyle
* Fix pipeline update
* Fix bug
---
.../exceptions/NoMatchingFormatException.java | 28 -----
.../exceptions/NoMatchingJsonSchemaException.java | 22 ----
.../exceptions/NoMatchingProtocolException.java | 23 ----
.../exceptions/NoMatchingSchemaException.java | 28 -----
.../exceptions/NoSepaInPipelineException.java | 28 -----
.../exceptions/NoValidConnectionException.java | 23 ----
.../exceptions/NoValidSecTypeException.java | 23 ----
.../exceptions/NoValidSepTypeException.java | 23 ----
.../exceptions/NoValidSepaStructureException.java | 23 ----
.../exceptions/NoValidSepaTypeException.java | 23 ----
.../RemoteServerNotAccessibleException.java | 47 --------
.../commons/exceptions/TooManyEdgesException.java | 23 ----
.../management/AdapterUpdateManagement.java | 10 +-
.../manager/data/PipelineGraphHelpers.java | 5 -
.../manager/execution/PipelineExecutor.java | 7 +-
.../manager/extensions/ExtensionItemInstaller.java | 6 +-
.../manager/matching/ConnectionValidator.java | 76 ------------
.../manager/matching/v2/utils/MatchingUtils.java | 2 +-
.../migration/AbstractMigrationManager.java | 4 +-
.../migration/PipelineElementMigrationManager.java | 4 +-
.../pipeline/ExtensionsServiceLogExecutor.java | 6 -
.../streampipes/manager/operations/Operations.java | 132 ---------------------
.../manager/pipeline/PipelineManager.java | 23 +++-
.../template/PipelineElementTemplateVisitor.java | 4 -
.../template/PipelineTemplateGenerator.java | 64 +---------
.../PipelineTemplateInvocationHandler.java | 21 ++--
.../manager/util/PipelineVerificationUtils.java | 56 ---------
.../apache/streampipes/manager/util/TreeUtils.java | 68 -----------
.../rest/impl/ContainerProvidedOptions.java | 4 +-
.../apache/streampipes/rest/impl/DataStream.java | 4 +-
.../streampipes/rest/impl/PipelineResource.java | 26 +---
.../streampipes/rest/impl/PipelineTemplate.java | 18 ++-
.../streampipes/service/core/PostStartupTask.java | 4 +-
.../service/core/StreamPipesCoreApplication.java | 4 +-
34 files changed, 68 insertions(+), 794 deletions(-)
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingFormatException.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingFormatException.java
deleted file mode 100644
index 75e024b3d3..0000000000
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingFormatException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class NoMatchingFormatException extends Exception {
-
- /**
- *
- */
- private static final long serialVersionUID = -3381149054836186412L;
-
-}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingJsonSchemaException.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingJsonSchemaException.java
deleted file mode 100644
index b79ead019b..0000000000
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingJsonSchemaException.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class NoMatchingJsonSchemaException extends Exception {
-}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingProtocolException.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingProtocolException.java
deleted file mode 100644
index b72747ebea..0000000000
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingProtocolException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class NoMatchingProtocolException extends Exception {
-
-}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingSchemaException.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingSchemaException.java
deleted file mode 100644
index 05c1a961f3..0000000000
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoMatchingSchemaException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class NoMatchingSchemaException extends Exception {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
-}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoSepaInPipelineException.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoSepaInPipelineException.java
deleted file mode 100644
index e844968d3e..0000000000
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoSepaInPipelineException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class NoSepaInPipelineException extends Exception {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
-}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidConnectionException.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidConnectionException.java
deleted file mode 100644
index b2fb57c4fb..0000000000
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidConnectionException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class NoValidConnectionException extends Exception {
-
-}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSecTypeException.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSecTypeException.java
deleted file mode 100644
index 8bffbbf8e0..0000000000
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSecTypeException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class NoValidSecTypeException {
-
-}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepTypeException.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepTypeException.java
deleted file mode 100644
index f20417ee10..0000000000
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepTypeException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class NoValidSepTypeException {
-
-}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepaStructureException.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepaStructureException.java
deleted file mode 100644
index 7a9d7c5dd2..0000000000
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepaStructureException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class NoValidSepaStructureException {
-
-}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepaTypeException.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepaTypeException.java
deleted file mode 100644
index d3712af38c..0000000000
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/NoValidSepaTypeException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class NoValidSepaTypeException {
-
-}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/RemoteServerNotAccessibleException.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/RemoteServerNotAccessibleException.java
deleted file mode 100644
index 93c587f450..0000000000
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/RemoteServerNotAccessibleException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class RemoteServerNotAccessibleException extends Exception {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- private String serverUrl;
-
- public RemoteServerNotAccessibleException(String message, String serverUrl) {
- super(message);
- this.serverUrl = serverUrl;
- }
-
- public RemoteServerNotAccessibleException(RemoteServerNotAccessibleException
e) {
- super(e.getMessage());
- this.serverUrl = e.getServerUrl();
- }
-
- public String getServerUrl() {
- return serverUrl;
- }
-
- public void setServerUrl(String serverUrl) {
- this.serverUrl = serverUrl;
- }
-}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/TooManyEdgesException.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/TooManyEdgesException.java
deleted file mode 100644
index 5b406f0b1a..0000000000
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/TooManyEdgesException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.commons.exceptions;
-
-public class TooManyEdgesException extends Exception {
-
-}
diff --git
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
index 43af954fa2..9aae62888d 100644
---
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
+++
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
@@ -19,8 +19,8 @@
package org.apache.streampipes.connect.management.management;
import org.apache.streampipes.commons.exceptions.connect.AdapterException;
+import org.apache.streampipes.manager.execution.PipelineExecutor;
import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
-import org.apache.streampipes.manager.operations.Operations;
import org.apache.streampipes.manager.pipeline.PipelineManager;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
@@ -77,12 +77,12 @@ public class AdapterUpdateManagement {
affectedPipelines.forEach(p -> {
var shouldRestartPipeline = p.isRunning();
if (shouldRestartPipeline) {
- Operations.stopPipeline(p, true);
+ new PipelineExecutor(p).stopPipeline(true);
}
var storedPipeline = PipelineManager.getPipeline(p.getPipelineId());
var pipeline = applyUpdatedDataStream(storedPipeline, ad);
try {
- var modificationMessage = Operations.validatePipeline(pipeline);
+ var modificationMessage = new
PipelineVerificationHandlerV2(pipeline).verifyPipeline();
var updateInfo = makeUpdateInfo(modificationMessage, pipeline);
var modifiedPipeline = new
PipelineVerificationHandlerV2(pipeline).makeModifiedPipeline();
var canAutoMigrate = canAutoMigrate(modificationMessage);
@@ -93,7 +93,7 @@ public class AdapterUpdateManagement {
}
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updateElement(modifiedPipeline);
if (shouldRestartPipeline && canAutoMigrate) {
-
Operations.startPipeline(PipelineManager.getPipeline(p.getPipelineId()));
+ new
PipelineExecutor(PipelineManager.getPipeline(p.getPipelineId())).startPipeline();
}
} catch (Exception e) {
LOG.error("Could not update pipeline {}", pipeline.getName(), e);
@@ -113,7 +113,7 @@ public class AdapterUpdateManagement {
affectedPipelines.forEach(pipeline -> {
var updatedPipeline = applyUpdatedDataStream(pipeline,
adapterDescription);
try {
- var modificationMessage = Operations.validatePipeline(updatedPipeline);
+ var modificationMessage = new
PipelineVerificationHandlerV2(updatedPipeline).verifyPipeline();
var updateInfo = makeUpdateInfo(modificationMessage, updatedPipeline);
updateInfos.add(updateInfo);
} catch (Exception e) {
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphHelpers.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphHelpers.java
index 0291c92030..e5693317c0 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphHelpers.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphHelpers.java
@@ -19,7 +19,6 @@
package org.apache.streampipes.manager.data;
import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import java.util.List;
import java.util.stream.Collectors;
@@ -30,10 +29,6 @@ public class PipelineGraphHelpers {
return find(pipelineGraph, SpDataStream.class);
}
- public static List<InvocableStreamPipesEntity>
findInvocableElements(PipelineGraph pipelineGraph) {
- return find(pipelineGraph, InvocableStreamPipesEntity.class);
- }
-
private static <T> List<T> find(PipelineGraph pipelineGraph, Class<T> clazz)
{
return pipelineGraph
.vertexSet()
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutor.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutor.java
index bc14e46f58..1371c8b7b3 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutor.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/PipelineExecutor.java
@@ -27,19 +27,16 @@ import java.util.List;
public class PipelineExecutor {
private final Pipeline pipeline;
- private final boolean forceStop;
- public PipelineExecutor(Pipeline pipeline,
- boolean forceStop) {
+ public PipelineExecutor(Pipeline pipeline) {
this.pipeline = pipeline;
- this.forceStop = forceStop;
}
public PipelineOperationStatus startPipeline() {
return
executeOperation(PipelineExecutionTaskFactory.makeStartPipelineTasks(pipeline));
}
- public PipelineOperationStatus stopPipeline() {
+ public PipelineOperationStatus stopPipeline(boolean forceStop) {
return
executeOperation(PipelineExecutionTaskFactory.makeStopPipelineTasks(pipeline,
forceStop));
}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/extensions/ExtensionItemInstaller.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/extensions/ExtensionItemInstaller.java
index bfa691964d..16fc95a029 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/extensions/ExtensionItemInstaller.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/extensions/ExtensionItemInstaller.java
@@ -20,7 +20,7 @@ package org.apache.streampipes.manager.extensions;
import org.apache.streampipes.commons.exceptions.SepaParseException;
import
org.apache.streampipes.manager.api.extensions.IExtensionsResourceUrlProvider;
import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
-import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.manager.verification.extractor.TypeExtractor;
import
org.apache.streampipes.model.extensions.ExtensionItemInstallationRequest;
import org.apache.streampipes.model.message.Message;
@@ -38,13 +38,13 @@ public class ExtensionItemInstaller {
String principalSid) throws IOException,
SepaParseException {
var descriptionUrl = getDescriptionUrl(req);
var description = fetchDescription(descriptionUrl);
- return Operations.verifyAndAddElement(description, principalSid,
req.publicElement());
+ return new
TypeExtractor(description).getTypeVerifier().verifyAndAdd(principalSid,
req.publicElement());
}
public Message updateExtension(ExtensionItemInstallationRequest req) throws
IOException, SepaParseException {
var descriptionUrl = getDescriptionUrl(req);
var description = fetchDescription(descriptionUrl);
- return Operations.verifyAndUpdateElement(description);
+ return new TypeExtractor(description).getTypeVerifier().verifyAndUpdate();
}
private String getDescriptionUrl(ExtensionItemInstallationRequest req) {
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ConnectionValidator.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ConnectionValidator.java
deleted file mode 100644
index 4b3ded7949..0000000000
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ConnectionValidator.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.matching;
-
-import org.apache.streampipes.manager.matching.v2.ElementVerification;
-import org.apache.streampipes.manager.util.TreeUtils;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import
org.apache.streampipes.model.client.exception.InvalidConnectionException;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.pipeline.Pipeline;
-
-import java.util.List;
-
-public class ConnectionValidator {
-
- private final Pipeline pipeline;
- private final List<InvocableStreamPipesEntity> invocationGraphs;
- private final InvocableStreamPipesEntity rootPipelineElement;
- private final ElementVerification verifier;
-
- public ConnectionValidator(Pipeline pipeline,
- List<InvocableStreamPipesEntity> invocationGraphs,
- InvocableStreamPipesEntity rootPipelineElement) {
- this.pipeline = pipeline;
- this.invocationGraphs = invocationGraphs;
- this.rootPipelineElement = rootPipelineElement;
- this.verifier = new ElementVerification();
- }
-
- public List<InvocableStreamPipesEntity> validateConnection() throws
InvalidConnectionException {
- boolean verified = true;
- InvocableStreamPipesEntity rightElement = rootPipelineElement;
- List<String> connectedTo = rootPipelineElement.getConnectedTo();
-
- for (String domId : connectedTo) {
- NamedStreamPipesEntity element = TreeUtils.findSEPAElement(domId,
pipeline.getSepas(), pipeline.getStreams());
- if (element instanceof SpDataStream) {
- SpDataStream leftSpDataStream = (SpDataStream) element;
- if (!(verifier.verify(leftSpDataStream, rightElement))) {
- verified = false;
- }
- } else {
- DataProcessorInvocation ancestor =
findInvocationGraph(invocationGraphs, element.getDom());
- if (!(verifier.verify(ancestor, rightElement))) {
- verified = false;
- }
- }
- }
- if (!verified) {
- throw new InvalidConnectionException(verifier.getErrorLog());
- }
-
- return invocationGraphs;
- }
-
- private DataProcessorInvocation
findInvocationGraph(List<InvocableStreamPipesEntity> graphs, String domId) {
- return (DataProcessorInvocation) TreeUtils.findByDomId(domId, graphs);
- }
-}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/utils/MatchingUtils.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/utils/MatchingUtils.java
index c0f5c8b432..f20a8e7727 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/utils/MatchingUtils.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/utils/MatchingUtils.java
@@ -21,7 +21,7 @@ package org.apache.streampipes.manager.matching.v2.utils;
public class MatchingUtils {
public static boolean nullCheck(Object offer, Object requirement) {
- return ((offer == null) && (requirement == null)) || (requirement == null);
+ return (requirement == null);
}
public static boolean nullCheckRightNullDisallowed(Object offer, Object
requirement) {
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
index 5493bbf69d..a27b19b400 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java
@@ -20,7 +20,7 @@ package org.apache.streampipes.manager.migration;
import org.apache.streampipes.commons.exceptions.SepaParseException;
import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
-import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.manager.verification.extractor.TypeExtractor;
import org.apache.streampipes.model.base.VersionedNamedStreamPipesEntity;
import org.apache.streampipes.model.extensions.migration.MigrationRequest;
import org.apache.streampipes.model.message.Notification;
@@ -139,7 +139,7 @@ public abstract class AbstractMigrationManager {
.execute()
.returnContent()
.asString();
- var updateResult = Operations.verifyAndUpdateElement(entityPayload);
+ var updateResult = new
TypeExtractor(entityPayload).getTypeVerifier().verifyAndUpdate();
if (!updateResult.isSuccess()) {
LOG.error(
"Updating the pipeline element description failed: {}",
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
index 40ff88acf3..5ac2ba0a58 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/PipelineElementMigrationManager.java
@@ -176,8 +176,8 @@ public class PipelineElementMigrationManager extends
AbstractMigrationManager im
public void stopPipeline(Pipeline pipeline) {
- var pipelineExecutor = new PipelineExecutor(pipeline, true);
- var pipelineStopResult = pipelineExecutor.stopPipeline();
+ var pipelineExecutor = new PipelineExecutor(pipeline);
+ var pipelineStopResult = pipelineExecutor.stopPipeline(true);
if (pipelineStopResult.isSuccess()) {
LOG.info("Pipeline successfully stopped.");
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsServiceLogExecutor.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsServiceLogExecutor.java
index 8d76e31252..9f2f430b20 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsServiceLogExecutor.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/ExtensionsServiceLogExecutor.java
@@ -22,12 +22,10 @@ package org.apache.streampipes.manager.monitoring.pipeline;
import org.apache.streampipes.commons.constants.InstanceIdExtractor;
import org.apache.streampipes.commons.prometheus.pipelines.PipelineFlowStats;
import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
-import org.apache.streampipes.model.client.user.Principal;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.monitoring.SpEndpointMonitoringInfo;
-import org.apache.streampipes.resource.management.SpResourceManager;
import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTypes;
@@ -97,10 +95,6 @@ public class ExtensionsServiceLogExecutor implements
Runnable {
return
ExtensionServiceExecutions.extServiceGetRequest(makeLogUrl(serviceEndpointUrl));
}
- private Principal getServiceAdmin() {
- return new SpResourceManager().manageUsers().getServiceAdmin();
- }
-
private List<String> getActiveExtensionsEndpoints() {
return SpServiceDiscovery.getServiceDiscovery().getServiceEndpoints(
DefaultSpServiceTypes.EXT,
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
deleted file mode 100644
index f2addd2b20..0000000000
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/operations/Operations.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.operations;
-
-import
org.apache.streampipes.commons.exceptions.NoSuitableSepasAvailableException;
-import org.apache.streampipes.commons.exceptions.SepaParseException;
-import org.apache.streampipes.manager.execution.PipelineExecutor;
-import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
-import org.apache.streampipes.manager.recommender.ElementRecommender;
-import org.apache.streampipes.manager.remote.ContainerProvidedOptionsHandler;
-import org.apache.streampipes.manager.storage.PipelineStorageService;
-import org.apache.streampipes.manager.template.PipelineTemplateGenerator;
-import
org.apache.streampipes.manager.template.PipelineTemplateInvocationGenerator;
-import
org.apache.streampipes.manager.template.PipelineTemplateInvocationHandler;
-import org.apache.streampipes.manager.topic.WildcardTopicGenerator;
-import org.apache.streampipes.manager.verification.extractor.TypeExtractor;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.message.Message;
-import org.apache.streampipes.model.message.PipelineModificationMessage;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import
org.apache.streampipes.model.pipeline.PipelineElementRecommendationMessage;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
-import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
-import org.apache.streampipes.model.template.PipelineTemplateDescription;
-import org.apache.streampipes.model.template.PipelineTemplateInvocation;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * class that provides several (partial) pipeline verification methods
- */
-
-public class Operations {
-
- /**
- * @param pipeline the pipeline to validate
- * @return PipelineModificationMessage a message containing desired pipeline
modifications
- */
- public static PipelineModificationMessage validatePipeline(Pipeline
pipeline) throws Exception {
- return new PipelineVerificationHandlerV2(pipeline).verifyPipeline();
- }
-
- public static Message verifyAndAddElement(String graphData,
- String principalSid,
- boolean publicElement) throws
SepaParseException {
- return new
TypeExtractor(graphData).getTypeVerifier().verifyAndAdd(principalSid,
publicElement);
- }
-
- public static Message verifyAndUpdateElement(String graphData) throws
SepaParseException {
- return new TypeExtractor(graphData).getTypeVerifier().verifyAndUpdate();
- }
-
- public static PipelineElementRecommendationMessage
findRecommendedElements(Pipeline partialPipeline,
-
String baseRecId)
- throws NoSuitableSepasAvailableException {
- return new ElementRecommender(partialPipeline,
baseRecId).findRecommendedElements();
- }
-
- public static void storePipeline(Pipeline pipeline) {
- new PipelineStorageService(pipeline).addPipeline();
- }
-
- public static void updatePipeline(Pipeline pipeline) {
- new PipelineStorageService(pipeline).updatePipeline();
- }
-
- public static PipelineOperationStatus startPipeline(Pipeline pipeline) {
- return new PipelineExecutor(pipeline, false).startPipeline();
- }
-
- public static List<PipelineOperationStatus> stopAllPipelines(boolean
forceStop) {
- List<PipelineOperationStatus> status = new ArrayList<>();
- List<Pipeline> pipelines =
-
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().findAll();
-
- pipelines.forEach(p -> {
- if (p.isRunning()) {
- status.add(Operations.stopPipeline(p, forceStop));
- }
- });
- return status;
- }
-
- public static PipelineOperationStatus stopPipeline(Pipeline pipeline,
- boolean forceStop) {
- return new PipelineExecutor(pipeline, forceStop).stopPipeline();
- }
-
- public static SpDataStream updateActualTopic(SpDataStream stream) {
- return new WildcardTopicGenerator(stream).computeActualTopic();
- }
-
- public static RuntimeOptionsResponse
fetchRemoteOptions(RuntimeOptionsRequest request) {
- return new ContainerProvidedOptionsHandler().fetchRemoteOptions(request);
- }
-
- public static List<PipelineTemplateDescription> getAllPipelineTemplates() {
- return new PipelineTemplateGenerator().getAllPipelineTemplates();
- }
-
- public static PipelineOperationStatus handlePipelineTemplateInvocation(
- String userSid,
- PipelineTemplateInvocation pipelineTemplateInvocation) {
- return new PipelineTemplateInvocationHandler(userSid,
pipelineTemplateInvocation).handlePipelineInvocation();
- }
-
- public static PipelineTemplateInvocation getPipelineInvocationTemplate(
- SpDataStream dataStream,
- PipelineTemplateDescription pipelineTemplateDescription) {
- return new PipelineTemplateInvocationGenerator(dataStream,
pipelineTemplateDescription).generateInvocation();
- }
-}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java
index e57675ecba..ca477d73db 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/PipelineManager.java
@@ -19,8 +19,9 @@
package org.apache.streampipes.manager.pipeline;
import org.apache.streampipes.commons.random.UUIDGenerator;
-import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.manager.execution.PipelineExecutor;
import org.apache.streampipes.manager.permission.PermissionManager;
+import org.apache.streampipes.manager.storage.PipelineStorageService;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.client.user.Permission;
import org.apache.streampipes.model.pipeline.Pipeline;
@@ -30,6 +31,7 @@ import org.apache.streampipes.storage.api.IPermissionStorage;
import org.apache.streampipes.storage.api.IPipelineStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
+import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
@@ -71,7 +73,7 @@ public class PipelineManager {
? UUIDGenerator.generateUuid()
: pipeline.getPipelineId();
preparePipelineBasics(principalSid, pipeline, pipelineId);
- Operations.storePipeline(pipeline);
+ new PipelineStorageService(pipeline).addPipeline();
Permission permission = new PermissionManager().makePermission(pipeline,
principalSid);
getPermissionStorage().persist(permission);
@@ -88,7 +90,7 @@ public class PipelineManager {
*/
public static PipelineOperationStatus startPipeline(String pipelineId) {
Pipeline pipeline = getPipeline(pipelineId);
- return Operations.startPipeline(pipeline);
+ return new PipelineExecutor(pipeline).startPipeline();
}
/**
@@ -103,7 +105,7 @@ public class PipelineManager {
boolean forceStop) {
Pipeline pipeline = getPipeline(pipelineId);
- return Operations.stopPipeline(pipeline, forceStop);
+ return new PipelineExecutor(pipeline).stopPipeline(forceStop);
}
/**
@@ -119,6 +121,19 @@ public class PipelineManager {
}
}
+ public static List<PipelineOperationStatus> stopAllPipelines(boolean
forceStop) {
+ List<PipelineOperationStatus> status = new ArrayList<>();
+ List<Pipeline> pipelines =
+
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().findAll();
+
+ pipelines.forEach(p -> {
+ if (p.isRunning()) {
+ status.add(new PipelineExecutor(p).stopPipeline(forceStop));
+ }
+ });
+ return status;
+ }
+
/**
* Checks for the pipelines that contain the processing element
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java
index b301a5243a..a32bd5d6d3 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java
@@ -202,10 +202,6 @@ public class PipelineElementTemplateVisitor implements
StaticPropertyVisitor {
// TODO not yet supported
}
- private Object getValue(StaticProperty sp) {
- return ((Map<String, Object>)
configs.get(sp.getInternalName())).get("value");
- }
-
private List<String> getValueAsList(StaticProperty sp) {
return (List<String>) configs.get(sp.getInternalName());
}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateGenerator.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateGenerator.java
index 4531fc88f2..86c46bc742 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateGenerator.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateGenerator.java
@@ -18,15 +18,9 @@
package org.apache.streampipes.manager.template;
import org.apache.streampipes.commons.exceptions.ElementNotFoundException;
-import org.apache.streampipes.manager.matching.v2.ElementVerification;
import
org.apache.streampipes.manager.template.instances.DataLakePipelineTemplate;
import org.apache.streampipes.manager.template.instances.PipelineTemplate;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.template.PipelineTemplateDescription;
import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
@@ -43,7 +37,7 @@ public class PipelineTemplateGenerator {
Logger logger = LoggerFactory.getLogger(PipelineTemplateGenerator.class);
- private List<PipelineTemplateDescription> availableDescriptions = new
ArrayList<>();
+ private final List<PipelineTemplateDescription> availableDescriptions = new
ArrayList<>();
public List<PipelineTemplateDescription> getAllPipelineTemplates() {
@@ -65,62 +59,6 @@ public class PipelineTemplateGenerator {
return availableDescriptions;
}
- public List<PipelineTemplateDescription> getCompatibleTemplates(String
streamId) {
- List<PipelineTemplateDescription> compatibleTemplates = new ArrayList<>();
- ElementVerification verifier = new ElementVerification();
- SpDataStream streamOffer = null;
-
- try {
- streamOffer = getStream(streamId);
- streamOffer = new SpDataStream(streamOffer);
- if (streamOffer != null) {
- for (PipelineTemplateDescription pipelineTemplateDescription :
getAllPipelineTemplates()) {
- // TODO make this work for 2+ input streams
- InvocableStreamPipesEntity entity =
-
cloneInvocation(pipelineTemplateDescription.getBoundTo().get(0).getPipelineElementTemplate());
- if (verifier.verify(streamOffer, entity)) {
- compatibleTemplates.add(pipelineTemplateDescription);
- }
- }
- }
-
- } catch (ElementNotFoundException e) {
- e.printStackTrace();
- }
-
- return compatibleTemplates;
- }
-
- private InvocableStreamPipesEntity
cloneInvocation(InvocableStreamPipesEntity pipelineElementTemplate) {
- if (pipelineElementTemplate instanceof DataProcessorInvocation) {
- return new DataProcessorInvocation((DataProcessorInvocation)
pipelineElementTemplate);
- } else {
- return new DataSinkInvocation((DataSinkInvocation)
pipelineElementTemplate);
- }
- }
-
- protected SpDataStream getStream(String streamId) throws
ElementNotFoundException {
- SpDataStream result = getStorage()
- .getEventStreamById(streamId);
-
- if (result == null) {
- throw new ElementNotFoundException("Data stream " + streamId + " is not
installed!");
- }
-
- return result;
- }
-
- protected DataProcessorDescription getProcessor(String id) throws
ElementNotFoundException {
- DataProcessorDescription result = getStorage()
- .getDataProcessorByAppId(id);
-
- if (result == null) {
- throw new ElementNotFoundException("Data processor " + id + " is not
installed!");
- }
-
- return result;
- }
-
protected DataSinkDescription getSink(String id) throws
ElementNotFoundException {
try {
return getStorage()
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateInvocationHandler.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateInvocationHandler.java
index 0848abdac9..7b1748a9ef 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateInvocationHandler.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineTemplateInvocationHandler.java
@@ -17,8 +17,9 @@
*/
package org.apache.streampipes.manager.template;
-import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.manager.execution.PipelineExecutor;
import org.apache.streampipes.manager.permission.PermissionManager;
+import org.apache.streampipes.manager.storage.PipelineStorageService;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.client.user.Permission;
import org.apache.streampipes.model.pipeline.Pipeline;
@@ -33,9 +34,9 @@ import java.util.List;
public class PipelineTemplateInvocationHandler {
- private PipelineTemplateInvocation pipelineTemplateInvocation;
- private PipelineTemplateDescription pipelineTemplateDescription;
- private String username;
+ private final PipelineTemplateInvocation pipelineTemplateInvocation;
+ private final PipelineTemplateDescription pipelineTemplateDescription;
+ private final String username;
public PipelineTemplateInvocationHandler(String username,
PipelineTemplateInvocation pipelineTemplateInvocation) {
this.username = username;
@@ -43,26 +44,18 @@ public class PipelineTemplateInvocationHandler {
this.pipelineTemplateDescription =
getTemplateById(pipelineTemplateInvocation.getPipelineTemplateId());
}
- public PipelineTemplateInvocationHandler(String username,
PipelineTemplateInvocation pipelineTemplateInvocation,
- PipelineTemplateDescription
pipelineTemplateDescription) {
- this.username = username;
- this.pipelineTemplateInvocation = pipelineTemplateInvocation;
- this.pipelineTemplateDescription = pipelineTemplateDescription;
- }
-
-
public PipelineOperationStatus handlePipelineInvocation() {
Pipeline pipeline = new
PipelineGenerator(pipelineTemplateInvocation.getDataStreamId(),
pipelineTemplateDescription,
pipelineTemplateInvocation.getKviName()).makePipeline();
pipeline.setCreatedByUser(username);
pipeline.setCreatedAt(System.currentTimeMillis());
replaceStaticProperties(pipeline);
- Operations.storePipeline(pipeline);
+ new PipelineStorageService(pipeline).addPipeline();
Permission permission = new PermissionManager().makePermission(pipeline,
username);
StorageDispatcher.INSTANCE.getNoSqlStore().getPermissionStorage().persist(permission);
Pipeline storedPipeline =
StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getElementById(pipeline.getPipelineId());
- return Operations.startPipeline(storedPipeline);
+ return new PipelineExecutor(storedPipeline).startPipeline();
}
private void replaceStaticProperties(Pipeline pipeline) {
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/PipelineVerificationUtils.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/PipelineVerificationUtils.java
deleted file mode 100644
index 404c34f5e4..0000000000
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/PipelineVerificationUtils.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.util;
-
-import org.apache.streampipes.commons.exceptions.NoSepaInPipelineException;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.pipeline.Pipeline;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class PipelineVerificationUtils {
-
- /**
- * returns the root node of a partial pipeline (a pipeline without an action)
- *
- * @param pipeline
- * @return {@link
org.apache.streampipes.model.base.InvocableStreamPipesEntity}
- */
-
- public static InvocableStreamPipesEntity getRootNode(Pipeline pipeline)
throws NoSepaInPipelineException {
- List<InvocableStreamPipesEntity> elements = new ArrayList<>();
- elements.addAll(pipeline.getSepas());
- elements.addAll(pipeline.getActions());
-
- List<InvocableStreamPipesEntity> unconfiguredElements = elements
- .stream()
- .filter(e -> !e.isConfigured())
- .collect(Collectors.toList());
-
-
- if (unconfiguredElements.size() != 1) {
- throw new NoSepaInPipelineException();
- } else {
- return unconfiguredElements.get(0);
- }
-
- }
-}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TreeUtils.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TreeUtils.java
deleted file mode 100644
index 4986bba5a2..0000000000
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TreeUtils.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.manager.util;
-
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class TreeUtils {
-
- /**
- * @param id the DOM ID
- * @param sepas list of sepas in model-client format
- * @param streams list of streams in model-client format
- * @return a SEPA-client element
- */
-
- public static NamedStreamPipesEntity findSEPAElement(String id,
List<DataProcessorInvocation> sepas,
- List<SpDataStream>
- streams) {
- List<NamedStreamPipesEntity> allElements = new ArrayList<>();
- allElements.addAll(sepas);
- allElements.addAll(streams);
-
- for (NamedStreamPipesEntity element : allElements) {
- if (id.equals(element.getDom())) {
- return element;
- }
- }
- //TODO
- return null;
- }
-
- /**
- * @param id the DOM ID
- * @param graphs list of invocation graphs
- * @return an invocation graph with a given DOM Id
- */
- public static InvocableStreamPipesEntity findByDomId(String id,
List<InvocableStreamPipesEntity> graphs) {
- for (InvocableStreamPipesEntity graph : graphs) {
- if (graph.getDom().equals(id)) {
- return graph;
- }
- }
- //TODO
- return null;
- }
-}
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ContainerProvidedOptions.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ContainerProvidedOptions.java
index cce62bc080..1d41f47cc4 100644
---
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ContainerProvidedOptions.java
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ContainerProvidedOptions.java
@@ -17,7 +17,7 @@
*/
package org.apache.streampipes.rest.impl;
-import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.manager.remote.ContainerProvidedOptionsHandler;
import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
import org.apache.streampipes.rest.core.base.impl.AbstractRestResource;
@@ -38,6 +38,6 @@ public class ContainerProvidedOptions extends
AbstractRestResource {
consumes = MediaType.APPLICATION_JSON_VALUE
)
public ResponseEntity<RuntimeOptionsResponse>
fetchRemoteOptions(@RequestBody RuntimeOptionsRequest request) {
- return ok(Operations.fetchRemoteOptions(request));
+ return ok(new
ContainerProvidedOptionsHandler().fetchRemoteOptions(request));
}
}
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/DataStream.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/DataStream.java
index db72f9085f..be98a833ab 100644
---
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/DataStream.java
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/DataStream.java
@@ -17,7 +17,7 @@
*/
package org.apache.streampipes.rest.impl;
-import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.manager.topic.WildcardTopicGenerator;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.rest.core.base.impl.AbstractRestResource;
@@ -34,6 +34,6 @@ public class DataStream extends AbstractRestResource {
@PostMapping(path = "/update", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<SpDataStream> getStreamsBySource(@RequestBody
SpDataStream stream) {
- return ok(Operations.updateActualTopic(stream));
+ return ok(new WildcardTopicGenerator(stream).computeActualTopic());
}
}
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
index 75e636cd6b..a358839ec1 100644
---
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineResource.java
@@ -18,16 +18,12 @@
package org.apache.streampipes.rest.impl;
-import org.apache.streampipes.commons.exceptions.NoMatchingFormatException;
-import org.apache.streampipes.commons.exceptions.NoMatchingJsonSchemaException;
-import org.apache.streampipes.commons.exceptions.NoMatchingProtocolException;
-import org.apache.streampipes.commons.exceptions.NoMatchingSchemaException;
import
org.apache.streampipes.commons.exceptions.NoSuitableSepasAvailableException;
-import
org.apache.streampipes.commons.exceptions.RemoteServerNotAccessibleException;
import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
-import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
import org.apache.streampipes.manager.pipeline.PipelineManager;
-import
org.apache.streampipes.model.client.exception.InvalidConnectionException;
+import org.apache.streampipes.manager.recommender.ElementRecommender;
+import org.apache.streampipes.manager.storage.PipelineStorageService;
import org.apache.streampipes.model.message.ErrorMessage;
import org.apache.streampipes.model.message.Message;
import org.apache.streampipes.model.message.Notification;
@@ -178,7 +174,7 @@ public class PipelineResource extends
AbstractAuthGuardedRestResource {
public PipelineElementRecommendationMessage recommend(@RequestBody Pipeline
pipeline,
@PathVariable("recId")
String baseRecElement) {
try {
- return Operations.findRecommendedElements(pipeline, baseRecElement);
+ return new ElementRecommender(pipeline,
baseRecElement).findRecommendedElements();
} catch (JsonSyntaxException e) {
throw new SpNotificationException(
HttpStatus.BAD_REQUEST,
@@ -207,19 +203,9 @@ public class PipelineResource extends
AbstractAuthGuardedRestResource {
@PreAuthorize(AuthConstants.HAS_WRITE_PIPELINE_PRIVILEGE)
public ResponseEntity<?> validatePipeline(@RequestBody Pipeline pipeline) {
try {
- return ok(Operations.validatePipeline(pipeline));
+ return ok(new PipelineVerificationHandlerV2(pipeline).verifyPipeline());
} catch (JsonSyntaxException e) {
return badRequest(new Notification(NotificationType.UNKNOWN_ERROR,
e.getMessage()));
- } catch (NoMatchingSchemaException e) {
- return badRequest(new Notification(NotificationType.NO_VALID_CONNECTION,
e.getMessage()));
- } catch (NoMatchingFormatException e) {
- return badRequest(new
Notification(NotificationType.NO_MATCHING_FORMAT_CONNECTION, e.getMessage()));
- } catch (NoMatchingProtocolException e) {
- return badRequest(new
Notification(NotificationType.NO_MATCHING_PROTOCOL_CONNECTION, e.getMessage()));
- } catch (RemoteServerNotAccessibleException |
NoMatchingJsonSchemaException e) {
- return serverError(new
Notification(NotificationType.REMOTE_SERVER_NOT_ACCESSIBLE, e.getMessage()));
- } catch (InvalidConnectionException e) {
- return badRequest(e.getErrorLog());
} catch (Exception e) {
LOG.error(e.getMessage());
return serverError(new Notification(NotificationType.UNKNOWN_ERROR,
e.getMessage()));
@@ -244,7 +230,7 @@ public class PipelineResource extends
AbstractAuthGuardedRestResource {
storedPipeline.setHealthStatus(pipeline.getHealthStatus());
storedPipeline.setPipelineNotifications(pipeline.getPipelineNotifications());
storedPipeline.setValid(pipeline.isValid());
- Operations.updatePipeline(storedPipeline);
+ new PipelineStorageService(storedPipeline).updatePipeline();
SuccessMessage message = Notifications.success("Pipeline modified");
message.addNotification(new Notification("id", pipelineId));
return ok(message);
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java
index f3496c1213..5957fe01e2 100644
---
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineTemplate.java
@@ -17,7 +17,9 @@
*/
package org.apache.streampipes.rest.impl;
-import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.manager.template.PipelineTemplateGenerator;
+import
org.apache.streampipes.manager.template.PipelineTemplateInvocationGenerator;
+import
org.apache.streampipes.manager.template.PipelineTemplateInvocationHandler;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.SpDataStreamContainer;
import org.apache.streampipes.model.message.Notifications;
@@ -67,7 +69,10 @@ public class PipelineTemplate extends
AbstractAuthGuardedRestResource {
var pipelineTemplateDescriptionOpt =
getPipelineTemplateDescription(pipelineTemplateId);
if (pipelineTemplateDescriptionOpt.isPresent()) {
PipelineTemplateInvocation invocation =
- Operations.getPipelineInvocationTemplate(dataStream,
pipelineTemplateDescriptionOpt.get());
+ new PipelineTemplateInvocationGenerator(
+ dataStream,
+ pipelineTemplateDescriptionOpt.get()
+ ).generateInvocation();
PipelineTemplateInvocation clonedInvocation = new
PipelineTemplateInvocation(invocation);
return ok(new PipelineTemplateInvocation(clonedInvocation));
} else {
@@ -85,14 +90,15 @@ public class PipelineTemplate extends
AbstractAuthGuardedRestResource {
public ResponseEntity<PipelineOperationStatus> generatePipeline(
@RequestBody PipelineTemplateInvocation pipelineTemplateInvocation) {
- PipelineOperationStatus status = Operations
- .handlePipelineTemplateInvocation(getAuthenticatedUserSid(),
pipelineTemplateInvocation);
-
+ PipelineOperationStatus status = new PipelineTemplateInvocationHandler(
+ getAuthenticatedUserSid(),
+ pipelineTemplateInvocation
+ ).handlePipelineInvocation();
return ok(status);
}
private Optional<PipelineTemplateDescription>
getPipelineTemplateDescription(String pipelineTemplateId) {
- return Operations
+ return new PipelineTemplateGenerator()
.getAllPipelineTemplates()
.stream()
.filter(pt -> pt.getAppId().equals(pipelineTemplateId))
diff --git
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
index f7f0c32d22..06c7257bf0 100644
---
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
+++
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java
@@ -20,8 +20,8 @@ package org.apache.streampipes.service.core;
import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
import
org.apache.streampipes.connect.management.management.WorkerAdministrationManagement;
+import org.apache.streampipes.manager.execution.PipelineExecutor;
import org.apache.streampipes.manager.health.ServiceHealthCheck;
-import org.apache.streampipes.manager.operations.Operations;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
@@ -120,7 +120,7 @@ public class PostStartupTask implements Runnable {
}
private void startPipeline(Pipeline pipeline, boolean restartOnReboot) {
- PipelineOperationStatus status = Operations.startPipeline(pipeline);
+ PipelineOperationStatus status = new
PipelineExecutor(pipeline).startPipeline();
if (status.isSuccess()) {
LOG.info("Pipeline {} successfully restarted", status.getPipelineName());
Pipeline storedPipeline =
getPipelineStorage().getElementById(pipeline.getPipelineId());
diff --git
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
index f9c062b546..610cf23f41 100644
---
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
+++
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
@@ -26,7 +26,7 @@ import
org.apache.streampipes.manager.health.CoreServiceStatusManager;
import org.apache.streampipes.manager.health.PipelineHealthCheck;
import org.apache.streampipes.manager.health.ServiceHealthCheck;
import
org.apache.streampipes.manager.monitoring.pipeline.ExtensionsServiceLogExecutor;
-import org.apache.streampipes.manager.operations.Operations;
+import org.apache.streampipes.manager.pipeline.PipelineManager;
import org.apache.streampipes.manager.setup.AutoInstallation;
import org.apache.streampipes.manager.setup.StreamPipesEnvChecker;
import org.apache.streampipes.messaging.SpProtocolManager;
@@ -221,7 +221,7 @@ public class StreamPipesCoreApplication extends
StreamPipesServiceBase {
});
LOG.info("Gracefully stopping all running pipelines...");
- List<PipelineOperationStatus> status = Operations.stopAllPipelines(true);
+ List<PipelineOperationStatus> status =
PipelineManager.stopAllPipelines(true);
status.forEach(s -> {
if (s.isSuccess()) {
LOG.info("Pipeline {} successfully stopped", s.getPipelineName());