This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f81bfc3934 NIFI-15475 Blocked concurrent commits overwriting changes
with git-based Registry Clients (#10778)
f81bfc3934 is described below
commit f81bfc393464d12e922685c7fe267b8f788fb03d
Author: Pierre Villard <[email protected]>
AuthorDate: Thu Jan 22 19:27:09 2026 +0100
NIFI-15475 Blocked concurrent commits overwriting changes with git-based
Registry Clients (#10778)
Signed-off-by: David Handermann <[email protected]>
---
.../bitbucket/BitbucketRepositoryClient.java | 15 ++--
.../azure/devops/AzureDevOpsRepositoryClient.java | 23 ++++--
.../flow/git/AbstractGitFlowRegistryClient.java | 32 ++++++++-
.../flow/git/client/GitCreateContentRequest.java | 13 ++++
.../flow/git/client/GitRepositoryClient.java | 17 +++++
.../git/AbstractGitFlowRegistryClientTest.java | 5 ++
.../apache/nifi/github/GitHubRepositoryClient.java | 28 ++++++++
.../nifi/github/GitHubFlowRegistryClientTest.java | 83 ++++++++++++++++++++++
.../apache/nifi/gitlab/GitLabRepositoryClient.java | 7 ++
9 files changed, 210 insertions(+), 13 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/main/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClient.java
b/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/main/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClient.java
index e699975e31..1779663445 100644
---
a/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/main/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClient.java
+++
b/nifi-extension-bundles/nifi-atlassian-bundle/nifi-atlassian-extensions/src/main/java/org/apache/nifi/atlassian/bitbucket/BitbucketRepositoryClient.java
@@ -72,6 +72,7 @@ public class BitbucketRepositoryClient implements
GitRepositoryClient {
private static final String FIELD_BRANCH = "branch";
private static final String FIELD_MESSAGE = "message";
private static final String FIELD_SOURCE_COMMIT_ID = "sourceCommitId";
+ private static final String FIELD_PARENTS = "parents";
private static final String FIELD_CONTENT = "content";
private static final String FIELD_FILES = "files";
private static final String FIELD_CHILDREN = "children";
@@ -431,6 +432,12 @@ public class BitbucketRepositoryClient implements
GitRepositoryClient {
multipartBuilder.addPart(FIELD_MESSAGE,
StandardHttpContentType.TEXT_PLAIN,
request.getMessage().getBytes(StandardCharsets.UTF_8));
multipartBuilder.addPart(FIELD_BRANCH,
StandardHttpContentType.TEXT_PLAIN, branch.getBytes(StandardCharsets.UTF_8));
+ // Add parents parameter for atomic commit - Bitbucket Cloud will
reject if the branch has moved
+ final String expectedCommitSha = request.getExpectedCommitSha();
+ if (expectedCommitSha != null && !expectedCommitSha.isBlank()) {
+ multipartBuilder.addPart(FIELD_PARENTS,
StandardHttpContentType.TEXT_PLAIN,
expectedCommitSha.getBytes(StandardCharsets.UTF_8));
+ }
+
final URI uri =
getRepositoryUriBuilder().addPathSegment("src").build();
final String errorMessage = "Error while committing content for
repository [%s] on branch %s at path %s"
.formatted(repoName, branch, resolvedPath);
@@ -460,10 +467,10 @@ public class BitbucketRepositoryClient implements
GitRepositoryClient {
multipartBuilder.addPart(FIELD_MESSAGE,
StandardHttpContentType.TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
}
- final String existingContentSha = request.getExistingContentSha();
- final boolean existingContentProvided = existingContentSha != null &&
!existingContentSha.isBlank();
- if (existingContentProvided) {
- multipartBuilder.addPart(FIELD_SOURCE_COMMIT_ID,
StandardHttpContentType.TEXT_PLAIN,
existingContentSha.getBytes(StandardCharsets.UTF_8));
+ // Use expectedCommitSha for atomic commit - Bitbucket DC will reject
if the file has changed since this commit
+ final String expectedCommitSha = request.getExpectedCommitSha();
+ if (expectedCommitSha != null && !expectedCommitSha.isBlank()) {
+ multipartBuilder.addPart(FIELD_SOURCE_COMMIT_ID,
StandardHttpContentType.TEXT_PLAIN,
expectedCommitSha.getBytes(StandardCharsets.UTF_8));
}
final HttpUriBuilder uriBuilder =
getRepositoryUriBuilder().addPathSegment("browse");
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/main/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClient.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/main/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClient.java
index 9d76adf98c..88091ff978 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/main/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClient.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-registry-clients/src/main/java/org/apache/nifi/azure/devops/AzureDevOpsRepositoryClient.java
@@ -359,13 +359,22 @@ public class AzureDevOpsRepositoryClient implements
GitRepositoryClient {
final String branch = request.getBranch();
final String message = request.getMessage();
logger.debug("Creating content at path [{}] on branch [{}] in repo
[{}]", path, branch, repoName);
- // Get branch current commit id
- final URI refUri = getUriBuilder().addPathSegment(SEGMENT_REFS)
- .addQueryParameter(PARAM_FILTER, FILTER_HEADS_PREFIX + branch)
- .addQueryParameter(API, API_VERSION)
- .build();
- final JsonNode refResponse = executeGet(refUri);
- final String oldObjectId =
refResponse.get(JSON_FIELD_VALUE).get(0).get(JSON_FIELD_OBJECT_ID).asText();
+
+ // Use expectedCommitSha for atomic commit if provided, otherwise
fetch current branch HEAD
+ // Azure DevOps will reject the push if oldObjectId doesn't match the
current branch HEAD
+ final String oldObjectId;
+ final String expectedCommitSha = request.getExpectedCommitSha();
+ if (expectedCommitSha != null && !expectedCommitSha.isBlank()) {
+ oldObjectId = expectedCommitSha;
+ } else {
+ // Fall back to fetching current branch commit id
+ final URI refUri = getUriBuilder().addPathSegment(SEGMENT_REFS)
+ .addQueryParameter(PARAM_FILTER, FILTER_HEADS_PREFIX +
branch)
+ .addQueryParameter(API, API_VERSION)
+ .build();
+ final JsonNode refResponse = executeGet(refUri);
+ oldObjectId =
refResponse.get(JSON_FIELD_VALUE).get(0).get(JSON_FIELD_OBJECT_ID).asText();
+ }
final URI pushUri = getUriBuilder().addPathSegment(SEGMENT_PUSHES)
.addQueryParameter(API, API_VERSION)
diff --git
a/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClient.java
b/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClient.java
index 96e8e87753..b866f9d604 100644
---
a/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClient.java
+++
b/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClient.java
@@ -355,7 +355,34 @@ public abstract class AbstractGitFlowRegistryClient
extends AbstractFlowRegistry
final String branch = snapshotMetadata.getBranch();
final FlowLocation flowLocation = new
FlowLocation(snapshotMetadata.getBranch(),
snapshotMetadata.getBucketIdentifier(), snapshotMetadata.getFlowIdentifier());
final String filePath = getSnapshotFilePath(flowLocation);
- final String previousSha = repositoryClient.getContentSha(filePath,
branch).orElse(null);
+
+ // Capture the expected version before any modifications - this is the
commit SHA the user believes they are committing on top of
+ final String expectedVersion = snapshotMetadata.getVersion();
+
+ // Get the current version (latest commit SHA) from the repository
+ final List<GitCommit> commits = repositoryClient.getCommits(filePath,
branch);
+ final String currentVersion = commits.isEmpty() ? null :
commits.getFirst().id();
+
+ // Check for version conflict: if the user expects a specific version
but it doesn't match the current version in the repository,
+ // another user may have committed changes in the meantime. Reject the
commit unless FORCE_COMMIT is specified.
+ if (expectedVersion != null && currentVersion != null &&
!expectedVersion.equals(currentVersion) && action !=
RegisterAction.FORCE_COMMIT) {
+ throw new FlowRegistryException("""
+ Version conflict detected for flow [%s] in bucket [%s] on
branch [%s].
+ Expected version [%s] but the current version in the
repository is [%s].
+ Another user may have committed changes. Please check for
a newer version and try again."""
+ .formatted(flowLocation.getFlowId(),
flowLocation.getBucketId(), branch, expectedVersion, currentVersion));
+ }
+
+ // For atomic commit operations, we need:
+ // - existingContentSha: the blob SHA at the expected version (for
GitHub which uses blob SHAs)
+ // - expectedCommitSha: the commit SHA the user expects (for GitLab,
Bitbucket, Azure DevOps which use commit SHAs)
+ // If expectedVersion is provided, use the blob SHA at that commit;
otherwise use the current blob SHA
+ final String existingBlobSha;
+ if (expectedVersion != null) {
+ existingBlobSha = repositoryClient.getContentShaAtCommit(filePath,
expectedVersion).orElse(null);
+ } else {
+ existingBlobSha = repositoryClient.getContentSha(filePath,
branch).orElse(null);
+ }
final String snapshotComments = snapshotMetadata.getComments();
final String commitMessage = StringUtils.isBlank(snapshotComments) ?
DEFAULT_FLOW_SNAPSHOT_MESSAGE_FORMAT.formatted(flowLocation.getFlowId()) :
snapshotComments;
@@ -415,7 +442,8 @@ public abstract class AbstractGitFlowRegistryClient extends
AbstractFlowRegistry
.path(filePath)
.content(flowSnapshotSerializer.serialize(flowSnapshot))
.message(commitMessage)
- .existingContentSha(previousSha)
+ .existingContentSha(existingBlobSha)
+ .expectedCommitSha(expectedVersion)
.build();
final String createContentCommitSha =
repositoryClient.createContent(createContentRequest);
diff --git
a/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/client/GitCreateContentRequest.java
b/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/client/GitCreateContentRequest.java
index e5481123d0..3c8729bd25 100644
---
a/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/client/GitCreateContentRequest.java
+++
b/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/client/GitCreateContentRequest.java
@@ -26,6 +26,7 @@ public class GitCreateContentRequest {
private final String content;
private final String message;
private final String existingContentSha;
+ private final String expectedCommitSha;
private GitCreateContentRequest(final Builder builder) {
this.branch = Objects.requireNonNull(builder.branch);
@@ -34,6 +35,8 @@ public class GitCreateContentRequest {
this.message = Objects.requireNonNull(builder.message);
// Will be null for create, and populated for update
this.existingContentSha = builder.existingContentSha;
+ // Commit SHA for providers that support atomic commits via commit SHA
+ this.expectedCommitSha = builder.expectedCommitSha;
}
public String getBranch() {
@@ -56,6 +59,10 @@ public class GitCreateContentRequest {
return existingContentSha;
}
+ public String getExpectedCommitSha() {
+ return expectedCommitSha;
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -66,6 +73,7 @@ public class GitCreateContentRequest {
private String content;
private String message;
private String existingContentSha;
+ private String expectedCommitSha;
public Builder branch(final String branch) {
this.branch = branch;
@@ -92,6 +100,11 @@ public class GitCreateContentRequest {
return this;
}
+ public Builder expectedCommitSha(final String expectedCommitSha) {
+ this.expectedCommitSha = expectedCommitSha;
+ return this;
+ }
+
public GitCreateContentRequest build() {
return new GitCreateContentRequest(this);
}
diff --git
a/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/client/GitRepositoryClient.java
b/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/client/GitRepositoryClient.java
index a64ecab97c..3d02cdbeab 100644
---
a/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/client/GitRepositoryClient.java
+++
b/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/client/GitRepositoryClient.java
@@ -114,6 +114,23 @@ public interface GitRepositoryClient {
*/
Optional<String> getContentSha(String path, String branch) throws
IOException, FlowRegistryException;
+ /**
+ * Retrieves the blob SHA of the file at the given path from a specific
commit.
+ * This is used for atomic commit operations where we need the blob SHA at
the
+ * user's expected version, not the current version.
+ *
+ * @param path the path of the file
+ * @param commitSha the commit SHA to get the blob SHA from
+ * @return the blob SHA of the content at the specified commit, or empty
if not found
+ * @throws IOException if an I/O error occurs
+ * @throws FlowRegistryException if a non-I/O error occurs
+ */
+ default Optional<String> getContentShaAtCommit(String path, String
commitSha) throws IOException, FlowRegistryException {
+ // Default implementation returns empty to maintain backward
compatibility.
+ // Implementations should override this to support atomic commit
operations.
+ return Optional.empty();
+ }
+
/**
* Creates a file in the repository based on the given request.
*
diff --git
a/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/test/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClientTest.java
b/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/test/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClientTest.java
index 9cec464b2b..21e85bc4ea 100644
---
a/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/test/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClientTest.java
+++
b/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/test/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClientTest.java
@@ -263,6 +263,11 @@ class AbstractGitFlowRegistryClientTest {
throw new UnsupportedOperationException("Not required for test");
}
+ @Override
+ public Optional<String> getContentShaAtCommit(final String path, final
String commitSha) {
+ throw new UnsupportedOperationException("Not required for test");
+ }
+
@Override
public String createContent(final GitCreateContentRequest request) {
throw new UnsupportedOperationException("Not required for test");
diff --git
a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubRepositoryClient.java
b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubRepositoryClient.java
index f428d47679..3c1af3a2c4 100644
---
a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubRepositoryClient.java
+++
b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubRepositoryClient.java
@@ -409,6 +409,34 @@ public class GitHubRepositoryClient implements
GitRepositoryClient {
});
}
+ /**
+ * Gets the blob SHA for the given path at a specific commit.
+ * This is used for atomic commit operations where we need the blob SHA at
the
+ * user's expected version, not the current version.
+ *
+ * @param path the path to the content
+ * @param commitSha the commit SHA
+ * @return blob SHA for the given file at the specified commit, or empty
optional
+ *
+ * @throws IOException if an I/O error happens calling GitHub
+ * @throws FlowRegistryException if a non I/O error happens calling GitHub
+ */
+ @Override
+ public Optional<String> getContentShaAtCommit(final String path, final
String commitSha) throws IOException, FlowRegistryException {
+ final String resolvedPath = getResolvedPath(path);
+ logger.debug("Getting content SHA for [{}] at commit [{}] in
repository [{}]", resolvedPath, commitSha, repository.getName());
+
+ return execute(() -> {
+ try {
+ final GHContent ghContent =
repository.getFileContent(resolvedPath, commitSha);
+ return Optional.of(ghContent.getSha());
+ } catch (final FileNotFoundException e) {
+ logger.debug("Unable to get content SHA for [{}] at commit
[{}] because content does not exist", resolvedPath, commitSha);
+ return Optional.empty();
+ }
+ });
+ }
+
/**
* Deletes the contents for the given file on the given branch.
*
diff --git
a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/test/java/org/apache/nifi/github/GitHubFlowRegistryClientTest.java
b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/test/java/org/apache/nifi/github/GitHubFlowRegistryClientTest.java
index ef8cb9e5a2..e7a0a49e27 100644
---
a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/test/java/org/apache/nifi/github/GitHubFlowRegistryClientTest.java
+++
b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/test/java/org/apache/nifi/github/GitHubFlowRegistryClientTest.java
@@ -30,6 +30,7 @@ import org.apache.nifi.registry.flow.RegisterAction;
import org.apache.nifi.registry.flow.RegisteredFlow;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata;
+import org.apache.nifi.registry.flow.git.client.GitCommit;
import org.apache.nifi.registry.flow.git.client.GitCreateContentRequest;
import org.apache.nifi.registry.flow.git.serialize.FlowSnapshotSerializer;
import org.junit.jupiter.api.BeforeEach;
@@ -40,8 +41,11 @@ import org.mockito.ArgumentCaptor;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.time.Instant;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -50,8 +54,11 @@ import static
org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -179,6 +186,82 @@ public class GitHubFlowRegistryClientTest {
assertEquals(incomingMetadata.getBucketIdentifier(),
resultBucket.getName());
}
+ @Test
+ public void testRegisterFlowSnapshotVersionConflict() throws IOException,
FlowRegistryException {
+ setupClientConfigurationContextWithDefaults();
+
+ final RegisteredFlow incomingFlow = createIncomingRegisteredFlow();
+
+ final RegisteredFlowSnapshotMetadata incomingMetadata = new
RegisteredFlowSnapshotMetadata();
+ incomingMetadata.setBranch(incomingFlow.getBranch());
+
incomingMetadata.setBucketIdentifier(incomingFlow.getBucketIdentifier());
+ incomingMetadata.setFlowIdentifier(incomingFlow.getIdentifier());
+ incomingMetadata.setComments("Unit test");
+ incomingMetadata.setTimestamp(System.currentTimeMillis());
+ // Set the expected version to an old commit SHA - simulating user has
an outdated version
+ incomingMetadata.setVersion("old-commit-sha-12345");
+
+ final RegisteredFlowSnapshot incomingSnapshot = new
RegisteredFlowSnapshot();
+ incomingSnapshot.setFlow(incomingFlow);
+ incomingSnapshot.setSnapshotMetadata(incomingMetadata);
+ incomingSnapshot.setFlowContents(new VersionedProcessGroup());
+
+ // Mock repository to return a different (newer) commit SHA than what
the user expects
+ final GitCommit newerCommit = new GitCommit("new-commit-sha-67890",
"author", "message", Instant.now());
+
doReturn(List.of(newerCommit)).when(repositoryClient).getCommits(any(), any());
+
+ // Attempting to commit should throw a FlowRegistryException due to
version conflict
+ final FlowRegistryException exception =
assertThrows(FlowRegistryException.class, () ->
+
flowRegistryClient.registerFlowSnapshot(clientConfigurationContext,
incomingSnapshot, RegisterAction.COMMIT));
+
+ assertTrue(exception.getMessage().contains("Version conflict
detected"));
+ assertTrue(exception.getMessage().contains("old-commit-sha-12345"));
+ assertTrue(exception.getMessage().contains("new-commit-sha-67890"));
+ assertTrue(exception.getMessage().contains("Please check for a newer
version and try again"));
+ }
+
+ @Test
+ public void testRegisterFlowSnapshotVersionConflictWithForceCommit()
throws IOException, FlowRegistryException {
+ setupClientConfigurationContextWithDefaults();
+
+ final RegisteredFlow incomingFlow = createIncomingRegisteredFlow();
+
+ final RegisteredFlowSnapshotMetadata incomingMetadata = new
RegisteredFlowSnapshotMetadata();
+ incomingMetadata.setBranch(incomingFlow.getBranch());
+
incomingMetadata.setBucketIdentifier(incomingFlow.getBucketIdentifier());
+ incomingMetadata.setFlowIdentifier(incomingFlow.getIdentifier());
+ incomingMetadata.setComments("Unit test with force commit");
+ incomingMetadata.setTimestamp(System.currentTimeMillis());
+ // Set the expected version to an old commit SHA - simulating user has
an outdated version
+ incomingMetadata.setVersion("old-commit-sha-12345");
+
+ final RegisteredFlowSnapshot incomingSnapshot = new
RegisteredFlowSnapshot();
+ incomingSnapshot.setFlow(incomingFlow);
+ incomingSnapshot.setSnapshotMetadata(incomingMetadata);
+ incomingSnapshot.setFlowContents(new VersionedProcessGroup());
+
+ // Mock repository to return a different (newer) commit SHA than what
the user expects
+ final GitCommit newerCommit = new GitCommit("new-commit-sha-67890",
"author", "message", Instant.now());
+
doReturn(List.of(newerCommit)).when(repositoryClient).getCommits(any(), any());
+
+ // Mock blob SHA at the expected commit for atomic operation
+
doReturn(Optional.of("blob-sha-at-expected-commit")).when(repositoryClient).getContentShaAtCommit(any(),
any());
+
+ // Mock the content retrieval for the existing snapshot
+ doReturn(new ByteArrayInputStream(new
byte[0])).when(repositoryClient).getContentFromBranch(any(), any());
+
when(flowSnapshotSerializer.deserialize(any(InputStream.class))).thenReturn(incomingSnapshot);
+
when(flowSnapshotSerializer.serialize(any(RegisteredFlowSnapshot.class))).thenReturn("serialized
content");
+
+ final String commitSha = "commitSha";
+
when(repositoryClient.createContent(any(GitCreateContentRequest.class))).thenReturn(commitSha);
+
+ // With FORCE_COMMIT, the operation should succeed despite version
mismatch
+ final RegisteredFlowSnapshot resultSnapshot =
flowRegistryClient.registerFlowSnapshot(
+ clientConfigurationContext, incomingSnapshot,
RegisterAction.FORCE_COMMIT);
+
+ assertNotNull(resultSnapshot);
+ }
+
@Test
public void testRegisterFlowSnapshotWithIgnoreUpdatesParameterStrategy()
throws IOException, FlowRegistryException {
setupClientConfigurationContextWithDefaults();
diff --git
a/nifi-extension-bundles/nifi-gitlab-bundle/nifi-gitlab-extensions/src/main/java/org/apache/nifi/gitlab/GitLabRepositoryClient.java
b/nifi-extension-bundles/nifi-gitlab-bundle/nifi-gitlab-extensions/src/main/java/org/apache/nifi/gitlab/GitLabRepositoryClient.java
index f63ac57424..a6292e4ab2 100644
---
a/nifi-extension-bundles/nifi-gitlab-bundle/nifi-gitlab-extensions/src/main/java/org/apache/nifi/gitlab/GitLabRepositoryClient.java
+++
b/nifi-extension-bundles/nifi-gitlab-bundle/nifi-gitlab-extensions/src/main/java/org/apache/nifi/gitlab/GitLabRepositoryClient.java
@@ -253,6 +253,13 @@ public class GitLabRepositoryClient implements
GitRepositoryClient {
commitAction.setFilePath(resolvedPath);
commitAction.setEncoding(Encoding.BASE64);
+ // Set the expected commit SHA for atomic operation - GitLab will
reject if the file
+ // has been modified since this commit
+ final String expectedCommitSha = request.getExpectedCommitSha();
+ if (expectedCommitSha != null) {
+ commitAction.setLastCommitId(expectedCommitSha);
+ }
+
// Encode content to Base64
final String encodedContent =
Base64.getEncoder().encodeToString(request.getContent().getBytes(StandardCharsets.UTF_8));
commitAction.setContent(encodedContent);