This is an automated email from the ASF dual-hosted git repository.
kdoran pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-registry.git
The following commit(s) were added to refs/heads/main by this push:
new f1757f5 NIFIREG-409 Refactoring revision management
f1757f5 is described below
commit f1757f5ece14ca140f3526164e7ae03cf6c218b1
Author: Bryan Bende <[email protected]>
AuthorDate: Tue Aug 4 13:14:12 2020 -0400
NIFIREG-409 Refactoring revision management
Make RevisionManager responsible for incrementing revisions rather than
caller.
This creates consistency between the naive and JDBC implementations.
This closes #291.
Signed-off-by: Kevin Doran <[email protected]>
---
.../{UpdateRevisionTask.java => UpdateResult.java} | 22 +++++---
.../registry/revision/api/UpdateRevisionTask.java | 8 +--
.../revision/naive/NaiveRevisionManager.java | 42 +++++++++++----
.../revision/standard/StandardUpdateResult.java | 59 ++++++++++++++++++++++
.../entity/StandardRevisableEntityService.java | 50 +++++++-----------
.../revision/jdbc/JdbcRevisionManager.java | 26 ++++++++--
.../revision/jdbc/TestJdbcRevisionManager.java | 24 ++++-----
7 files changed, 161 insertions(+), 70 deletions(-)
diff --git
a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-api/src/main/java/org/apache/nifi/registry/revision/api/UpdateRevisionTask.java
b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-api/src/main/java/org/apache/nifi/registry/revision/api/UpdateResult.java
similarity index 76%
copy from
nifi-registry-core/nifi-registry-revision/nifi-registry-revision-api/src/main/java/org/apache/nifi/registry/revision/api/UpdateRevisionTask.java
copy to
nifi-registry-core/nifi-registry-revision/nifi-registry-revision-api/src/main/java/org/apache/nifi/registry/revision/api/UpdateResult.java
index 3db8f9f..4181460 100644
---
a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-api/src/main/java/org/apache/nifi/registry/revision/api/UpdateRevisionTask.java
+++
b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-api/src/main/java/org/apache/nifi/registry/revision/api/UpdateResult.java
@@ -18,17 +18,27 @@ package org.apache.nifi.registry.revision.api;
/**
* <p>
- * A task that is responsible for updating some entities.
+ * The result of an update task.
* </p>
*
* NOTE: This API is considered a framework level API for the NiFi ecosystem and may evolve as
* the NiFi PMC and committers deem necessary. It is not considered a public extension point.
*/
-public interface UpdateRevisionTask<T> {
+public interface UpdateResult<T> {
+
+ /**
+ * @return the entity that was updated
+ */
+ T getEntity();
+
/**
- * Updates one or more entities and returns updated Revisions for those entities.
- *
- * @return the updated revisions for the entities
+ * @return the id of the entity that was updated
*/
- RevisionUpdate<T> update();
+ String getEntityId();
+
+ /**
+ * @return the identity of the user that updated the entity
+ */
+ String updaterIdentity();
+
}
diff --git
a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-api/src/main/java/org/apache/nifi/registry/revision/api/UpdateRevisionTask.java
b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-api/src/main/java/org/apache/nifi/registry/revision/api/UpdateRevisionTask.java
index 3db8f9f..c9d5748 100644
---
a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-api/src/main/java/org/apache/nifi/registry/revision/api/UpdateRevisionTask.java
+++
b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-api/src/main/java/org/apache/nifi/registry/revision/api/UpdateRevisionTask.java
@@ -18,7 +18,7 @@ package org.apache.nifi.registry.revision.api;
/**
* <p>
- * A task that is responsible for updating some entities.
+ * A task that is responsible for updating an entity.
* </p>
*
* NOTE: This API is considered a framework level API for the NiFi ecosystem and may evolve as
@@ -26,9 +26,9 @@ package org.apache.nifi.registry.revision.api;
*/
public interface UpdateRevisionTask<T> {
/**
- * Updates one or more entities and returns updated Revisions for those entities.
+ * Updates an entity and returns the resulting entity.
*
- * @return the updated revisions for the entities
+ * @return the update result containing the updated entity
*/
- RevisionUpdate<T> update();
+ UpdateResult<T> update();
}
diff --git
a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-common/src/main/java/org/apache/nifi/registry/revision/naive/NaiveRevisionManager.java
b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-common/src/main/java/org/apache/nifi/registry/revision/naive/NaiveRevisionManager.java
index 0d161cd..641a360 100644
---
a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-common/src/main/java/org/apache/nifi/registry/revision/naive/NaiveRevisionManager.java
+++
b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-common/src/main/java/org/apache/nifi/registry/revision/naive/NaiveRevisionManager.java
@@ -17,22 +17,27 @@
package org.apache.nifi.registry.revision.naive;
import org.apache.nifi.registry.revision.api.DeleteRevisionTask;
+import org.apache.nifi.registry.revision.api.EntityModification;
import org.apache.nifi.registry.revision.api.ExpiredRevisionClaimException;
import org.apache.nifi.registry.revision.api.InvalidRevisionException;
import org.apache.nifi.registry.revision.api.Revision;
import org.apache.nifi.registry.revision.api.RevisionClaim;
import org.apache.nifi.registry.revision.api.RevisionManager;
import org.apache.nifi.registry.revision.api.RevisionUpdate;
+import org.apache.nifi.registry.revision.api.UpdateResult;
import org.apache.nifi.registry.revision.api.UpdateRevisionTask;
import org.apache.nifi.registry.revision.standard.RevisionComparator;
+import org.apache.nifi.registry.revision.standard.StandardRevisionUpdate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -103,7 +108,8 @@ public class NaiveRevisionManager implements
RevisionManager {
}
@Override
- public <T> RevisionUpdate<T> updateRevision(final RevisionClaim originalClaim, final UpdateRevisionTask<T> task) throws ExpiredRevisionClaimException {
+ public <T> RevisionUpdate<T> updateRevision(final RevisionClaim originalClaim, final UpdateRevisionTask<T> task)
+ throws ExpiredRevisionClaimException {
logger.debug("Attempting to update revision using {}", originalClaim);
final List<Revision> revisionList = new
ArrayList<>(originalClaim.getRevisions());
@@ -123,17 +129,35 @@ public class NaiveRevisionManager implements
RevisionManager {
logger.debug("Successfully verified Revision Claim for all revisions");
// Perform the update
- final RevisionUpdate<T> updatedComponent = task.update();
+ // If an exception is thrown we don't want to update revision so it is ok to bounce out of this method
+ final UpdateResult<T> updateResult = task.update();
+ if (updateResult == null) {
+ return null;
+ }
- // If the update succeeded then put the updated revisions into the revisionMap
- // If an exception is thrown during the update we don't want to update revision so it is ok to bounce out of this method
- if (updatedComponent != null) {
- for (final Revision updatedRevision : updatedComponent.getUpdatedRevisions()) {
- revisionMap.put(updatedRevision.getEntityId(), updatedRevision);
- }
+ // The update succeeded so increment the revisions
+ final Set<Revision> incrementedRevisions = new HashSet<>();
+ for (final Revision incomingRevision : revisionList) {
+ final String entityId = incomingRevision.getEntityId();
+ final String clientId = incomingRevision.getClientId();
+
+ // retrieve the revision from the map here because the incoming revision may have been
+ // verified based on the client id and may not contain the latest version
+ final Revision existingRevision = revisionMap.get(entityId);
+ final Revision incrementedRevision = existingRevision.incrementRevision(clientId);
+ incrementedRevisions.add(incrementedRevision);
+
+ revisionMap.put(entityId, incrementedRevision);
}
- return updatedComponent;
+ // Create the result with the updated entity and updated revisions
+ final T updatedEntity = updateResult.getEntity();
+ final String updaterIdentity = updateResult.updaterIdentity();
+
+ final Revision updatedEntityRevision = revisionMap.get(updateResult.getEntityId());
+ final EntityModification entityModification = new EntityModification(updatedEntityRevision, updaterIdentity);
+
+ return new StandardRevisionUpdate<>(updatedEntity, entityModification, incrementedRevisions);
}
}
diff --git
a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-common/src/main/java/org/apache/nifi/registry/revision/standard/StandardUpdateResult.java
b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-common/src/main/java/org/apache/nifi/registry/revision/standard/StandardUpdateResult.java
new file mode 100644
index 0000000..a09ed0a
--- /dev/null
+++
b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-common/src/main/java/org/apache/nifi/registry/revision/standard/StandardUpdateResult.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.revision.standard;
+
+import org.apache.nifi.registry.revision.api.UpdateResult;
+
+public class StandardUpdateResult<T> implements UpdateResult<T> {
+
+ private final T entity;
+ private final String entityId;
+ private final String updaterIdentity;
+
+ public StandardUpdateResult(final T entity, final String entityId, final String updaterIdentity) {
+ this.entity = entity;
+ this.entityId = entityId;
+ this.updaterIdentity = updaterIdentity;
+
+ if (this.entity == null) {
+ throw new IllegalArgumentException("Entity is required");
+ }
+
+ if (this.entityId == null || this.entityId.trim().isEmpty()) {
+ throw new IllegalArgumentException("Entity id is required");
+ }
+
+ if (this.updaterIdentity == null || this.updaterIdentity.trim().isEmpty()) {
+ throw new IllegalArgumentException("Updater identity is required");
+ }
+ }
+
+ @Override
+ public T getEntity() {
+ return entity;
+ }
+
+ @Override
+ public String getEntityId() {
+ return entityId;
+ }
+
+ @Override
+ public String updaterIdentity() {
+ return updaterIdentity;
+ }
+}
diff --git
a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-entity-service/src/main/java/org/apache/nifi/registry/revision/entity/StandardRevisableEntityService.java
b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-entity-service/src/main/java/org/apache/nifi/registry/revision/entity/StandardRevisableEntityService.java
index fa3d4f5..a0f0ab7 100644
---
a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-entity-service/src/main/java/org/apache/nifi/registry/revision/entity/StandardRevisableEntityService.java
+++
b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-entity-service/src/main/java/org/apache/nifi/registry/revision/entity/StandardRevisableEntityService.java
@@ -22,7 +22,7 @@ import org.apache.nifi.registry.revision.api.RevisionClaim;
import org.apache.nifi.registry.revision.api.RevisionManager;
import org.apache.nifi.registry.revision.api.RevisionUpdate;
import org.apache.nifi.registry.revision.standard.StandardRevisionClaim;
-import org.apache.nifi.registry.revision.standard.StandardRevisionUpdate;
+import org.apache.nifi.registry.revision.standard.StandardUpdateResult;
import java.util.Collection;
import java.util.List;
@@ -54,26 +54,7 @@ public class StandardRevisableEntityService implements
RevisableEntityService {
throw new IllegalArgumentException("A revision version of 0 must
be specified when creating a new entity");
}
- if (creatorIdentity == null || creatorIdentity.trim().isEmpty()) {
- throw new IllegalArgumentException("Creator identity is required");
- }
-
- final Revision revision =
createRevision(requestEntity.getIdentifier(), requestEntity.getRevision());
- final RevisionClaim claim = new StandardRevisionClaim(revision);
-
- final RevisionUpdate<T> revisionUpdate =
revisionManager.updateRevision(claim, () -> {
- final T updatedEntity = createEntity.get();
-
- final Revision updatedRevision =
revision.incrementRevision(revision.getClientId());
- final EntityModification entityModification = new
EntityModification(updatedRevision, creatorIdentity);
-
- final RevisionInfo updatedRevisionInfo =
createRevisionInfo(updatedRevision, entityModification);
- updatedEntity.setRevision(updatedRevisionInfo);
-
- return new StandardRevisionUpdate<>(updatedEntity,
entityModification);
- });
-
- return revisionUpdate.getEntity();
+ return createOrUpdate(requestEntity, creatorIdentity, createEntity);
}
@Override
@@ -94,6 +75,10 @@ public class StandardRevisableEntityService implements
RevisableEntityService {
@Override
public <T extends RevisableEntity> T update(final T requestEntity, final
String updaterIdentity, final Supplier<T> updateEntity) {
+ return createOrUpdate(requestEntity, updaterIdentity, updateEntity);
+ }
+
+ private <T extends RevisableEntity> T createOrUpdate(final T requestEntity, final String userIdentity, final Supplier<T> createOrUpdateEntity) {
if (requestEntity == null) {
throw new IllegalArgumentException("Request entity is required");
}
@@ -102,26 +87,21 @@ public class StandardRevisableEntityService implements
RevisableEntityService {
throw new IllegalArgumentException("Revision info is required");
}
- if (updaterIdentity == null || updaterIdentity.trim().isEmpty()) {
- throw new IllegalArgumentException("Updater identity is required");
+ if (userIdentity == null || userIdentity.trim().isEmpty()) {
+ throw new IllegalArgumentException("User identity is required");
}
final Revision revision =
createRevision(requestEntity.getIdentifier(), requestEntity.getRevision());
final RevisionClaim claim = new StandardRevisionClaim(revision);
final RevisionUpdate<T> revisionUpdate =
revisionManager.updateRevision(claim, () -> {
- final T updatedEntity = updateEntity.get();
-
- final Revision updatedRevision =
revisionManager.getRevision(requestEntity.getIdentifier()).incrementRevision(revision.getClientId());
- final EntityModification entityModification = new
EntityModification(updatedRevision, updaterIdentity);
-
- final RevisionInfo updatedRevisionInfo =
createRevisionInfo(updatedRevision, entityModification);
- updatedEntity.setRevision(updatedRevisionInfo);
-
- return new StandardRevisionUpdate<>(updatedEntity,
entityModification);
+ final T updatedEntity = createOrUpdateEntity.get();
+ return new StandardUpdateResult<>(updatedEntity, updatedEntity.getIdentifier(), userIdentity);
});
- return revisionUpdate.getEntity();
+ final T resultEntity = revisionUpdate.getEntity();
+ resultEntity.setRevision(createRevisionInfo(revisionUpdate.getLastModification()));
+ return resultEntity;
}
@Override
@@ -199,6 +179,10 @@ public class StandardRevisableEntityService implements
RevisableEntityService {
return createRevisionInfo(revision, null);
}
+ private RevisionInfo createRevisionInfo(final EntityModification entityModification) {
+ return createRevisionInfo(entityModification.getRevision(), entityModification);
+ }
+
private RevisionInfo createRevisionInfo(final Revision revision, final
EntityModification entityModification) {
final RevisionInfo revisionInfo = new RevisionInfo();
revisionInfo.setVersion(revision.getVersion());
diff --git
a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-spring-jdbc/src/main/java/org/apache/nifi/registry/revision/jdbc/JdbcRevisionManager.java
b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-spring-jdbc/src/main/java/org/apache/nifi/registry/revision/jdbc/JdbcRevisionManager.java
index 74b2393..9aad556 100644
---
a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-spring-jdbc/src/main/java/org/apache/nifi/registry/revision/jdbc/JdbcRevisionManager.java
+++
b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-spring-jdbc/src/main/java/org/apache/nifi/registry/revision/jdbc/JdbcRevisionManager.java
@@ -17,14 +17,17 @@
package org.apache.nifi.registry.revision.jdbc;
import org.apache.nifi.registry.revision.api.DeleteRevisionTask;
+import org.apache.nifi.registry.revision.api.EntityModification;
import org.apache.nifi.registry.revision.api.ExpiredRevisionClaimException;
import org.apache.nifi.registry.revision.api.InvalidRevisionException;
import org.apache.nifi.registry.revision.api.Revision;
import org.apache.nifi.registry.revision.api.RevisionClaim;
import org.apache.nifi.registry.revision.api.RevisionManager;
import org.apache.nifi.registry.revision.api.RevisionUpdate;
+import org.apache.nifi.registry.revision.api.UpdateResult;
import org.apache.nifi.registry.revision.api.UpdateRevisionTask;
import org.apache.nifi.registry.revision.standard.RevisionComparator;
+import org.apache.nifi.registry.revision.standard.StandardRevisionUpdate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.EmptyResultDataAccessException;
@@ -33,9 +36,11 @@ import org.springframework.jdbc.core.JdbcTemplate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
/**
* A database implementation of {@link RevisionManager} that use's Spring's
{@link JdbcTemplate}.
@@ -105,20 +110,35 @@ public class JdbcRevisionManager implements
RevisionManager {
// Since we are in transaction these changes won't be committed unless
the entire task completes successfully.
// It is important this happens first so that the task won't execute
unless the revision can be updated.
// This prevents any other changes from happening that might not be
part of the database transaction.
+ final Set<Revision> incrementedRevisions = new HashSet<>();
for (final Revision incomingRevision : revisionList) {
+ final String entityId = incomingRevision.getEntityId();
+
// calling getRevision here will lazily create an initial revision
- getRevision(incomingRevision.getEntityId());
+ getRevision(entityId);
updateRevision(incomingRevision);
+
+ // retrieve the updated revision since the incoming revision may have matched on the client id
+ // and may not have the latest version which we want to return with the result
+ final Revision incrementedRevision = getRevision(entityId);
+ incrementedRevisions.add(incrementedRevision);
}
// We successfully verified all revisions.
LOGGER.debug("Successfully verified Revision Claim for all revisions");
// Perform the update
- final RevisionUpdate<T> updatedEntity = task.update();
+ final UpdateResult<T> updateResult = task.update();
LOGGER.debug("Update task completed");
- return updatedEntity;
+ // Create the result with the updated entity and updated revisions
+ final T updatedEntity = updateResult.getEntity();
+ final String updaterIdentity = updateResult.updaterIdentity();
+
+ final Revision updatedEntityRevision = getRevision(updateResult.getEntityId());
+ final EntityModification entityModification = new EntityModification(updatedEntityRevision, updaterIdentity);
+
+ return new StandardRevisionUpdate<>(updatedEntity, entityModification, incrementedRevisions);
}
/*
diff --git
a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-spring-jdbc/src/test/java/org/apache/nifi/registry/revision/jdbc/TestJdbcRevisionManager.java
b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-spring-jdbc/src/test/java/org/apache/nifi/registry/revision/jdbc/TestJdbcRevisionManager.java
index 3eecc9a..cbb7e46 100644
---
a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-spring-jdbc/src/test/java/org/apache/nifi/registry/revision/jdbc/TestJdbcRevisionManager.java
+++
b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-spring-jdbc/src/test/java/org/apache/nifi/registry/revision/jdbc/TestJdbcRevisionManager.java
@@ -26,7 +26,7 @@ import org.apache.nifi.registry.revision.api.RevisionManager;
import org.apache.nifi.registry.revision.api.RevisionUpdate;
import org.apache.nifi.registry.revision.api.UpdateRevisionTask;
import org.apache.nifi.registry.revision.standard.StandardRevisionClaim;
-import org.apache.nifi.registry.revision.standard.StandardRevisionUpdate;
+import org.apache.nifi.registry.revision.standard.StandardUpdateResult;
import org.flywaydb.core.internal.jdbc.DatabaseType;
import org.junit.Assert;
import org.junit.Before;
@@ -350,12 +350,7 @@ public class TestJdbcRevisionManager {
final RevisableEntity entity = new RevisableEntity();
entity.setId(entityId);
- // get the latest revision which has already been incremented
- final Revision updatedRevision =
revisionManager.getRevision(entity.getId());
- entity.setRevision(updatedRevision);
-
- final EntityModification entityModification = new
EntityModification(updatedRevision, "user1");
- return new StandardRevisionUpdate<>(entity, entityModification);
+ return new StandardUpdateResult<>(entity, entityId,"user1");
};
}
@@ -366,18 +361,17 @@ public class TestJdbcRevisionManager {
assertNotNull(updatedEntity);
assertEquals(entityId, updatedEntity.getId());
- // verify the revision in the entity is set and is the updated
revision (i.e. version of 100, not 99)
- final Revision updatedRevision = updatedEntity.getRevision();
- assertNotNull(updatedRevision);
- assertEquals(entityId, updatedRevision.getEntityId());
- assertEquals(expectedVersion, updatedRevision.getVersion());
- assertEquals(expectedClientId, updatedRevision.getClientId());
-
// verify the entity modification is correctly populated
final EntityModification entityModification =
revisionUpdate.getLastModification();
assertNotNull(entityModification);
Assert.assertEquals("user1", entityModification.getLastModifier());
- assertEquals(updatedRevision, entityModification.getRevision());
+
+ // verify the revision in the entity modification is set and is the
updated revision (i.e. version of 100, not 99)
+ final Revision updatedRevision = entityModification.getRevision();
+ assertNotNull(updatedRevision);
+ assertEquals(entityId, updatedRevision.getEntityId());
+ assertEquals(expectedVersion, updatedRevision.getVersion());
+ assertEquals(expectedClientId, updatedRevision.getClientId());
// verify the updated revisions is correctly populated and matches the
updated entity revision
final Set<Revision> updatedRevisions =
revisionUpdate.getUpdatedRevisions();