http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java new file mode 100644 index 0000000..41b98ed --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow; + +import java.util.Optional; + +public class StandardVersionControlInformation implements VersionControlInformation { + + private final String registryIdentifier; + private final String bucketIdentifier; + private final String flowIdentifier; + private final int version; + private volatile VersionedProcessGroup flowSnapshot; + private volatile Boolean modified = null; + private volatile Boolean current = null; + + public StandardVersionControlInformation(final String registryId, final String bucketId, final String flowId, final int version, + final VersionedProcessGroup snapshot, final Boolean modified, final Boolean current) { + this.registryIdentifier = registryId; + this.bucketIdentifier = bucketId; + this.flowIdentifier = flowId; + this.version = version; + this.flowSnapshot = snapshot; + this.modified = modified; + this.current = current; + } + + @Override + public String getRegistryIdentifier() { + return registryIdentifier; + } + + @Override + public String getBucketIdentifier() { + return bucketIdentifier; + } + + @Override + public String getFlowIdentifier() { + return flowIdentifier; + } + + @Override + public int getVersion() { + return version; + } + + @Override + public Optional<Boolean> getModified() { + return Optional.ofNullable(modified); + } + + @Override + public Optional<Boolean> getCurrent() { + return Optional.ofNullable(current); + } + + @Override + public VersionedProcessGroup getFlowSnapshot() { + return flowSnapshot; + } + + public void setModified(final boolean modified) { + this.modified = modified; + } + + public void setCurrent(final boolean current) { + this.current = current; + } + + public void setFlowSnapshot(final VersionedProcessGroup flowSnapshot) { + this.flowSnapshot = flowSnapshot; + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedConnectableComponent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedConnectableComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedConnectableComponent.java new file mode 100644 index 0000000..26ad300 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedConnectableComponent.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.mapping; + +import org.apache.nifi.registry.flow.ConnectableComponent; + +public class InstantiatedConnectableComponent extends ConnectableComponent implements InstantiatedVersionedComponent { + private final String instanceId; + private final String groupId; + + public InstantiatedConnectableComponent(final String instanceId, final String instanceGroupId) { + this.instanceId = instanceId; + this.groupId = instanceGroupId; + } + + @Override + public String getInstanceId() { + return instanceId; + } + + @Override + public String getInstanceGroupId() { + return groupId; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedComponent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedComponent.java new file mode 100644 index 0000000..15c620a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedComponent.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.mapping; + +public interface InstantiatedVersionedComponent { + String getInstanceId(); + + String getInstanceGroupId(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedConnection.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedConnection.java new file mode 100644 index 0000000..d18733a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedConnection.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.mapping; + +import org.apache.nifi.registry.flow.VersionedConnection; + +public class InstantiatedVersionedConnection extends VersionedConnection implements InstantiatedVersionedComponent { + private final String instanceId; + private final String groupId; + + public InstantiatedVersionedConnection(final String instanceId, final String instanceGroupId) { + this.instanceId = instanceId; + this.groupId = instanceGroupId; + } + + @Override + public String getInstanceId() { + return instanceId; + } + + @Override + public String getInstanceGroupId() { + return groupId; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedControllerService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedControllerService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedControllerService.java new file mode 100644 index 0000000..0617cd5 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedControllerService.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.mapping; + +import org.apache.nifi.registry.flow.VersionedControllerService; + +public class InstantiatedVersionedControllerService extends VersionedControllerService implements InstantiatedVersionedComponent { + private final String instanceId; + private final String groupId; + + public InstantiatedVersionedControllerService(final String instanceId, final String instanceGroupId) { + this.instanceId = instanceId; + this.groupId = instanceGroupId; + } + + @Override + public String getInstanceId() { + return instanceId; + } + + @Override + public String getInstanceGroupId() { + return groupId; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedFunnel.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedFunnel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedFunnel.java new file mode 100644 index 0000000..6b1f230 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedFunnel.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.mapping; + +import org.apache.nifi.registry.flow.VersionedFunnel; + +public class InstantiatedVersionedFunnel extends VersionedFunnel implements InstantiatedVersionedComponent { + private final String instanceId; + private final String groupId; + + public InstantiatedVersionedFunnel(final String instanceId, final String instanceGroupId) { + this.instanceId = instanceId; + this.groupId = instanceGroupId; + } + + @Override + public String getInstanceId() { + return instanceId; + } + + @Override + public String getInstanceGroupId() { + return groupId; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedLabel.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedLabel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedLabel.java new file mode 100644 index 0000000..1c061c8 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedLabel.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.mapping; + +import org.apache.nifi.registry.flow.VersionedLabel; + +public class InstantiatedVersionedLabel extends VersionedLabel implements InstantiatedVersionedComponent { + private final String instanceId; + private final String groupId; + + public InstantiatedVersionedLabel(final String instanceId, final String instanceGroupId) { + this.instanceId = instanceId; + this.groupId = instanceGroupId; + } + + @Override + public String getInstanceId() { + return instanceId; + } + + @Override + public String getInstanceGroupId() { + return groupId; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedPort.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedPort.java new file mode 100644 index 0000000..b3ddb40 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedPort.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.mapping; + +import org.apache.nifi.registry.flow.VersionedPort; + +public class InstantiatedVersionedPort extends VersionedPort implements InstantiatedVersionedComponent { + private final String instanceId; + private final String groupId; + + public InstantiatedVersionedPort(final String instanceId, final String instanceGroupId) { + this.instanceId = instanceId; + this.groupId = instanceGroupId; + } + + @Override + public String getInstanceId() { + return instanceId; + } + + @Override + public String getInstanceGroupId() { + return groupId; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java new file mode 100644 index 0000000..a669220 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.mapping; + +import org.apache.nifi.registry.flow.VersionedProcessGroup; + +public class InstantiatedVersionedProcessGroup extends VersionedProcessGroup implements InstantiatedVersionedComponent { + private final String instanceId; + private final String groupId; + + public InstantiatedVersionedProcessGroup(final String instanceId, final String instanceGroupId) { + this.instanceId = instanceId; + this.groupId = instanceGroupId; + } + + @Override + public String getInstanceId() { + return instanceId; + } + + @Override + public String getInstanceGroupId() { + return groupId; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessor.java new file mode 100644 index 0000000..2763e9d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessor.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.mapping; + +import org.apache.nifi.registry.flow.VersionedProcessor; + +public class InstantiatedVersionedProcessor extends VersionedProcessor implements InstantiatedVersionedComponent { + private final String instanceId; + private final String groupId; + + public InstantiatedVersionedProcessor(final String instanceId, final String instanceGroupId) { + this.instanceId = instanceId; + this.groupId = instanceGroupId; + } + + @Override + public String getInstanceId() { + return instanceId; + } + + @Override + public String getInstanceGroupId() { + return groupId; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedRemoteGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedRemoteGroupPort.java new file mode 100644 index 0000000..27805fa --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedRemoteGroupPort.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.mapping; + +import org.apache.nifi.registry.flow.VersionedRemoteGroupPort; + +public class InstantiatedVersionedRemoteGroupPort extends VersionedRemoteGroupPort implements InstantiatedVersionedComponent { + private final String instanceId; + private final String remoteGroupId; + + public InstantiatedVersionedRemoteGroupPort(final String instanceId, final String instanceRemoteGroupId) { + this.instanceId = instanceId; + this.remoteGroupId = instanceRemoteGroupId; + } + + @Override + public String getInstanceId() { + return instanceId; + } + + @Override + public String getInstanceGroupId() { + return remoteGroupId; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedRemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedRemoteProcessGroup.java new file mode 100644 index 0000000..57816ec --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedRemoteProcessGroup.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.mapping; + +import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup; + +public class InstantiatedVersionedRemoteProcessGroup extends VersionedRemoteProcessGroup implements InstantiatedVersionedComponent { + private final String instanceId; + private final String groupId; + + public InstantiatedVersionedRemoteProcessGroup(final String instanceId, final String instanceGroupId) { + this.instanceId = instanceId; + this.groupId = instanceGroupId; + } + + @Override + public String getInstanceId() { + return instanceId; + } + + @Override + public String getInstanceGroupId() { + return groupId; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java new file mode 100644 index 0000000..c3c1037 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.mapping; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +import org.apache.nifi.registry.flow.BatchSize; +import org.apache.nifi.registry.flow.Bundle; +import org.apache.nifi.registry.flow.ComponentType; +import org.apache.nifi.registry.flow.ConnectableComponent; +import org.apache.nifi.registry.flow.ConnectableComponentType; +import org.apache.nifi.registry.flow.ControllerServiceAPI; +import org.apache.nifi.registry.flow.PortType; +import org.apache.nifi.registry.flow.Position; +import org.apache.nifi.registry.flow.VersionedConnection; +import org.apache.nifi.registry.flow.VersionedControllerService; +import org.apache.nifi.registry.flow.VersionedFunnel; +import org.apache.nifi.registry.flow.VersionedLabel; +import org.apache.nifi.registry.flow.VersionedPort; +import org.apache.nifi.registry.flow.VersionedProcessGroup; +import org.apache.nifi.registry.flow.VersionedProcessor; +import org.apache.nifi.registry.flow.VersionedRemoteGroupPort; +import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup; +import org.apache.nifi.web.api.dto.BatchSettingsDTO; +import org.apache.nifi.web.api.dto.BundleDTO; +import org.apache.nifi.web.api.dto.ConnectableDTO; +import org.apache.nifi.web.api.dto.ConnectionDTO; +import org.apache.nifi.web.api.dto.ControllerServiceApiDTO; +import org.apache.nifi.web.api.dto.ControllerServiceDTO; +import org.apache.nifi.web.api.dto.FlowSnippetDTO; +import org.apache.nifi.web.api.dto.FunnelDTO; +import org.apache.nifi.web.api.dto.LabelDTO; +import org.apache.nifi.web.api.dto.PortDTO; +import org.apache.nifi.web.api.dto.PositionDTO; +import org.apache.nifi.web.api.dto.ProcessGroupDTO; +import org.apache.nifi.web.api.dto.ProcessorConfigDTO; +import org.apache.nifi.web.api.dto.ProcessorDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; + + +public class NiFiRegistryDtoMapper { + // We need to keep a mapping of component id to versionedComponentId as we transform these objects. This way, when + // we call #mapConnectable, instead of generating a new UUID for the ConnectableComponent, we can lookup the 'versioned' + // identifier based on the comopnent's actual id. We do connections last, so that all components will already have been + // created before attempting to create the connection, where the ConnectableDTO is converted. + private Map<String, String> versionedComponentIds = new HashMap<>(); + + public VersionedProcessGroup mapProcessGroup(final ProcessGroupDTO dto) { + versionedComponentIds.clear(); + return mapGroup(dto); + } + + private VersionedProcessGroup mapGroup(final ProcessGroupDTO dto) { + final VersionedProcessGroup versionedGroup = new VersionedProcessGroup(); + versionedGroup.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); + versionedGroup.setGroupIdentifier(getGroupId(dto.getParentGroupId())); + versionedGroup.setName(dto.getName()); + versionedGroup.setComments(dto.getComments()); + versionedGroup.setPosition(mapPosition(dto.getPosition())); + + final FlowSnippetDTO contents = dto.getContents(); + + versionedGroup.setControllerServices(contents.getControllerServices().stream() + .map(this::mapControllerService) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setFunnels(contents.getFunnels().stream() + .map(this::mapFunnel) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setInputPorts(contents.getInputPorts().stream() + .map(this::mapPort) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setOutputPorts(contents.getOutputPorts().stream() + .map(this::mapPort) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setLabels(contents.getLabels().stream() + .map(this::mapLabel) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setProcessors(contents.getProcessors().stream() + .map(this::mapProcessor) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setRemoteProcessGroups(contents.getRemoteProcessGroups().stream() + .map(this::mapRemoteProcessGroup) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setProcessGroups(contents.getProcessGroups().stream() + .map(this::mapGroup) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setConnections(contents.getConnections().stream() + .map(this::mapConnection) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + return versionedGroup; + } + + private String getId(final String currentVersionedId, final String componentId) { + final String versionedId; + if (currentVersionedId == null) { + versionedId = UUID.nameUUIDFromBytes(componentId.getBytes(StandardCharsets.UTF_8)).toString(); + } else { + versionedId = currentVersionedId; + } + + versionedComponentIds.put(componentId, versionedId); + return versionedId; + } + + private String getGroupId(final String groupId) { + return versionedComponentIds.get(groupId); + } + + public VersionedConnection mapConnection(final ConnectionDTO dto) { + final VersionedConnection connection = new VersionedConnection(); + connection.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); + connection.setGroupIdentifier(getGroupId(dto.getParentGroupId())); + connection.setName(dto.getName()); + connection.setBackPressureDataSizeThreshold(dto.getBackPressureDataSizeThreshold()); + connection.setBackPressureObjectThreshold(dto.getBackPressureObjectThreshold()); + connection.setFlowFileExpiration(dto.getFlowFileExpiration()); + connection.setLabelIndex(dto.getLabelIndex()); + connection.setPosition(mapPosition(dto.getPosition())); + connection.setPrioritizers(dto.getPrioritizers()); + connection.setSelectedRelationships(dto.getSelectedRelationships()); + connection.setzIndex(dto.getzIndex()); + + connection.setBends(dto.getBends().stream() + .map(this::mapPosition) + .collect(Collectors.toList())); + + connection.setSource(mapConnectable(dto.getSource())); + connection.setDestination(mapConnectable(dto.getDestination())); + + return connection; + } + + public ConnectableComponent mapConnectable(final ConnectableDTO dto) { + final ConnectableComponent component = new ConnectableComponent(); + + final String versionedId = dto.getVersionedComponentId(); + if (versionedId == null) { + final String resolved = versionedComponentIds.get(dto.getId()); + if (resolved == null) { + throw new IllegalArgumentException("Unable to map Connectable Component with identifier " + dto.getId() + " to any version-controlled component"); + } + + component.setId(resolved); + } else { + component.setId(versionedId); + } + + component.setComments(dto.getComments()); + component.setGroupId(dto.getGroupId()); + component.setName(dto.getName()); + component.setType(ConnectableComponentType.valueOf(dto.getType())); + return component; + } + + public VersionedControllerService mapControllerService(final ControllerServiceDTO dto) { + final VersionedControllerService service = new VersionedControllerService(); + service.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); + service.setGroupIdentifier(getGroupId(dto.getParentGroupId())); + service.setName(dto.getName()); + service.setAnnotationData(dto.getAnnotationData()); + service.setBundle(mapBundle(dto.getBundle())); + service.setComments(dto.getComments()); + service.setControllerServiceApis(dto.getControllerServiceApis().stream() + .map(this::mapControllerServiceApi) + .collect(Collectors.toList())); + service.setProperties(dto.getProperties()); + service.setType(dto.getType()); + return null; + } + + private Bundle mapBundle(final BundleDTO dto) { + final Bundle bundle = new Bundle(); + bundle.setGroup(dto.getGroup()); + bundle.setArtifact(dto.getArtifact()); + bundle.setVersion(dto.getVersion()); + return bundle; + } + + private ControllerServiceAPI mapControllerServiceApi(final ControllerServiceApiDTO dto) { + final ControllerServiceAPI api = new ControllerServiceAPI(); + api.setBundle(mapBundle(dto.getBundle())); + api.setType(dto.getType()); + return api; + } + + public VersionedFunnel mapFunnel(final FunnelDTO dto) { + final VersionedFunnel funnel = new VersionedFunnel(); + funnel.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); + funnel.setGroupIdentifier(getGroupId(dto.getParentGroupId())); + funnel.setPosition(mapPosition(dto.getPosition())); + return funnel; + } + + public VersionedLabel mapLabel(final LabelDTO dto) { + final VersionedLabel label = new VersionedLabel(); + label.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); + label.setGroupIdentifier(getGroupId(dto.getParentGroupId())); + label.setHeight(dto.getHeight()); + label.setWidth(dto.getWidth()); + label.setLabel(dto.getLabel()); + label.setPosition(mapPosition(dto.getPosition())); + label.setStyle(dto.getStyle()); + return label; + } + + public VersionedPort mapPort(final PortDTO dto) { + final VersionedPort port = new VersionedPort(); + port.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); + port.setGroupIdentifier(getGroupId(dto.getParentGroupId())); + port.setComments(dto.getComments()); + port.setConcurrentlySchedulableTaskCount(dto.getConcurrentlySchedulableTaskCount()); + port.setName(dto.getName()); + port.setPosition(mapPosition(dto.getPosition())); + port.setType(PortType.valueOf(dto.getType())); + return port; + } + + public Position mapPosition(final PositionDTO dto) { + final Position position = new Position(); + position.setX(dto.getX()); + position.setY(dto.getY()); + return position; + } + + public VersionedProcessor mapProcessor(final ProcessorDTO dto) { + final ProcessorConfigDTO config = dto.getConfig(); + + final VersionedProcessor processor = new VersionedProcessor(); + processor.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); + processor.setGroupIdentifier(getGroupId(dto.getParentGroupId())); + processor.setType(dto.getType()); + processor.setAnnotationData(config.getAnnotationData()); + processor.setAutoTerminatedRelationships(config.getAutoTerminatedRelationships()); + processor.setBulletinLevel(config.getBulletinLevel()); + processor.setBundle(mapBundle(dto.getBundle())); + processor.setComments(config.getComments()); + processor.setConcurrentlySchedulableTaskCount(config.getConcurrentlySchedulableTaskCount()); + processor.setExecutionNode(config.getExecutionNode()); + processor.setName(dto.getName()); + processor.setPenaltyDuration(config.getPenaltyDuration()); + processor.setPosition(mapPosition(dto.getPosition())); + processor.setProperties(config.getProperties()); + processor.setRunDurationMillis(config.getRunDurationMillis()); + processor.setSchedulingPeriod(config.getSchedulingPeriod()); + processor.setSchedulingStrategy(config.getSchedulingStrategy()); + processor.setStyle(dto.getStyle()); + processor.setYieldDuration(config.getYieldDuration()); + return processor; + } + + public VersionedRemoteProcessGroup mapRemoteProcessGroup(final RemoteProcessGroupDTO dto) { + final VersionedRemoteProcessGroup rpg = new VersionedRemoteProcessGroup(); + rpg.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); + rpg.setGroupIdentifier(getGroupId(dto.getParentGroupId())); + rpg.setComments(dto.getComments()); + rpg.setCommunicationsTimeout(dto.getCommunicationsTimeout()); + rpg.setLocalNetworkInterface(dto.getLocalNetworkInterface()); + rpg.setName(dto.getName()); + rpg.setInputPorts(dto.getContents().getInputPorts().stream() + .map(port -> mapRemotePort(port, ComponentType.REMOTE_INPUT_PORT)) + .collect(Collectors.toSet())); + rpg.setOutputPorts(dto.getContents().getOutputPorts().stream() + .map(port -> mapRemotePort(port, ComponentType.REMOTE_OUTPUT_PORT)) + .collect(Collectors.toSet())); + rpg.setPosition(mapPosition(dto.getPosition())); + rpg.setProxyHost(dto.getProxyHost()); + rpg.setProxyPort(dto.getProxyPort()); + rpg.setProxyUser(dto.getProxyUser()); + rpg.setTargetUri(dto.getTargetUri()); + rpg.setTargetUris(dto.getTargetUris()); + rpg.setTransportProtocol(dto.getTransportProtocol()); + rpg.setYieldDuration(dto.getYieldDuration()); + return rpg; + } + + public VersionedRemoteGroupPort mapRemotePort(final RemoteProcessGroupPortDTO dto, final ComponentType componentType) { + final VersionedRemoteGroupPort port = new VersionedRemoteGroupPort(); + port.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); + port.setGroupIdentifier(getGroupId(dto.getGroupId())); + port.setComments(dto.getComments()); + port.setConcurrentlySchedulableTaskCount(dto.getConcurrentlySchedulableTaskCount()); + port.setGroupId(dto.getGroupId()); + port.setName(dto.getName()); + port.setUseCompression(dto.getUseCompression()); + port.setBatchSettings(mapBatchSettings(dto.getBatchSettings())); + port.setComponentType(componentType); + return port; + } + + private BatchSize mapBatchSettings(final BatchSettingsDTO dto) { + final BatchSize batchSize = new BatchSize(); + batchSize.setCount(dto.getCount()); + batchSize.setDuration(dto.getDuration()); + batchSize.setSize(dto.getSize()); + return batchSize; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java new file mode 100644 index 0000000..e3edc30 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.mapping; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.ClassUtils; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.connectable.Funnel; +import org.apache.nifi.connectable.Port; +import org.apache.nifi.controller.ConfiguredComponent; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.label.Label; +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.registry.flow.BatchSize; +import org.apache.nifi.registry.flow.Bundle; +import org.apache.nifi.registry.flow.ComponentType; +import org.apache.nifi.registry.flow.ConnectableComponent; +import org.apache.nifi.registry.flow.ConnectableComponentType; +import org.apache.nifi.registry.flow.ControllerServiceAPI; +import org.apache.nifi.registry.flow.FlowRegistry; +import org.apache.nifi.registry.flow.FlowRegistryClient; +import org.apache.nifi.registry.flow.PortType; +import org.apache.nifi.registry.flow.Position; +import org.apache.nifi.registry.flow.RemoteFlowCoordinates; +import org.apache.nifi.registry.flow.VersionControlInformation; +import org.apache.nifi.registry.flow.VersionedConnection; +import org.apache.nifi.registry.flow.VersionedControllerService; +import org.apache.nifi.registry.flow.VersionedFunnel; +import org.apache.nifi.registry.flow.VersionedLabel; +import org.apache.nifi.registry.flow.VersionedPort; +import org.apache.nifi.registry.flow.VersionedProcessor; +import org.apache.nifi.registry.flow.VersionedRemoteGroupPort; +import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup; +import org.apache.nifi.remote.RemoteGroupPort; + + +public class NiFiRegistryFlowMapper { + // We need to keep a mapping of component id to versionedComponentId as we transform these objects. This way, when + // we call #mapConnectable, instead of generating a new UUID for the ConnectableComponent, we can lookup the 'versioned' + // identifier based on the comopnent's actual id. We do connections last, so that all components will already have been + // created before attempting to create the connection, where the ConnectableDTO is converted. + private Map<String, String> versionedComponentIds = new HashMap<>(); + + public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final FlowRegistryClient registryClient) { + versionedComponentIds.clear(); + return mapGroup(group, registryClient, true); + } + + private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final FlowRegistryClient registryClient, final boolean topLevel) { + final InstantiatedVersionedProcessGroup versionedGroup = new InstantiatedVersionedProcessGroup(group.getIdentifier(), group.getProcessGroupIdentifier()); + versionedGroup.setIdentifier(getId(group.getVersionedComponentId(), group.getIdentifier())); + versionedGroup.setGroupIdentifier(getGroupId(group.getProcessGroupIdentifier())); + versionedGroup.setName(group.getName()); + versionedGroup.setComments(group.getComments()); + versionedGroup.setPosition(mapPosition(group.getPosition())); + + // If we are at the 'top level', meaning that the given Process Group is the group that we are creating a VersionedProcessGroup for, + // then we don't want to include the RemoteFlowCoordinates; we want to include the group contents. The RemoteFlowCoordinates will be used + // only for a child group that is itself version controlled. + if (!topLevel) { + final VersionControlInformation versionControlInfo = group.getVersionControlInformation(); + if (versionControlInfo != null) { + final RemoteFlowCoordinates coordinates = new RemoteFlowCoordinates(); + final String registryId = versionControlInfo.getRegistryIdentifier(); + final FlowRegistry registry = registryClient.getFlowRegistry(registryId); + if (registry == null) { + throw new IllegalStateException("Process Group refers to a Flow Registry with ID " + registryId + " but no Flow Registry exists with that ID. Cannot resolve to a URL."); + } + + coordinates.setRegistryUrl(registry.getURL()); + coordinates.setBucketId(versionControlInfo.getBucketIdentifier()); + coordinates.setFlowId(versionControlInfo.getFlowIdentifier()); + coordinates.setVersion(versionControlInfo.getVersion()); + + // If the Process Group itself is remotely versioned, then we don't want to include its contents + // because the contents are remotely managed and not part of the versioning of this Process Group + return versionedGroup; + } + } + + versionedGroup.setControllerServices(group.getControllerServices(false).stream() + .map(this::mapControllerService) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setFunnels(group.getFunnels().stream() + .map(this::mapFunnel) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setInputPorts(group.getInputPorts().stream() + .map(this::mapPort) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setOutputPorts(group.getOutputPorts().stream() + .map(this::mapPort) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setLabels(group.getLabels().stream() + .map(this::mapLabel) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setProcessors(group.getProcessors().stream() + .map(this::mapProcessor) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setRemoteProcessGroups(group.getRemoteProcessGroups().stream() + .map(this::mapRemoteProcessGroup) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setProcessGroups(group.getProcessGroups().stream() + .map(grp -> mapGroup(grp, registryClient, false)) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setConnections(group.getConnections().stream() + .map(this::mapConnection) + .collect(Collectors.toCollection(LinkedHashSet::new))); + + versionedGroup.setVariables(group.getVariableRegistry().getVariableMap().entrySet().stream() + .collect(Collectors.toMap(entry -> entry.getKey().getName(), Map.Entry::getValue))); + + return versionedGroup; + } + + private String getId(final Optional<String> currentVersionedId, final String componentId) { + final String versionedId; + if (currentVersionedId.isPresent()) { + versionedId = currentVersionedId.get(); + } else { + versionedId = UUID.nameUUIDFromBytes(componentId.getBytes(StandardCharsets.UTF_8)).toString(); + } + + versionedComponentIds.put(componentId, versionedId); + return versionedId; + } + + private String getGroupId(final String groupId) { + return versionedComponentIds.get(groupId); + } + + public VersionedConnection mapConnection(final Connection connection) { + final FlowFileQueue queue = connection.getFlowFileQueue(); + + final VersionedConnection versionedConnection = new InstantiatedVersionedConnection(connection.getIdentifier(), connection.getProcessGroup().getIdentifier()); + versionedConnection.setIdentifier(getId(connection.getVersionedComponentId(), connection.getIdentifier())); + versionedConnection.setGroupIdentifier(getGroupId(connection.getProcessGroup().getIdentifier())); + versionedConnection.setName(connection.getName()); + versionedConnection.setBackPressureDataSizeThreshold(queue.getBackPressureDataSizeThreshold()); + versionedConnection.setBackPressureObjectThreshold(queue.getBackPressureObjectThreshold()); + versionedConnection.setFlowFileExpiration(queue.getFlowFileExpiration()); + versionedConnection.setLabelIndex(connection.getLabelIndex()); + versionedConnection.setPrioritizers(queue.getPriorities().stream().map(p -> p.getClass().getName()).collect(Collectors.toList())); + versionedConnection.setSelectedRelationships(connection.getRelationships().stream().map(Relationship::getName).collect(Collectors.toSet())); + versionedConnection.setzIndex(connection.getZIndex()); + + versionedConnection.setBends(connection.getBendPoints().stream() + .map(this::mapPosition) + .collect(Collectors.toList())); + + versionedConnection.setSource(mapConnectable(connection.getSource())); + versionedConnection.setDestination(mapConnectable(connection.getDestination())); + + return versionedConnection; + } + + public ConnectableComponent mapConnectable(final Connectable connectable) { + final ConnectableComponent component = new InstantiatedConnectableComponent(connectable.getIdentifier(), connectable.getProcessGroupIdentifier()); + + final Optional<String> versionedId = connectable.getVersionedComponentId(); + if (versionedId.isPresent()) { + component.setId(versionedId.get()); + } else { + final String resolved = versionedComponentIds.get(connectable.getIdentifier()); + if (resolved == null) { + throw new IllegalArgumentException("Unable to map Connectable Component with identifier " + connectable.getIdentifier() + " to any version-controlled component"); + } + + component.setId(resolved); + } + + component.setComments(connectable.getComments()); + component.setGroupId(connectable.getProcessGroupIdentifier()); + component.setName(connectable.getName()); + component.setType(ConnectableComponentType.valueOf(connectable.getConnectableType().name())); + return component; + } + + public VersionedControllerService mapControllerService(final ControllerServiceNode controllerService) { + final VersionedControllerService versionedService = new InstantiatedVersionedControllerService(controllerService.getIdentifier(), controllerService.getProcessGroupIdentifier()); + versionedService.setIdentifier(getId(controllerService.getVersionedComponentId(), controllerService.getIdentifier())); + versionedService.setGroupIdentifier(getGroupId(controllerService.getProcessGroupIdentifier())); + versionedService.setName(controllerService.getName()); + versionedService.setAnnotationData(controllerService.getAnnotationData()); + versionedService.setBundle(mapBundle(controllerService.getBundleCoordinate())); + versionedService.setComments(controllerService.getComments()); + + versionedService.setControllerServiceApis(mapControllerServiceApis(controllerService)); + versionedService.setProperties(mapProperties(controllerService)); + versionedService.setType(controllerService.getCanonicalClassName()); + + return versionedService; + } + + private Map<String, String> mapProperties(final ConfiguredComponent component) { + final Map<String, String> mapped = new HashMap<>(); + component.getProperties().keySet().stream() + .forEach(property -> { + String value = component.getProperty(property); + if (value == null) { + value = property.getDefaultValue(); + } + mapped.put(property.getName(), value); + }); + return mapped; + } + + private Bundle mapBundle(final BundleCoordinate coordinate) { + final Bundle versionedBundle = new Bundle(); + versionedBundle.setGroup(coordinate.getGroup()); + versionedBundle.setArtifact(coordinate.getId()); + versionedBundle.setVersion(coordinate.getVersion()); + return versionedBundle; + } + + private List<ControllerServiceAPI> mapControllerServiceApis(final ControllerServiceNode service) { + final Class<?> serviceClass = service.getControllerServiceImplementation().getClass(); + + final Set<Class<?>> serviceApiClasses = new HashSet<>(); + // get all of it's interfaces to determine the controller service api's it implements + final List<Class<?>> interfaces = ClassUtils.getAllInterfaces(serviceClass); + for (final Class<?> i : interfaces) { + // add all controller services that's not ControllerService itself + if (ControllerService.class.isAssignableFrom(i) && !ControllerService.class.equals(i)) { + serviceApiClasses.add(i); + } + } + + + final List<ControllerServiceAPI> serviceApis = new ArrayList<>(); + for (final Class<?> serviceApiClass : serviceApiClasses) { + final BundleCoordinate bundleCoordinate = ExtensionManager.getBundle(serviceApiClass.getClassLoader()).getBundleDetails().getCoordinate(); + + final ControllerServiceAPI serviceApi = new ControllerServiceAPI(); + serviceApi.setType(serviceApiClass.getName()); + serviceApi.setBundle(mapBundle(bundleCoordinate)); + serviceApis.add(serviceApi); + } + return serviceApis; + } + + + public VersionedFunnel mapFunnel(final Funnel funnel) { + final VersionedFunnel versionedFunnel = new InstantiatedVersionedFunnel(funnel.getIdentifier(), funnel.getProcessGroupIdentifier()); + versionedFunnel.setIdentifier(getId(funnel.getVersionedComponentId(), funnel.getIdentifier())); + versionedFunnel.setGroupIdentifier(getGroupId(funnel.getProcessGroupIdentifier())); + versionedFunnel.setPosition(mapPosition(funnel.getPosition())); + + return versionedFunnel; + } + + public VersionedLabel mapLabel(final Label label) { + final VersionedLabel versionedLabel = new InstantiatedVersionedLabel(label.getIdentifier(), label.getProcessGroupIdentifier()); + versionedLabel.setIdentifier(getId(label.getVersionedComponentId(), label.getIdentifier())); + versionedLabel.setGroupIdentifier(getGroupId(label.getProcessGroupIdentifier())); + versionedLabel.setHeight(label.getSize().getHeight()); + versionedLabel.setWidth(label.getSize().getWidth()); + versionedLabel.setLabel(label.getValue()); + versionedLabel.setPosition(mapPosition(label.getPosition())); + versionedLabel.setStyle(label.getStyle()); + + return versionedLabel; + } + + public VersionedPort mapPort(final Port port) { + final VersionedPort versionedPort = new InstantiatedVersionedPort(port.getIdentifier(), port.getProcessGroupIdentifier()); + versionedPort.setIdentifier(getId(port.getVersionedComponentId(), port.getIdentifier())); + versionedPort.setGroupIdentifier(getGroupId(port.getProcessGroupIdentifier())); + versionedPort.setComments(port.getComments()); + versionedPort.setConcurrentlySchedulableTaskCount(port.getMaxConcurrentTasks()); + versionedPort.setName(port.getName()); + versionedPort.setPosition(mapPosition(port.getPosition())); + versionedPort.setType(PortType.valueOf(port.getComponentType())); + return versionedPort; + } + + public Position mapPosition(final org.apache.nifi.connectable.Position pos) { + final Position position = new Position(); + position.setX(pos.getX()); + position.setY(pos.getY()); + return position; + } + + public VersionedProcessor mapProcessor(final ProcessorNode procNode) { + final VersionedProcessor processor = new InstantiatedVersionedProcessor(procNode.getIdentifier(), procNode.getProcessGroupIdentifier()); + processor.setIdentifier(getId(procNode.getVersionedComponentId(), procNode.getIdentifier())); + processor.setGroupIdentifier(getGroupId(procNode.getProcessGroupIdentifier())); + processor.setType(procNode.getCanonicalClassName()); + processor.setAnnotationData(procNode.getAnnotationData()); + processor.setAutoTerminatedRelationships(procNode.getAutoTerminatedRelationships().stream().map(Relationship::getName).collect(Collectors.toSet())); + processor.setBulletinLevel(procNode.getBulletinLevel().name()); + processor.setBundle(mapBundle(procNode.getBundleCoordinate())); + processor.setComments(procNode.getComments()); + processor.setConcurrentlySchedulableTaskCount(procNode.getMaxConcurrentTasks()); + processor.setExecutionNode(procNode.getExecutionNode().name()); + processor.setName(procNode.getName()); + processor.setPenaltyDuration(procNode.getPenalizationPeriod()); + processor.setPosition(mapPosition(procNode.getPosition())); + processor.setProperties(mapProperties(procNode)); + processor.setRunDurationMillis(procNode.getRunDuration(TimeUnit.MILLISECONDS)); + processor.setSchedulingPeriod(procNode.getSchedulingPeriod()); + processor.setSchedulingStrategy(procNode.getSchedulingStrategy().name()); + processor.setStyle(procNode.getStyle()); + processor.setYieldDuration(procNode.getYieldPeriod()); + + return processor; + } + + public VersionedRemoteProcessGroup mapRemoteProcessGroup(final RemoteProcessGroup remoteGroup) { + final VersionedRemoteProcessGroup rpg = new InstantiatedVersionedRemoteProcessGroup(remoteGroup.getIdentifier(), remoteGroup.getProcessGroupIdentifier()); + rpg.setIdentifier(getId(remoteGroup.getVersionedComponentId(), remoteGroup.getIdentifier())); + rpg.setGroupIdentifier(getGroupId(remoteGroup.getProcessGroupIdentifier())); + rpg.setComments(remoteGroup.getComments()); + rpg.setCommunicationsTimeout(remoteGroup.getCommunicationsTimeout()); + rpg.setLocalNetworkInterface(remoteGroup.getNetworkInterface()); + rpg.setName(remoteGroup.getName()); + rpg.setInputPorts(remoteGroup.getInputPorts().stream() + .map(port -> mapRemotePort(port, ComponentType.REMOTE_INPUT_PORT)) + .collect(Collectors.toSet())); + rpg.setOutputPorts(remoteGroup.getOutputPorts().stream() + .map(port -> mapRemotePort(port, ComponentType.REMOTE_OUTPUT_PORT)) + .collect(Collectors.toSet())); + rpg.setPosition(mapPosition(remoteGroup.getPosition())); + rpg.setProxyHost(remoteGroup.getProxyHost()); + rpg.setProxyPort(remoteGroup.getProxyPort()); + rpg.setProxyUser(remoteGroup.getProxyUser()); + rpg.setTargetUri(remoteGroup.getTargetUri()); + rpg.setTargetUris(remoteGroup.getTargetUris()); + rpg.setTransportProtocol(remoteGroup.getTransportProtocol().name()); + rpg.setYieldDuration(remoteGroup.getYieldDuration()); + return rpg; + } + + public VersionedRemoteGroupPort mapRemotePort(final RemoteGroupPort remotePort, final ComponentType componentType) { + final VersionedRemoteGroupPort port = new InstantiatedVersionedRemoteGroupPort(remotePort.getIdentifier(), remotePort.getRemoteProcessGroup().getIdentifier()); + port.setIdentifier(getId(remotePort.getVersionedComponentId(), remotePort.getIdentifier())); + port.setGroupIdentifier(getGroupId(remotePort.getRemoteProcessGroup().getIdentifier())); + port.setComments(remotePort.getComments()); + port.setConcurrentlySchedulableTaskCount(remotePort.getMaxConcurrentTasks()); + port.setGroupId(remotePort.getProcessGroupIdentifier()); + port.setName(remotePort.getName()); + port.setUseCompression(remotePort.isUseCompression()); + port.setBatchSettings(mapBatchSettings(remotePort)); + port.setComponentType(componentType); + return port; + } + + private BatchSize mapBatchSettings(final RemoteGroupPort remotePort) { + final BatchSize batchSize = new BatchSize(); + batchSize.setCount(remotePort.getBatchCount()); + batchSize.setDuration(remotePort.getBatchDuration()); + batchSize.setSize(remotePort.getBatchSize()); + return batchSize; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 53d5c9f..6b55735 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -34,6 +34,7 @@ import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; @@ -109,6 +110,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { private final AtomicReference<String> comments = new AtomicReference<>(); private final AtomicReference<ProcessGroup> processGroup; private final AtomicBoolean transmitting = new AtomicBoolean(false); + private final AtomicReference<String> versionedComponentId = new AtomicReference<>(); private final SSLContext sslContext; private volatile String communicationsTimeout = "30 sec"; @@ -1419,4 +1421,26 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { return new File(stateDir, getIdentifier() + ".peers"); } + @Override + public Optional<String> getVersionedComponentId() { + return Optional.ofNullable(versionedComponentId.get()); + } + + @Override + public void setVersionedComponentId(final String versionedComponentId) { + boolean updated = false; + while (!updated) { + final String currentId = this.versionedComponentId.get(); + + if (currentId == null) { + updated = this.versionedComponentId.compareAndSet(null, versionedComponentId); + } else if (currentId.equals(versionedComponentId)) { + return; + } else if (versionedComponentId == null) { + updated = this.versionedComponentId.compareAndSet(currentId, null); + } else { + throw new IllegalStateException(this + " is already under version control"); + } + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java index 7dbcec1..7d657ce 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java @@ -22,6 +22,7 @@ public class StandardRemoteProcessGroupPortDescriptor implements RemoteProcessGr private String id; private String targetId; + private String versionedComponentId; private String groupId; private String name; private String comments; @@ -185,4 +186,13 @@ public class StandardRemoteProcessGroupPortDescriptor implements RemoteProcessGr } return name.equals(other.getName()); } + + @Override + public String getVersionedComponentId() { + return versionedComponentId; + } + + public void setVersionedComponentId(String versionedId) { + this.versionedComponentId = versionedId; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java index 7ed9187..c7a7e7d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java @@ -26,6 +26,7 @@ import org.apache.nifi.controller.leader.election.LeaderElectionManager; import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.util.NiFiProperties; import org.springframework.beans.BeansException; @@ -49,6 +50,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex private ClusterCoordinator clusterCoordinator; private VariableRegistry variableRegistry; private LeaderElectionManager leaderElectionManager; + private FlowRegistryClient flowRegistryClient; @Override public Object getObject() throws Exception { @@ -69,7 +71,8 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex clusterCoordinator, heartbeatMonitor, leaderElectionManager, - variableRegistry); + variableRegistry, + flowRegistryClient); } else { flowController = FlowController.createStandaloneInstance( flowFileEventRepository, @@ -77,7 +80,9 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex authorizer, auditService, encryptor, - bulletinRepository, variableRegistry); + bulletinRepository, + variableRegistry, + flowRegistryClient); } } @@ -133,4 +138,8 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex public void setLeaderElectionManager(final LeaderElectionManager leaderElectionManager) { this.leaderElectionManager = leaderElectionManager; } + + public void setFlowRegistryClient(final FlowRegistryClient flowRegistryClient) { + this.flowRegistryClient = flowRegistryClient; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd index 19e1c55..8186c8b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd @@ -45,6 +45,7 @@ of unexpected process termination or execution failure. The "id" should rarely change and if it must then the database directory should be deleted to be safe.--> <xs:element name="id" type="NonEmptyStringType"/> + <xs:element name="versionedComponentId" type="NonEmptyStringType" minOccurs="0" maxOccurs="1" /> <!-- The "name" is a nicely displayable description of the processor's duty--> <xs:element name="name" type="NonEmptyStringType"/> @@ -130,10 +131,12 @@ <xs:complexType name="ProcessGroupType"> <xs:sequence> <xs:element name="id" type="NonEmptyStringType" /> + <xs:element name="versionedComponentId" type="NonEmptyStringType" minOccurs="0" maxOccurs="1" /> <xs:element name="name" type="NonEmptyStringType" /> <xs:element name="position" type="PositionType" /> <xs:element name="comment" type="xs:string" /> - + <xs:element name="versionControlInformation" type="VersionControlInformation" minOccurs="0" maxOccurs="1" /> + <!-- Each "processor" defines the actual dataflow work horses that make dataflow happen--> <xs:element name="processor" type="ProcessorType" minOccurs="0" maxOccurs="unbounded"/> @@ -156,11 +159,21 @@ <xs:attribute name="value" /> </xs:complexType> + <xs:complexType name="VersionControlInformation"> + <xs:sequence> + <xs:element name="registryId" type="NonEmptyStringType" /> + <xs:element name="bucketId" type="NonEmptyStringType" /> + <xs:element name="flowId" type="NonEmptyStringType" /> + <xs:element name="version" type="NonEmptyStringType" /> + </xs:sequence> + </xs:complexType> + <!-- Same as ProcessGroupType except that instead of input ports & output ports being of type PortType, they are of type RootGroupPortType --> <xs:complexType name="RootProcessGroupType"> <xs:sequence> <xs:element name="id" type="NonEmptyStringType" /> + <xs:element name="versionedComponentId" type="NonEmptyStringType" minOccurs="0" maxOccurs="1" /> <xs:element name="name" type="NonEmptyStringType" /> <xs:element name="position" type="PositionType" /> <xs:element name="comment" type="xs:string" /> @@ -184,6 +197,7 @@ <xs:complexType name="FunnelType"> <xs:sequence> <xs:element name="id" type="NonEmptyStringType" /> + <xs:element name="versionedComponentId" type="NonEmptyStringType" minOccurs="0" maxOccurs="1" /> <xs:element name="position" type="PositionType" /> </xs:sequence> </xs:complexType> @@ -191,6 +205,7 @@ <xs:complexType name="LabelType"> <xs:sequence> <xs:element name="id" type="NonEmptyStringType" /> + <xs:element name="versionedComponentId" type="NonEmptyStringType" minOccurs="0" maxOccurs="1" /> <xs:element name="position" type="PositionType" /> <xs:element name="size" type="SizeType" /> <xs:element name="styles" type="Styles" /> @@ -201,6 +216,7 @@ <xs:complexType name="RemoteProcessGroupType"> <xs:sequence> <xs:element name="id" type="NonEmptyStringType" /> + <xs:element name="versionedComponentId" type="NonEmptyStringType" minOccurs="0" maxOccurs="1" /> <xs:element name="name" type="xs:string" /> <xs:element name="position" type="PositionType" /> <xs:element name="comment" type="xs:string" /> @@ -227,6 +243,7 @@ <xs:complexType name="ConnectionType"> <xs:sequence> <xs:element name="id" type="NonEmptyStringType" /> + <xs:element name="versionedComponentId" type="NonEmptyStringType" minOccurs="0" maxOccurs="1" /> <xs:element name="name" type="xs:string" /> <xs:element name="bendPoints" type="BendPointsType" minOccurs="0" maxOccurs="1" /> <xs:element name="labelIndex" type="xs:int" minOccurs="0" maxOccurs="1" /> @@ -262,6 +279,7 @@ <xs:complexType name="PortType"> <xs:sequence> <xs:element name="id" type="NonEmptyStringType" /> + <xs:element name="versionedComponentId" type="NonEmptyStringType" minOccurs="0" maxOccurs="1" /> <xs:element name="name" type="NonEmptyStringType" /> <xs:element name="position" type="PositionType" /> <xs:element name="comments" type="xs:string" /> @@ -384,6 +402,7 @@ <xs:complexType name="ControllerServiceType"> <xs:sequence> <xs:element name="id" type="NonEmptyStringType" /> + <xs:element name="versionedComponentId" type="NonEmptyStringType" minOccurs="0" maxOccurs="1" /> <xs:element name="name" type="NonEmptyStringType" /> <xs:element name="comment" type="xs:string" /> <xs:element name="class" type="NonEmptyStringType" /> http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml index b3c426c..6a3ec8b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml @@ -35,6 +35,11 @@ <constructor-arg ref="nifiProperties" /> </bean> + <!-- flow registry --> + <bean id="flowRegistryClient" class="org.apache.nifi.registry.flow.FileBasedFlowRegistryClient"> + <constructor-arg index="0" type="java.io.File" value="../flowRegistry" /> + </bean> + <!-- flow controller --> <bean id="flowController" class="org.apache.nifi.spring.FlowControllerFactoryBean"> <property name="properties" ref="nifiProperties"/> @@ -45,6 +50,7 @@ <property name="clusterCoordinator" ref="clusterCoordinator" /> <property name="variableRegistry" ref="variableRegistry"/> <property name="leaderElectionManager" ref="leaderElectionManager" /> + <property name="flowRegistryClient" ref="flowRegistryClient" /> </bean> <!-- flow service --> http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java index 6973d12..392e92b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java @@ -28,6 +28,7 @@ import org.apache.nifi.controller.serialization.StandardFlowSerializer; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.events.VolatileBulletinRepository; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.variable.FileBasedVariableRegistry; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.api.dto.ConnectableDTO; @@ -86,7 +87,7 @@ public class StandardFlowServiceTest { mockAuditService = mock(AuditService.class); revisionManager = mock(RevisionManager.class); flowController = FlowController.createStandaloneInstance(mockFlowFileEventRepository, properties, authorizer, mockAuditService, mockEncryptor, - new VolatileBulletinRepository(), variableRegistry); + new VolatileBulletinRepository(), variableRegistry, mock(FlowRegistryClient.class)); flowService = StandardFlowService.createStandaloneInstance(flowController, properties, mockEncryptor, revisionManager, authorizer); } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java index 596c00f..d9728b2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java @@ -49,6 +49,7 @@ import org.apache.nifi.nar.SystemBundle; import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.MockProvenanceRepository; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.variable.FileBasedVariableRegistry; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.scheduling.SchedulingStrategy; @@ -162,7 +163,8 @@ public class TestFlowController { variableRegistry = new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths()); bulletinRepo = Mockito.mock(BulletinRepository.class); - controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer, auditService, encryptor, bulletinRepo, variableRegistry); + controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer, + auditService, encryptor, bulletinRepo, variableRegistry, Mockito.mock(FlowRegistryClient.class)); } @After @@ -323,7 +325,8 @@ public class TestFlowController { assertNotEquals(authFingerprint, authorizer.getFingerprint()); controller.shutdown(true); - controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer, auditService, encryptor, bulletinRepo, variableRegistry); + controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer, + auditService, encryptor, bulletinRepo, variableRegistry, Mockito.mock(FlowRegistryClient.class)); controller.synchronize(standardFlowSynchronizer, proposedDataFlow); assertEquals(authFingerprint, authorizer.getFingerprint()); }
