exceptionfactory commented on code in PR #10471:
URL: https://github.com/apache/nifi/pull/10471#discussion_r2560387659
##########
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java:
##########
@@ -209,6 +210,22 @@ public RegisteredFlow getFlowForUser(final
FlowRegistryClientUserContext context
}
}
+ @Override
+ public void createBranchForUser(final FlowRegistryClientUserContext
context, final String registryId, final FlowVersionLocation sourceLocation,
final String newBranchName) {
+ final FlowRegistryClientNode flowRegistry =
flowController.getFlowManager().getFlowRegistryClient(registryId);
+ if (flowRegistry == null) {
+ throw new IllegalArgumentException("The specified registry id is
unknown to this NiFi.");
+ }
+
+ try {
+ flowRegistry.createBranch(context, sourceLocation, newBranchName);
+ } catch (final UnsupportedOperationException e) {
+ throw e;
+ } catch (final IOException | FlowRegistryException ioe) {
+ throw new NiFiCoreException("Unable to create branch [" +
newBranchName + "] in registry with ID " + registryId + ": " +
ioe.getMessage(), ioe);
Review Comment:
Recommend not appending the cause's message to the exception message, since it is already part of the stack trace through the chained exception:
```suggestion
throw new NiFiCoreException("Unable to create branch [" +
newBranchName + "] in registry with ID " + registryId, ioe);
```
##########
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java:
##########
@@ -5720,6 +5722,143 @@ public VersionControlInformationEntity
setVersionControlInformation(final Revisi
return
entityFactory.createVersionControlInformationEntity(snapshot.getComponent(),
dtoFactory.createRevisionDTO(snapshot.getLastModification()));
}
+ @Override
+ public VersionControlInformationEntity createFlowBranch(final Revision
revision, final String processGroupId, final String newBranchName,
+ final String
sourceBranch, final String sourceVersion) {
+ final ProcessGroup group =
processGroupDAO.getProcessGroup(processGroupId);
+ final VersionControlInformation versionControlInformation =
group.getVersionControlInformation();
+
+ if (versionControlInformation == null) {
+ throw new IllegalStateException("Process Group with ID " +
processGroupId + " is not currently under Version Control");
+ }
+
+ final String trimmedBranchName =
org.apache.commons.lang3.StringUtils.trimToNull(newBranchName);
+ if (trimmedBranchName == null) {
+ throw new IllegalArgumentException("Branch name must be
specified");
+ }
+ if (trimmedBranchName.equals(versionControlInformation.getBranch())) {
+ throw new IllegalArgumentException("Process Group is already
tracking branch " + trimmedBranchName);
+ }
+
+ final String resolvedSourceBranch =
org.apache.commons.lang3.StringUtils.isNotBlank(sourceBranch) ? sourceBranch :
versionControlInformation.getBranch();
+ if
(org.apache.commons.lang3.StringUtils.isBlank(resolvedSourceBranch)) {
+ throw new IllegalArgumentException("Source branch must be
specified");
+ }
+
+ final String resolvedSourceVersion =
org.apache.commons.lang3.StringUtils.isNotBlank(sourceVersion) ? sourceVersion
: versionControlInformation.getVersion();
+
+ final FlowVersionLocation sourceLocation = new
FlowVersionLocation(resolvedSourceBranch,
+ versionControlInformation.getBucketIdentifier(),
+ versionControlInformation.getFlowIdentifier(),
+ resolvedSourceVersion);
+
+ final FlowRegistryClientUserContext clientUserContext =
FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser());
+
+ try {
+ flowRegistryDAO.createBranchForUser(clientUserContext,
versionControlInformation.getRegistryIdentifier(), sourceLocation,
trimmedBranchName);
+ } catch (final UnsupportedOperationException e) {
+ throw new IllegalArgumentException("Configured Flow Registry does
not support branch creation.", e);
Review Comment:
```suggestion
throw new IllegalArgumentException("Configured Flow Registry
does not support branch creation", e);
```
##########
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java:
##########
@@ -209,6 +210,22 @@ public RegisteredFlow getFlowForUser(final
FlowRegistryClientUserContext context
}
}
+ @Override
+ public void createBranchForUser(final FlowRegistryClientUserContext
context, final String registryId, final FlowVersionLocation sourceLocation,
final String newBranchName) {
+ final FlowRegistryClientNode flowRegistry =
flowController.getFlowManager().getFlowRegistryClient(registryId);
+ if (flowRegistry == null) {
+ throw new IllegalArgumentException("The specified registry id is
unknown to this NiFi.");
Review Comment:
Although this wording is used elsewhere, I recommend removing the `to this NiFi`
wording since it is implied:
```suggestion
throw new IllegalArgumentException("Registry ID [%s] not
found".formatted(registryId));
```
##########
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestVersionsResource.java:
##########
@@ -81,4 +109,240 @@ public void testExportFlowVersion() {
verify(innerInnerVersionedProcessGroup).setVersionedFlowCoordinates(null);
}
-}
\ No newline at end of file
+
+
+ @Test
+ public void testCreateFlowBranchRequiresBranchName() {
+ final String groupId = UUID.randomUUID().toString();
+ final CreateFlowBranchRequestEntity requestEntity = new
CreateFlowBranchRequestEntity();
+ final RevisionDTO revisionDTO = new RevisionDTO();
+ revisionDTO.setClientId("client-id");
+ revisionDTO.setVersion(0L);
+ requestEntity.setProcessGroupRevision(revisionDTO);
+
+ assertThrows(IllegalArgumentException.class, () ->
versionsResource.createFlowBranch(groupId, requestEntity));
+ }
+
+ @Test
+ public void testCreateFlowBranchInvokesService() {
+ final String groupId = UUID.randomUUID().toString();
+ final CreateFlowBranchRequestEntity requestEntity = new
CreateFlowBranchRequestEntity();
+ final RevisionDTO revisionDTO = new RevisionDTO();
+ revisionDTO.setClientId("client-id");
+ revisionDTO.setVersion(1L);
+ requestEntity.setProcessGroupRevision(revisionDTO);
+ requestEntity.setBranch("feature");
+
+ versionsResource.httpServletRequest = new MockHttpServletRequest();
+
+ final VersionControlInformationDTO currentDto = new
VersionControlInformationDTO();
+ currentDto.setGroupId(groupId);
+ currentDto.setBranch("main");
+ currentDto.setVersion("3");
+ currentDto.setState(VersionControlInformationDTO.UP_TO_DATE);
+ final VersionControlInformationEntity currentEntity = new
VersionControlInformationEntity();
+ currentEntity.setVersionControlInformation(currentDto);
+
when(serviceFacade.getVersionControlInformation(groupId)).thenReturn(currentEntity);
+
+ final VersionControlInformationEntity expectedEntity = new
VersionControlInformationEntity();
+ when(serviceFacade.createFlowBranch(any(Revision.class), eq(groupId),
eq("feature"), any(), any()))
+ .thenReturn(expectedEntity);
+
+ final Response response = versionsResource.createFlowBranch(groupId,
requestEntity);
+ assertEquals(200, response.getStatus());
+ assertEquals(expectedEntity, response.getEntity());
+
+ ArgumentCaptor<Revision> revisionCaptor =
ArgumentCaptor.forClass(Revision.class);
+ verify(serviceFacade).createFlowBranch(revisionCaptor.capture(),
eq(groupId), eq("feature"), isNull(), isNull());
+
+ final Revision capturedRevision = revisionCaptor.getValue();
+ assertEquals(1L, capturedRevision.getVersion());
+ assertEquals("client-id", capturedRevision.getClientId());
+ assertEquals(groupId, capturedRevision.getComponentId());
+ }
+
+ @Test
+ public void testCreateFlowBranchFailsWhenBranchExists() {
+ final String groupId = UUID.randomUUID().toString();
+ final CreateFlowBranchRequestEntity requestEntity = new
CreateFlowBranchRequestEntity();
+ final RevisionDTO revisionDTO = new RevisionDTO();
+ revisionDTO.setClientId("client-id");
+ revisionDTO.setVersion(1L);
+ requestEntity.setProcessGroupRevision(revisionDTO);
+ requestEntity.setBranch("main");
+
+ versionsResource.httpServletRequest = new MockHttpServletRequest();
+
+ final VersionControlInformationDTO currentDto = new
VersionControlInformationDTO();
+ currentDto.setGroupId(groupId);
+ currentDto.setBranch("main");
+ currentDto.setState(VersionControlInformationDTO.UP_TO_DATE);
+ final VersionControlInformationEntity currentEntity = new
VersionControlInformationEntity();
+ currentEntity.setVersionControlInformation(currentDto);
+
when(serviceFacade.getVersionControlInformation(groupId)).thenReturn(currentEntity);
+
+ when(serviceFacade.createFlowBranch(any(Revision.class), eq(groupId),
eq("main"), any(), any()))
+ .thenThrow(new IllegalArgumentException("Process Group is
already tracking branch main"));
+
+ assertThrows(IllegalArgumentException.class, () ->
versionsResource.createFlowBranch(groupId, requestEntity));
+ }
+
+ @Test
+ public void testCreateFlowBranchUnsupported() {
+ final String groupId = UUID.randomUUID().toString();
+ final CreateFlowBranchRequestEntity requestEntity = new
CreateFlowBranchRequestEntity();
+ final RevisionDTO revisionDTO = new RevisionDTO();
+ revisionDTO.setClientId("client-id");
+ revisionDTO.setVersion(1L);
+ requestEntity.setProcessGroupRevision(revisionDTO);
+ requestEntity.setBranch("feature");
+
+ versionsResource.httpServletRequest = new MockHttpServletRequest();
+
+ final VersionControlInformationDTO currentDto = new
VersionControlInformationDTO();
+ currentDto.setGroupId(groupId);
+ currentDto.setBranch("main");
+ currentDto.setVersion("2");
+ currentDto.setState(VersionControlInformationDTO.UP_TO_DATE);
+ final VersionControlInformationEntity currentEntity = new
VersionControlInformationEntity();
+ currentEntity.setVersionControlInformation(currentDto);
+
when(serviceFacade.getVersionControlInformation(groupId)).thenReturn(currentEntity);
+
+ when(serviceFacade.createFlowBranch(any(Revision.class), eq(groupId),
eq("feature"), any(), any()))
+ .thenThrow(new IllegalArgumentException("Registry does not
support branching"));
+
+ assertThrows(IllegalArgumentException.class, () ->
versionsResource.createFlowBranch(groupId, requestEntity));
+ }
+
+ @Test
+ public void testCreateFlowBranchAllowedWhenLocallyModified() {
+ final String groupId = UUID.randomUUID().toString();
+ final CreateFlowBranchRequestEntity requestEntity = new
CreateFlowBranchRequestEntity();
+ final RevisionDTO revisionDTO = new RevisionDTO();
+ revisionDTO.setClientId("client-id");
+ revisionDTO.setVersion(1L);
+ requestEntity.setProcessGroupRevision(revisionDTO);
+ requestEntity.setBranch("feature");
+
+ versionsResource.httpServletRequest = new MockHttpServletRequest();
+
+ final VersionControlInformationDTO currentDto = new
VersionControlInformationDTO();
+ currentDto.setGroupId(groupId);
+ currentDto.setBranch("main");
+ currentDto.setVersion("1");
+ currentDto.setState(VersionControlInformationDTO.LOCALLY_MODIFIED);
+ final VersionControlInformationEntity currentEntity = new
VersionControlInformationEntity();
+ currentEntity.setVersionControlInformation(currentDto);
+
when(serviceFacade.getVersionControlInformation(groupId)).thenReturn(currentEntity);
+
+ final VersionControlInformationEntity expectedEntity = new
VersionControlInformationEntity();
+ when(serviceFacade.createFlowBranch(any(Revision.class), eq(groupId),
eq("feature"), any(), any()))
+ .thenReturn(expectedEntity);
+
+ final Response response = versionsResource.createFlowBranch(groupId,
requestEntity);
+ assertEquals(200, response.getStatus());
Review Comment:
`HttpURLConnection.HTTP_OK` can be used in place of `200` here and in other
methods.
##########
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java:
##########
@@ -5720,6 +5722,143 @@ public VersionControlInformationEntity
setVersionControlInformation(final Revisi
return
entityFactory.createVersionControlInformationEntity(snapshot.getComponent(),
dtoFactory.createRevisionDTO(snapshot.getLastModification()));
}
+ @Override
+ public VersionControlInformationEntity createFlowBranch(final Revision
revision, final String processGroupId, final String newBranchName,
+ final String
sourceBranch, final String sourceVersion) {
+ final ProcessGroup group =
processGroupDAO.getProcessGroup(processGroupId);
+ final VersionControlInformation versionControlInformation =
group.getVersionControlInformation();
+
+ if (versionControlInformation == null) {
+ throw new IllegalStateException("Process Group with ID " +
processGroupId + " is not currently under Version Control");
+ }
+
+ final String trimmedBranchName =
org.apache.commons.lang3.StringUtils.trimToNull(newBranchName);
+ if (trimmedBranchName == null) {
+ throw new IllegalArgumentException("Branch name must be
specified");
+ }
+ if (trimmedBranchName.equals(versionControlInformation.getBranch())) {
+ throw new IllegalArgumentException("Process Group is already
tracking branch " + trimmedBranchName);
+ }
+
+ final String resolvedSourceBranch =
org.apache.commons.lang3.StringUtils.isNotBlank(sourceBranch) ? sourceBranch :
versionControlInformation.getBranch();
+ if
(org.apache.commons.lang3.StringUtils.isBlank(resolvedSourceBranch)) {
+ throw new IllegalArgumentException("Source branch must be
specified");
+ }
+
+ final String resolvedSourceVersion =
org.apache.commons.lang3.StringUtils.isNotBlank(sourceVersion) ? sourceVersion
: versionControlInformation.getVersion();
+
+ final FlowVersionLocation sourceLocation = new
FlowVersionLocation(resolvedSourceBranch,
+ versionControlInformation.getBucketIdentifier(),
+ versionControlInformation.getFlowIdentifier(),
+ resolvedSourceVersion);
+
+ final FlowRegistryClientUserContext clientUserContext =
FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser());
+
+ try {
+ flowRegistryDAO.createBranchForUser(clientUserContext,
versionControlInformation.getRegistryIdentifier(), sourceLocation,
trimmedBranchName);
+ } catch (final UnsupportedOperationException e) {
+ throw new IllegalArgumentException("Configured Flow Registry does
not support branch creation.", e);
+ }
+
+ final VersionControlInformationDTO updatedVersionControlInformation =
new VersionControlInformationDTO();
+ updatedVersionControlInformation.setGroupId(processGroupId);
+
updatedVersionControlInformation.setRegistryId(versionControlInformation.getRegistryIdentifier());
+
updatedVersionControlInformation.setRegistryName(versionControlInformation.getRegistryName());
+
updatedVersionControlInformation.setBucketId(versionControlInformation.getBucketIdentifier());
+
updatedVersionControlInformation.setBucketName(versionControlInformation.getBucketName());
+
updatedVersionControlInformation.setFlowId(versionControlInformation.getFlowIdentifier());
+
updatedVersionControlInformation.setFlowName(versionControlInformation.getFlowName());
+
updatedVersionControlInformation.setFlowDescription(versionControlInformation.getFlowDescription());
+
updatedVersionControlInformation.setStorageLocation(versionControlInformation.getStorageLocation());
+ updatedVersionControlInformation.setBranch(trimmedBranchName);
+ updatedVersionControlInformation.setVersion(resolvedSourceVersion);
+
+ final VersionedFlowStatus status =
versionControlInformation.getStatus();
+ VersionedFlowState updatedState = null;
+ String stateExplanation = status == null ? null :
status.getStateExplanation();
+ if (status != null) {
+ final VersionedFlowState state = status.getState();
+ if (state != null) {
+ switch (state) {
+ case LOCALLY_MODIFIED_AND_STALE:
+ updatedState = VersionedFlowState.LOCALLY_MODIFIED;
+ stateExplanation = "Process Group has local
modifications";
+ break;
+ case STALE:
+ updatedState = VersionedFlowState.UP_TO_DATE;
+ break;
+ default:
+ updatedState = state;
+ break;
+ }
+ }
+ }
+
+ if (updatedState != null) {
+ updatedVersionControlInformation.setState(updatedState.name());
+
updatedVersionControlInformation.setStateExplanation(stateExplanation);
+ }
+
+ final FlowManager flowManager = controllerFacade.getFlowManager();
+
+ VersionedProcessGroup registrySnapshot = null;
+ if (flowManager == null) {
+ logger.warn("Failed to synchronize Process Group {} with Flow
Registry after creating branch {} because Flow Manager is unavailable",
group.getIdentifier(), trimmedBranchName);
+ } else {
+ try {
+ final FlowRegistryClientNode registryClient =
flowManager.getFlowRegistryClient(versionControlInformation.getRegistryIdentifier());
+ if (registryClient == null) {
+ logger.warn("Unable to retrieve Flow Registry client with
identifier {} for Process Group {}",
versionControlInformation.getRegistryIdentifier(), group.getIdentifier());
+ } else {
+ final FlowVersionLocation branchLocation = new
FlowVersionLocation(trimmedBranchName,
+ versionControlInformation.getBucketIdentifier(),
+ versionControlInformation.getFlowIdentifier(),
+ resolvedSourceVersion);
+
+ final FlowSnapshotContainer snapshotContainer =
registryClient.getFlowContents(clientUserContext, branchLocation, false);
+ final RegisteredFlowSnapshot flowSnapshot =
snapshotContainer == null ? null : snapshotContainer.getFlowSnapshot();
+ if (flowSnapshot != null) {
+ registrySnapshot = flowSnapshot.getFlowContents();
+ }
+ }
+ } catch (final IOException | FlowRegistryException e) {
+ logger.warn("Failed to retrieve Flow Registry snapshot for
Process Group {} on branch {} due to {}", group.getIdentifier(),
trimmedBranchName, e.getMessage());
+ logger.debug("Failed to retrieve Flow Registry snapshot for
Process Group {}", group.getIdentifier(), e);
Review Comment:
Recommend collapsing these two log statements into a single warning that passes the exception:
```suggestion
logger.warn("Failed to retrieve Flow Registry snapshot for
Process Group {}", group.getIdentifier(), e);
```
##########
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeCreateFlowBranchTest.java:
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web;
+
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.authorization.user.NiFiUserDetails;
+import org.apache.nifi.authorization.user.StandardNiFiUser;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.registry.flow.FlowRegistryClientNode;
+import org.apache.nifi.registry.flow.FlowRegistryClientUserContext;
+import org.apache.nifi.registry.flow.FlowRegistryException;
+import org.apache.nifi.registry.flow.FlowSnapshotContainer;
+import org.apache.nifi.registry.flow.FlowVersionLocation;
+import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
+import org.apache.nifi.registry.flow.StandardVersionControlInformation;
+import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedFlowState;
+import org.apache.nifi.registry.flow.VersionedFlowStatus;
+import org.apache.nifi.web.api.dto.DtoFactory;
+import org.apache.nifi.web.api.dto.EntityFactory;
+import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
+import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
+import org.apache.nifi.web.controller.ControllerFacade;
+import org.apache.nifi.web.dao.FlowRegistryDAO;
+import org.apache.nifi.web.dao.ProcessGroupDAO;
+import org.apache.nifi.web.revision.RevisionClaim;
+import org.apache.nifi.web.revision.RevisionManager;
+import org.apache.nifi.web.revision.UpdateRevisionTask;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.security.authentication.TestingAuthenticationToken;
+import org.springframework.security.core.context.SecurityContextHolder;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class StandardNiFiServiceFacadeCreateFlowBranchTest {
+
+ private StandardNiFiServiceFacade serviceFacade;
+
+ @Mock
+ private ProcessGroupDAO processGroupDAO;
+
+ @Mock
+ private FlowRegistryDAO flowRegistryDAO;
+
+ @Mock
+ private DtoFactory dtoFactory;
+
+ @Mock
+ private EntityFactory entityFactory;
+
+ @Mock
+ private ControllerFacade controllerFacade;
+
+ @Mock
+ private RevisionManager revisionManager;
+
+ @Mock
+ private FlowManager flowManager;
+
+ private static final String PROCESS_GROUP_ID = "pg-1";
+
+ @BeforeEach
+ void setUp() {
+ serviceFacade = new StandardNiFiServiceFacade();
+ serviceFacade.setProcessGroupDAO(processGroupDAO);
+ serviceFacade.setFlowRegistryDAO(flowRegistryDAO);
+ serviceFacade.setDtoFactory(dtoFactory);
+ serviceFacade.setEntityFactory(entityFactory);
+ serviceFacade.setControllerFacade(controllerFacade);
+ serviceFacade.setRevisionManager(revisionManager);
+
+
lenient().when(controllerFacade.getFlowManager()).thenReturn(flowManager);
+
lenient().when(flowManager.getFlowRegistryClient(anyString())).thenReturn(null);
+
+
lenient().when(revisionManager.updateRevision(any(RevisionClaim.class),
any(NiFiUser.class), any(UpdateRevisionTask.class)))
+ .thenAnswer(invocation -> {
+ final UpdateRevisionTask<?> task =
invocation.getArgument(2);
+ return task.update();
+ });
+
lenient().when(revisionManager.getRevision(anyString())).thenAnswer(invocation
-> {
+ final String componentId = invocation.getArgument(0, String.class);
+ return new Revision(1L, "client-1", componentId);
+ });
+
+ final NiFiUser user = new
StandardNiFiUser.Builder().identity("unit-test").build();
+ final TestingAuthenticationToken authenticationToken = new
TestingAuthenticationToken(new NiFiUserDetails(user), null);
+
SecurityContextHolder.getContext().setAuthentication(authenticationToken);
+ }
+
+ @AfterEach
+ void tearDown() {
+ SecurityContextHolder.clearContext();
+ }
+
+ @Test
+ void testCreateFlowBranchSuccess() throws IOException,
FlowRegistryException {
+ final Revision revision = new Revision(1L, "client-1",
PROCESS_GROUP_ID);
+
+ final ProcessGroup processGroup = mock(ProcessGroup.class);
+
when(processGroupDAO.getProcessGroup(PROCESS_GROUP_ID)).thenReturn(processGroup);
+
+ final VersionControlInformation versionControlInformation =
mock(VersionControlInformation.class);
+
when(processGroup.getVersionControlInformation()).thenReturn(versionControlInformation);
+
when(versionControlInformation.getRegistryIdentifier()).thenReturn("registry-1");
+ when(versionControlInformation.getBranch()).thenReturn("main");
+
when(versionControlInformation.getBucketIdentifier()).thenReturn("bucket-1");
+
when(versionControlInformation.getFlowIdentifier()).thenReturn("flow-1");
+
when(versionControlInformation.getFlowDescription()).thenReturn("desc");
+ when(versionControlInformation.getFlowName()).thenReturn("name");
+ when(versionControlInformation.getStorageLocation()).thenReturn("loc");
+ when(versionControlInformation.getVersion()).thenReturn("1");
+
+ final VersionedFlowStatus flowStatus = mock(VersionedFlowStatus.class);
+ when(versionControlInformation.getStatus()).thenReturn(flowStatus);
+
when(flowStatus.getState()).thenReturn(VersionedFlowState.LOCALLY_MODIFIED_AND_STALE);
+ when(flowStatus.getStateExplanation()).thenReturn("Up to date");
+
+ final ProcessGroup updatedGroup = processGroup;
+
when(processGroupDAO.updateVersionControlInformation(any(VersionControlInformationDTO.class),
eq(Collections.emptyMap())))
+ .thenReturn(updatedGroup);
+
+ final VersionControlInformationDTO updatedDto = new
VersionControlInformationDTO();
+ updatedDto.setBranch("feature");
+ updatedDto.setRegistryId("registry-1");
+
+ final VersionControlInformationDTO refreshedDto = new
VersionControlInformationDTO();
+ refreshedDto.setBranch("feature");
+ refreshedDto.setRegistryId("registry-1");
+ refreshedDto.setState(VersionControlInformationDTO.LOCALLY_MODIFIED);
+ refreshedDto.setStateExplanation("Process Group has local
modifications");
+
+
when(dtoFactory.createVersionControlInformationDto(updatedGroup)).thenReturn(updatedDto,
refreshedDto);
+
+ final VersionControlInformationEntity resultEntity = new
VersionControlInformationEntity();
+ resultEntity.setVersionControlInformation(refreshedDto);
+
when(entityFactory.createVersionControlInformationEntity(eq(refreshedDto),
any(RevisionDTO.class))).thenReturn(resultEntity);
+
+
when(dtoFactory.createRevisionDTO(any(FlowModification.class))).thenReturn(new
RevisionDTO());
+
+ final FlowRegistryClientNode registryClient =
mock(FlowRegistryClientNode.class);
+
when(flowManager.getFlowRegistryClient("registry-1")).thenReturn(registryClient);
+ final VersionedProcessGroup registrySnapshot = new
VersionedProcessGroup();
+ final RegisteredFlowSnapshot registeredFlowSnapshot = new
RegisteredFlowSnapshot();
+ registeredFlowSnapshot.setFlowContents(registrySnapshot);
+ final FlowSnapshotContainer snapshotContainer = new
FlowSnapshotContainer(registeredFlowSnapshot);
+ when(registryClient.getFlowContents(any(),
any(FlowVersionLocation.class), eq(false))).thenReturn(snapshotContainer);
+
+ final VersionControlInformationEntity response =
serviceFacade.createFlowBranch(revision, PROCESS_GROUP_ID, " feature ", null,
null);
+ assertEquals(resultEntity, response);
+
+ ArgumentCaptor<FlowVersionLocation> locationCaptor =
ArgumentCaptor.forClass(FlowVersionLocation.class);
+
verify(flowRegistryDAO).createBranchForUser(any(FlowRegistryClientUserContext.class),
eq("registry-1"),
+ locationCaptor.capture(), eq("feature"));
+
+ final FlowVersionLocation capturedLocation = locationCaptor.getValue();
+ assertEquals("main", capturedLocation.getBranch());
+ assertEquals("bucket-1", capturedLocation.getBucketId());
+ assertEquals("flow-1", capturedLocation.getFlowId());
+ assertEquals("1", capturedLocation.getVersion());
+
+ final ArgumentCaptor<FlowModification> modificationCaptor =
ArgumentCaptor.forClass(FlowModification.class);
+ verify(dtoFactory).createRevisionDTO(modificationCaptor.capture());
+ assertEquals("unit-test",
modificationCaptor.getValue().getLastModifier());
+
+ verify(registryClient).getFlowContents(any(),
any(FlowVersionLocation.class), eq(false));
+ verify(processGroup).setVersionControlInformation(argThat(vci -> vci
instanceof StandardVersionControlInformation
+ && ((StandardVersionControlInformation) vci).getFlowSnapshot()
== registrySnapshot), eq(Collections.emptyMap()));
+ verify(processGroup).synchronizeWithFlowRegistry(flowManager);
+
verify(entityFactory).createVersionControlInformationEntity(eq(refreshedDto),
any(RevisionDTO.class));
+ assertEquals(refreshedDto,
resultEntity.getVersionControlInformation());
+ }
+
+ @Test
+ void testCreateFlowBranchSameBranchRejected() {
+ final Revision revision = new Revision(1L, "client-1",
PROCESS_GROUP_ID);
+ final ProcessGroup processGroup = mock(ProcessGroup.class);
+
when(processGroupDAO.getProcessGroup(PROCESS_GROUP_ID)).thenReturn(processGroup);
+
+ final VersionControlInformation versionControlInformation =
mock(VersionControlInformation.class);
+
when(processGroup.getVersionControlInformation()).thenReturn(versionControlInformation);
+ when(versionControlInformation.getBranch()).thenReturn("main");
+
+ assertThrows(IllegalArgumentException.class,
+ () -> serviceFacade.createFlowBranch(revision,
PROCESS_GROUP_ID, "main", null, null));
+
+ verify(flowRegistryDAO, never()).createBranchForUser(any(), any(),
any(), any());
+ verify(processGroup,
never()).synchronizeWithFlowRegistry(any(FlowManager.class));
+ }
+
+ @Test
+ void testCreateFlowBranchUnsupportedRegistry() throws IOException,
FlowRegistryException {
+ final Revision revision = new Revision(1L, "client-1",
PROCESS_GROUP_ID);
+ final ProcessGroup processGroup = mock(ProcessGroup.class);
+
when(processGroupDAO.getProcessGroup(PROCESS_GROUP_ID)).thenReturn(processGroup);
+
+ final VersionControlInformation versionControlInformation =
mock(VersionControlInformation.class);
+
when(processGroup.getVersionControlInformation()).thenReturn(versionControlInformation);
+
when(versionControlInformation.getRegistryIdentifier()).thenReturn("registry-1");
+ when(versionControlInformation.getBranch()).thenReturn("main");
+
when(versionControlInformation.getBucketIdentifier()).thenReturn("bucket-1");
+
when(versionControlInformation.getFlowIdentifier()).thenReturn("flow-1");
+ when(versionControlInformation.getVersion()).thenReturn("1");
+
+ doThrow(new UnsupportedOperationException("not supported"))
+ .when(flowRegistryDAO)
+ .createBranchForUser(any(FlowRegistryClientUserContext.class),
eq("registry-1"), any(FlowVersionLocation.class), eq("feature"));
+
+ assertThrows(IllegalArgumentException.class,
+ () -> serviceFacade.createFlowBranch(revision,
PROCESS_GROUP_ID, "feature", null, null));
+
+ verify(processGroup,
never()).synchronizeWithFlowRegistry(any(FlowManager.class));
+ }
+
+ @Test
+ void testCreateFlowBranchNotVersionControlled() {
+ final Revision revision = new Revision(1L, "client-1",
PROCESS_GROUP_ID);
+ final ProcessGroup processGroup = mock(ProcessGroup.class);
+
when(processGroupDAO.getProcessGroup(PROCESS_GROUP_ID)).thenReturn(processGroup);
+ when(processGroup.getVersionControlInformation()).thenReturn(null);
+
+ assertThrows(IllegalStateException.class,
+ () -> serviceFacade.createFlowBranch(revision,
PROCESS_GROUP_ID, "feature", null, null));
+
+ verify(flowRegistryDAO, never()).createBranchForUser(any(), any(),
any(), any());
+ verify(processGroup,
never()).synchronizeWithFlowRegistry(any(FlowManager.class));
+ }
+
+ @Test
+ void testCreateFlowBranchPropagatesRegistryErrors() throws IOException,
FlowRegistryException {
+ final Revision revision = new Revision(1L, "client-1",
PROCESS_GROUP_ID);
+ final ProcessGroup processGroup = mock(ProcessGroup.class);
+
when(processGroupDAO.getProcessGroup(PROCESS_GROUP_ID)).thenReturn(processGroup);
+
+ final VersionControlInformation versionControlInformation =
mock(VersionControlInformation.class);
+
when(processGroup.getVersionControlInformation()).thenReturn(versionControlInformation);
+
when(versionControlInformation.getRegistryIdentifier()).thenReturn("registry-1");
+ when(versionControlInformation.getBranch()).thenReturn("main");
+
when(versionControlInformation.getBucketIdentifier()).thenReturn("bucket-1");
+
when(versionControlInformation.getFlowIdentifier()).thenReturn("flow-1");
+ when(versionControlInformation.getVersion()).thenReturn("1");
+
+ final FlowRegistryException cause = new FlowRegistryException("Branch
[feature] already exists");
+ doThrow(new NiFiCoreException("Unable to create branch [feature] in
registry with ID registry-1: Branch [feature] already exists", cause))
+ .when(flowRegistryDAO)
+ .createBranchForUser(any(FlowRegistryClientUserContext.class),
eq("registry-1"), any(FlowVersionLocation.class), eq("feature"));
+
+ final NiFiCoreException exception =
assertThrows(NiFiCoreException.class,
+ () -> serviceFacade.createFlowBranch(revision,
PROCESS_GROUP_ID, "feature", null, null));
+
+ assertEquals("Unable to create branch [feature] in registry with ID
registry-1: Branch [feature] already exists", exception.getMessage());
Review Comment:
See note on message matching.
##########
nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAOTest.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.dao.impl;
+
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.registry.flow.FlowRegistryClientNode;
+import org.apache.nifi.registry.flow.FlowRegistryClientUserContext;
+import org.apache.nifi.registry.flow.FlowRegistryException;
+import org.apache.nifi.registry.flow.FlowVersionLocation;
+import org.apache.nifi.web.NiFiCoreException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class StandardFlowRegistryDAOTest {
+
+ @Mock
+ private FlowController flowController;
+
+ @Mock
+ private FlowManager flowManager;
+
+ @Mock
+ private FlowRegistryClientUserContext userContext;
+
+ private StandardFlowRegistryDAO dao;
+
+ @BeforeEach
+ void setUp() {
+ dao = new StandardFlowRegistryDAO();
+ dao.setFlowController(flowController);
+
+ when(flowController.getFlowManager()).thenReturn(flowManager);
+ }
+
+ @Test
+ void testCreateBranchDelegatesToRegistryClient() throws IOException,
FlowRegistryException {
+ final FlowRegistryClientNode clientNode =
mock(FlowRegistryClientNode.class);
+
when(flowManager.getFlowRegistryClient("registry-1")).thenReturn(clientNode);
+
+ final FlowVersionLocation sourceLocation = new
FlowVersionLocation("main", "bucket", "flow", "1");
+
+ dao.createBranchForUser(userContext, "registry-1", sourceLocation,
"feature");
+
+ ArgumentCaptor<FlowVersionLocation> locationCaptor =
ArgumentCaptor.forClass(FlowVersionLocation.class);
+ verify(clientNode).createBranch(eq(userContext),
locationCaptor.capture(), eq("feature"));
+
+ final FlowVersionLocation capturedLocation = locationCaptor.getValue();
+ assertEquals("main", capturedLocation.getBranch());
+ assertEquals("bucket", capturedLocation.getBucketId());
+ assertEquals("flow", capturedLocation.getFlowId());
+ assertEquals("1", capturedLocation.getVersion());
+ }
+
+ @Test
+ void testCreateBranchUnsupported() throws IOException,
FlowRegistryException {
+ final FlowRegistryClientNode clientNode =
mock(FlowRegistryClientNode.class);
+
when(flowManager.getFlowRegistryClient("registry-1")).thenReturn(clientNode);
+ final FlowVersionLocation sourceLocation = new
FlowVersionLocation("main", "bucket", "flow", "1");
+
+ doThrow(new UnsupportedOperationException("not supported"))
+ .when(clientNode)
+ .createBranch(userContext, sourceLocation, "feature");
+
+ assertThrows(UnsupportedOperationException.class,
+ () -> dao.createBranchForUser(userContext, "registry-1",
sourceLocation, "feature"));
+ }
+
+ @Test
+ void testCreateBranchUnknownRegistry() {
+ when(flowManager.getFlowRegistryClient("missing")).thenReturn(null);
+
+ final FlowVersionLocation sourceLocation = new
FlowVersionLocation("main", "bucket", "flow", "1");
+ assertThrows(IllegalArgumentException.class,
+ () -> dao.createBranchForUser(userContext, "missing",
sourceLocation, "feature"));
+ }
+
+ @Test
+ void testCreateBranchFlowRegistryExceptionWrapped() throws IOException,
FlowRegistryException {
+ final FlowRegistryClientNode clientNode =
mock(FlowRegistryClientNode.class);
+
when(flowManager.getFlowRegistryClient("registry-1")).thenReturn(clientNode);
+
+ final FlowVersionLocation sourceLocation = new
FlowVersionLocation("main", "bucket", "flow", "1");
+ final FlowRegistryException cause = new FlowRegistryException("Branch
[feature] already exists");
+ doThrow(cause)
+ .when(clientNode)
+ .createBranch(userContext, sourceLocation, "feature");
+
+ final NiFiCoreException exception =
assertThrows(NiFiCoreException.class,
+ () -> dao.createBranchForUser(userContext, "registry-1",
sourceLocation, "feature"));
+
+ assertEquals("Unable to create branch [feature] in registry with ID
registry-1: Branch [feature] already exists", exception.getMessage());
Review Comment:
In general, it is best to avoid asserting an exact message. Recommend
asserting that the message contains a particular keyword, such as `registry-1`,
if that is the goal.
##########
pom.xml:
##########
@@ -116,7 +116,7 @@
<node.version>v22.19.0</node.version>
<!-- NiFi build -->
- <nifi-api.version>2.4.0</nifi-api.version>
+ <nifi-api.version>2.5.0</nifi-api.version>
Review Comment:
It looks like this change is no longer necessary after rebasing to the
latest main branch.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]