http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java deleted file mode 100644 index 7e87199..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.repository.claim; - -public class StandardResourceClaim implements ResourceClaim, Comparable<ResourceClaim> { - private final StandardResourceClaimManager claimManager; - private final String id; - private final String container; - private final String section; - private final boolean lossTolerant; - private final int hashCode; - private volatile boolean writable = true; - - public StandardResourceClaim(final StandardResourceClaimManager claimManager, final String container, final String section, final String id, final boolean lossTolerant) { - this.claimManager = claimManager; - this.container = container.intern(); - this.section = section.intern(); - this.id = id; - this.lossTolerant = lossTolerant; - - hashCode = 17 + 19 * id.hashCode() + 19 * container.hashCode() + 19 * section.hashCode(); - } - - @Override - public boolean isLossTolerant() { - return lossTolerant; - } - - /** - * @return the unique identifier for this claim - */ - @Override - public String getId() { - return id; - } - - /** - * @return the container identifier in which this claim is held - */ - @Override - public String getContainer() { - return container; - } - - /** - * @return the section within a given container the claim is held - */ - @Override - public String getSection() { - return section; - } - - @Override - public boolean equals(final Object other) { - if (this == other) { - return true; - } - - if (other == null) { - return false; - } - if (hashCode != other.hashCode()) { - // We check hash code before instanceof because instanceof is fairly expensive and for - // StandardResourceClaim, calling hashCode() simply returns a pre-calculated value. - return false; - } - - if (!(other instanceof ResourceClaim)) { - return false; - } - final ResourceClaim otherClaim = (ResourceClaim) other; - return id.equals(otherClaim.getId()) && container.equals(otherClaim.getContainer()) && section.equals(otherClaim.getSection()); - } - - @Override - public int hashCode() { - return hashCode; - } - - @Override - public String toString() { - return "StandardResourceClaim[id=" + id + ", container=" + container + ", section=" + section + "]"; - } - - @Override - public boolean isWritable() { - return writable; - } - - /** - * Freeze the Resource Claim so that it can now longer be written to - */ - void freeze() { - this.writable = false; - } - - @Override - public boolean isInUse() { - // Note that it is critical here that we always check isWritable() BEFORE checking - // the claimant count. This is due to the fact that if the claim is in fact writable, the claimant count - // could increase. So if we first check claimant count and that is 0, and then we check isWritable, it may be - // that the claimant count has changed to 1 before checking isWritable. - // However, if isWritable() is false, then the only way that the claimant count can increase is if a FlowFile referencing - // the Resource Claim is cloned. In this case, though, the claimant count has not become 0. - // Said another way, if isWritable() == false, then the claimant count can never increase from 0. - return isWritable() || claimManager.getClaimantCount(this) > 0; - } -}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java deleted file mode 100644 index e4f060e..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.repository.claim; - -import java.util.Collection; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class StandardResourceClaimManager implements ResourceClaimManager { - - private static final Logger logger = LoggerFactory.getLogger(StandardResourceClaimManager.class); - private final ConcurrentMap<ResourceClaim, ClaimCount> claimantCounts = new ConcurrentHashMap<>(); - private final BlockingQueue<ResourceClaim> destructableClaims = new LinkedBlockingQueue<>(50000); - - @Override - public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant, final boolean writable) { - final StandardResourceClaim claim = new StandardResourceClaim(this, container, section, id, lossTolerant); - if (!writable) { - claim.freeze(); - } - return claim; - } - - @Override - public ResourceClaim getResourceClaim(final String container, final String section, final String id) { - final ResourceClaim tempClaim = new StandardResourceClaim(this, container, section, id, false); - final ClaimCount count = claimantCounts.get(tempClaim); - return (count == null) ? null : count.getClaim(); - } - - private AtomicInteger getCounter(final ResourceClaim claim) { - if (claim == null) { - return null; - } - - ClaimCount counter = claimantCounts.get(claim); - if (counter != null) { - return counter.getCount(); - } - - counter = new ClaimCount(claim, new AtomicInteger(0)); - final ClaimCount existingCounter = claimantCounts.putIfAbsent(claim, counter); - return existingCounter == null ? counter.getCount() : existingCounter.getCount(); - } - - @Override - public int getClaimantCount(final ResourceClaim claim) { - if (claim == null) { - return 0; - } - - synchronized (claim) { - final ClaimCount counter = claimantCounts.get(claim); - return counter == null ? 0 : counter.getCount().get(); - } - } - - @Override - public int decrementClaimantCount(final ResourceClaim claim) { - if (claim == null) { - return 0; - } - - synchronized (claim) { - final ClaimCount counter = claimantCounts.get(claim); - if (counter == null) { - logger.warn("Decrementing claimant count for {} but claimant count is not known. Returning -1", claim); - return -1; - } - - final int newClaimantCount = counter.getCount().decrementAndGet(); - if (newClaimantCount < 0) { - logger.error("Decremented claimant count for {} to {}", claim, newClaimantCount); - } else { - logger.debug("Decrementing claimant count for {} to {}", claim, newClaimantCount); - } - - // If the claim is no longer referenced, we want to remove it. We consider the claim to be "no longer referenced" - // if the count is 0 and it is no longer writable (if it's writable, it may still be writable by the Content Repository, - // even though no existing FlowFile is referencing the claim). - if (newClaimantCount == 0 && !claim.isWritable()) { - removeClaimantCount(claim); - } - return newClaimantCount; - } - } - - // protected so that it can be used in unit tests - protected void removeClaimantCount(final ResourceClaim claim) { - claimantCounts.remove(claim); - } - - @Override - public int incrementClaimantCount(final ResourceClaim claim) { - return incrementClaimantCount(claim, false); - } - - @Override - public int incrementClaimantCount(final ResourceClaim claim, final boolean newClaim) { - if (claim == null) { - return 0; - } - - synchronized (claim) { - final AtomicInteger counter = getCounter(claim); - - final int newClaimantCount = counter.incrementAndGet(); - logger.debug("Incrementing claimant count for {} to {}", claim, newClaimantCount); - - // If the claimant count moved from 0 to 1, remove it from the queue of destructable claims. - if (!newClaim && newClaimantCount == 1) { - destructableClaims.remove(claim); - } - return newClaimantCount; - } - } - - @Override - public void markDestructable(final ResourceClaim claim) { - if (claim == null) { - return; - } - - synchronized (claim) { - if (getClaimantCount(claim) > 0) { - return; - } - - logger.debug("Marking claim {} as destructable", claim); - try { - while (!destructableClaims.offer(claim, 30, TimeUnit.MINUTES)) { - } - } catch (final InterruptedException ie) { - } - } - } - - @Override - public void drainDestructableClaims(final Collection<ResourceClaim> destination, final int maxElements) { - final int drainedCount = destructableClaims.drainTo(destination, maxElements); - logger.debug("Drained {} destructable claims to {}", drainedCount, destination); - } - - @Override - public void drainDestructableClaims(final Collection<ResourceClaim> destination, final int maxElements, final long timeout, final TimeUnit unit) { - try { - final ResourceClaim firstClaim = destructableClaims.poll(timeout, unit); - if (firstClaim != null) { - destination.add(firstClaim); - destructableClaims.drainTo(destination, maxElements - 1); - } - } catch (final InterruptedException e) { - } - } - - @Override - public void purge() { - claimantCounts.clear(); - } - - @Override - public void freeze(final ResourceClaim claim) { - if (claim == null) { - return; - } - - if (!(claim instanceof StandardResourceClaim)) { - throw new IllegalArgumentException("The given resource claim is not managed by this Resource Claim Manager"); - } - - ((StandardResourceClaim) claim).freeze(); - - synchronized (claim) { - if (getClaimantCount(claim) == 0) { - claimantCounts.remove(claim); - } - } - } - - - private static final class ClaimCount { - private final ResourceClaim claim; - private final AtomicInteger count; - - public ClaimCount(final ResourceClaim claim, final AtomicInteger count) { - this.claim = claim; - this.count = count; - } - - public AtomicInteger getCount() { - return count; - } - - public ResourceClaim getClaim() { - return claim; - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java deleted file mode 100644 index b218ee6..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.controller.repository.schema; - -import java.util.List; - -import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.controller.repository.claim.ResourceClaim; -import org.apache.nifi.controller.repository.claim.ResourceClaimManager; -import org.apache.nifi.controller.repository.claim.StandardContentClaim; -import org.apache.nifi.repository.schema.Record; -import org.apache.nifi.repository.schema.RecordField; -import org.apache.nifi.repository.schema.RecordSchema; - -public class ContentClaimFieldMap implements Record { - private final ContentClaim contentClaim; - private final long contentClaimOffset; - private final ResourceClaimFieldMap resourceClaimFieldMap; - private final RecordSchema schema; - - public ContentClaimFieldMap(final ContentClaim contentClaim, final long contentClaimOffset, final RecordSchema schema) { - this.contentClaim = contentClaim; - this.contentClaimOffset = contentClaimOffset; - this.schema = schema; - - final List<RecordField> resourceClaimFields = schema.getField(ContentClaimSchema.RESOURCE_CLAIM).getSubFields(); - final RecordSchema resourceClaimSchema = new RecordSchema(resourceClaimFields); - this.resourceClaimFieldMap = new ResourceClaimFieldMap(contentClaim.getResourceClaim(), resourceClaimSchema); - } - - @Override - public Object getFieldValue(final String fieldName) { - switch (fieldName) { - case ContentClaimSchema.RESOURCE_CLAIM: - return resourceClaimFieldMap; - case ContentClaimSchema.CONTENT_CLAIM_LENGTH: - return contentClaim.getLength(); - case ContentClaimSchema.CONTENT_CLAIM_OFFSET: - return contentClaimOffset; - case ContentClaimSchema.RESOURCE_CLAIM_OFFSET: - return contentClaim.getOffset(); - default: - return null; - } - } - - @Override - public RecordSchema getSchema() { - return schema; - } - - @Override - public int hashCode() { - return (int) (31 + contentClaimOffset + 21 * resourceClaimFieldMap.hashCode()); - } - - @Override - public boolean equals(final Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - - ContentClaimFieldMap other = (ContentClaimFieldMap) obj; - if (contentClaimOffset != other.contentClaimOffset) { - return false; - } - - if (resourceClaimFieldMap == null) { - if (other.resourceClaimFieldMap != null) { - return false; - } - } else if (!resourceClaimFieldMap.equals(other.resourceClaimFieldMap)) { - return false; - } - - return true; - } - - @Override - public String toString() { - return "ContentClaimFieldMap[" + contentClaim + "]"; - } - - public static ContentClaim getContentClaim(final Record claimRecord, final ResourceClaimManager resourceClaimManager) { - final Record resourceClaimRecord = (Record) claimRecord.getFieldValue(ContentClaimSchema.RESOURCE_CLAIM); - final String container = (String) resourceClaimRecord.getFieldValue(ContentClaimSchema.CLAIM_CONTAINER); - final String section = (String) resourceClaimRecord.getFieldValue(ContentClaimSchema.CLAIM_SECTION); - final String identifier = (String) resourceClaimRecord.getFieldValue(ContentClaimSchema.CLAIM_IDENTIFIER); - final Boolean lossTolerant = (Boolean) resourceClaimRecord.getFieldValue(ContentClaimSchema.LOSS_TOLERANT); - - final Long length = (Long) claimRecord.getFieldValue(ContentClaimSchema.CONTENT_CLAIM_LENGTH); - final Long resourceOffset = (Long) claimRecord.getFieldValue(ContentClaimSchema.RESOURCE_CLAIM_OFFSET); - - final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(container, section, identifier, lossTolerant, false); - final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, resourceOffset); - contentClaim.setLength(length); - - return contentClaim; - } - - public static Long getContentClaimOffset(final Record claimRecord) { - return (Long) claimRecord.getFieldValue(ContentClaimSchema.CONTENT_CLAIM_OFFSET); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimSchema.java deleted file mode 100644 index c55c758..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimSchema.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.controller.repository.schema; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.apache.nifi.repository.schema.ComplexRecordField; -import org.apache.nifi.repository.schema.FieldType; -import org.apache.nifi.repository.schema.RecordField; -import org.apache.nifi.repository.schema.RecordSchema; -import org.apache.nifi.repository.schema.Repetition; -import org.apache.nifi.repository.schema.SimpleRecordField; - -public class ContentClaimSchema { - - // resource claim fields - public static final String CLAIM_CONTAINER = "Container"; - public static final String CLAIM_SECTION = "Section"; - public static final String CLAIM_IDENTIFIER = "Identifier"; - public static final String LOSS_TOLERANT = "Loss Tolerant"; - public static final String RESOURCE_CLAIM = "Resource Claim"; - - // content claim fields - public static final String RESOURCE_CLAIM_OFFSET = "Resource Claim Offset"; // offset into resource claim where the content claim begins - public static final String CONTENT_CLAIM_OFFSET = "Content Claim Offset"; // offset into the content claim where the flowfile begins - public static final String CONTENT_CLAIM_LENGTH = "Content Claim Length"; - - public static final RecordSchema CONTENT_CLAIM_SCHEMA_V1; - public static final RecordSchema RESOURCE_CLAIM_SCHEMA_V1; - - static { - final List<RecordField> resourceClaimFields = new ArrayList<>(); - resourceClaimFields.add(new SimpleRecordField(CLAIM_CONTAINER, FieldType.STRING, Repetition.EXACTLY_ONE)); - resourceClaimFields.add(new SimpleRecordField(CLAIM_SECTION, FieldType.STRING, Repetition.EXACTLY_ONE)); - resourceClaimFields.add(new SimpleRecordField(CLAIM_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE)); - resourceClaimFields.add(new SimpleRecordField(LOSS_TOLERANT, FieldType.BOOLEAN, Repetition.EXACTLY_ONE)); - RESOURCE_CLAIM_SCHEMA_V1 = new RecordSchema(Collections.unmodifiableList(resourceClaimFields)); - - final List<RecordField> contentClaimFields = new ArrayList<>(); - contentClaimFields.add(new ComplexRecordField(RESOURCE_CLAIM, Repetition.EXACTLY_ONE, resourceClaimFields)); - contentClaimFields.add(new SimpleRecordField(RESOURCE_CLAIM_OFFSET, FieldType.LONG, Repetition.EXACTLY_ONE)); - contentClaimFields.add(new SimpleRecordField(CONTENT_CLAIM_OFFSET, FieldType.LONG, Repetition.EXACTLY_ONE)); - contentClaimFields.add(new SimpleRecordField(CONTENT_CLAIM_LENGTH, FieldType.LONG, Repetition.EXACTLY_ONE)); - CONTENT_CLAIM_SCHEMA_V1 = new RecordSchema(Collections.unmodifiableList(contentClaimFields)); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileRecordFieldMap.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileRecordFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileRecordFieldMap.java deleted file mode 100644 index ff0615f..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileRecordFieldMap.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.controller.repository.schema; - -import java.util.Map; - -import org.apache.nifi.controller.repository.FlowFileRecord; -import org.apache.nifi.controller.repository.StandardFlowFileRecord; -import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.controller.repository.claim.ResourceClaimManager; -import org.apache.nifi.repository.schema.Record; -import org.apache.nifi.repository.schema.RecordField; -import org.apache.nifi.repository.schema.RecordSchema; - -public class FlowFileRecordFieldMap implements Record { - private final FlowFileRecord flowFile; - private final RecordSchema schema; - private final RecordSchema contentClaimSchema; - private final ContentClaimFieldMap contentClaim; - - public FlowFileRecordFieldMap(final FlowFileRecord flowFile, final RecordSchema schema) { - this.flowFile = flowFile; - this.schema = schema; - - final RecordField contentClaimField = schema.getField(FlowFileSchema.CONTENT_CLAIM); - contentClaimSchema = new RecordSchema(contentClaimField.getSubFields()); - contentClaim = flowFile.getContentClaim() == null ? null : new ContentClaimFieldMap(flowFile.getContentClaim(), flowFile.getContentClaimOffset(), contentClaimSchema); - } - - @Override - public RecordSchema getSchema() { - return schema; - } - - @Override - public Object getFieldValue(final String fieldName) { - switch (fieldName) { - case FlowFileSchema.ATTRIBUTES: - return flowFile.getAttributes(); - case FlowFileSchema.CONTENT_CLAIM: - return contentClaim; - case FlowFileSchema.ENTRY_DATE: - return flowFile.getEntryDate(); - case FlowFileSchema.FLOWFILE_SIZE: - return flowFile.getSize(); - case FlowFileSchema.LINEAGE_START_DATE: - return flowFile.getLineageStartDate(); - case FlowFileSchema.LINEAGE_START_INDEX: - return flowFile.getLineageStartIndex(); - case FlowFileSchema.QUEUE_DATE: - return flowFile.getLastQueueDate(); - case FlowFileSchema.QUEUE_DATE_INDEX: - return flowFile.getQueueDateIndex(); - case FlowFileSchema.RECORD_ID: - return flowFile.getId(); - } - - return null; - } - - @SuppressWarnings("unchecked") - public static FlowFileRecord getFlowFile(final Record record, final ResourceClaimManager claimManager) { - final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder(); - builder.id((Long) record.getFieldValue(FlowFileSchema.RECORD_ID)); - builder.entryDate((Long) record.getFieldValue(FlowFileSchema.ENTRY_DATE)); - builder.size((Long) record.getFieldValue(FlowFileSchema.FLOWFILE_SIZE)); - builder.addAttributes((Map<String, String>) record.getFieldValue(FlowFileSchema.ATTRIBUTES)); - builder.lineageStart((Long) record.getFieldValue(FlowFileSchema.LINEAGE_START_DATE), (Long) record.getFieldValue(FlowFileSchema.LINEAGE_START_INDEX)); - builder.lastQueued((Long) record.getFieldValue(FlowFileSchema.QUEUE_DATE), (Long) record.getFieldValue(FlowFileSchema.QUEUE_DATE_INDEX)); - - final Record contentClaimRecord = (Record) record.getFieldValue(FlowFileSchema.CONTENT_CLAIM); - if (contentClaimRecord != null) { - final ContentClaim claim = ContentClaimFieldMap.getContentClaim(contentClaimRecord, claimManager); - builder.contentClaim(claim); - - final Long offset = ContentClaimFieldMap.getContentClaimOffset(contentClaimRecord); - if (offset != null) { - builder.contentClaimOffset(offset); - } - } - - return builder.build(); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java deleted file mode 100644 index 6af3066..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.controller.repository.schema; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.nifi.repository.schema.ComplexRecordField; -import org.apache.nifi.repository.schema.FieldType; -import org.apache.nifi.repository.schema.MapRecordField; -import org.apache.nifi.repository.schema.RecordField; -import org.apache.nifi.repository.schema.RecordSchema; -import org.apache.nifi.repository.schema.Repetition; -import org.apache.nifi.repository.schema.SimpleRecordField; - -public class FlowFileSchema { - - public static final String RECORD_ID = "Record ID"; - public static final String ENTRY_DATE = "Entry Date"; - public static final String LINEAGE_START_DATE = "Lineage Start Date"; - public static final String LINEAGE_START_INDEX = "Lineage Start Index"; - public static final String QUEUE_DATE = "Queued Date"; - public static final String QUEUE_DATE_INDEX = "Queued Date Index"; - public static final String FLOWFILE_SIZE = "FlowFile Size"; - public static final String CONTENT_CLAIM = "Content Claim"; - public static final String ATTRIBUTES = "Attributes"; - - // attribute fields - public static final String ATTRIBUTE_NAME = "Attribute Name"; - public static final String ATTRIBUTE_VALUE = "Attribute Value"; - - public static final RecordSchema FLOWFILE_SCHEMA_V1; - public static final RecordSchema FLOWFILE_SCHEMA_V2; - - static { - final List<RecordField> flowFileFields = new ArrayList<>(); - - final RecordField attributeNameField = new SimpleRecordField(ATTRIBUTE_NAME, FieldType.STRING, Repetition.EXACTLY_ONE); - final RecordField attributeValueField = new SimpleRecordField(ATTRIBUTE_VALUE, FieldType.STRING, Repetition.EXACTLY_ONE); - - flowFileFields.add(new SimpleRecordField(RECORD_ID, FieldType.LONG, Repetition.EXACTLY_ONE)); - flowFileFields.add(new SimpleRecordField(ENTRY_DATE, FieldType.LONG, Repetition.EXACTLY_ONE)); - flowFileFields.add(new SimpleRecordField(LINEAGE_START_DATE, FieldType.LONG, Repetition.EXACTLY_ONE)); - flowFileFields.add(new SimpleRecordField(LINEAGE_START_INDEX, FieldType.LONG, Repetition.EXACTLY_ONE)); - flowFileFields.add(new SimpleRecordField(QUEUE_DATE, FieldType.LONG, Repetition.EXACTLY_ONE)); - flowFileFields.add(new SimpleRecordField(QUEUE_DATE_INDEX, FieldType.LONG, Repetition.EXACTLY_ONE)); - flowFileFields.add(new SimpleRecordField(FLOWFILE_SIZE, FieldType.LONG, Repetition.EXACTLY_ONE)); - flowFileFields.add(new ComplexRecordField(CONTENT_CLAIM, Repetition.ZERO_OR_ONE, ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1.getFields())); - flowFileFields.add(new MapRecordField(ATTRIBUTES, attributeNameField, attributeValueField, Repetition.ZERO_OR_ONE)); - - FLOWFILE_SCHEMA_V1 = new RecordSchema(flowFileFields); - } - - static { - final List<RecordField> flowFileFields = new ArrayList<>(); - - final RecordField attributeNameField = new SimpleRecordField(ATTRIBUTE_NAME, FieldType.LONG_STRING, Repetition.EXACTLY_ONE); - final RecordField attributeValueField = new SimpleRecordField(ATTRIBUTE_VALUE, FieldType.LONG_STRING, Repetition.EXACTLY_ONE); - - flowFileFields.add(new SimpleRecordField(RECORD_ID, FieldType.LONG, Repetition.EXACTLY_ONE)); - flowFileFields.add(new SimpleRecordField(ENTRY_DATE, FieldType.LONG, Repetition.EXACTLY_ONE)); - flowFileFields.add(new SimpleRecordField(LINEAGE_START_DATE, FieldType.LONG, Repetition.EXACTLY_ONE)); - flowFileFields.add(new SimpleRecordField(LINEAGE_START_INDEX, FieldType.LONG, Repetition.EXACTLY_ONE)); - flowFileFields.add(new SimpleRecordField(QUEUE_DATE, FieldType.LONG, Repetition.EXACTLY_ONE)); - flowFileFields.add(new SimpleRecordField(QUEUE_DATE_INDEX, FieldType.LONG, Repetition.EXACTLY_ONE)); - flowFileFields.add(new SimpleRecordField(FLOWFILE_SIZE, FieldType.LONG, Repetition.EXACTLY_ONE)); - flowFileFields.add(new ComplexRecordField(CONTENT_CLAIM, Repetition.ZERO_OR_ONE, ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1.getFields())); - flowFileFields.add(new MapRecordField(ATTRIBUTES, attributeNameField, attributeValueField, Repetition.ZERO_OR_ONE)); - - FLOWFILE_SCHEMA_V2 = new RecordSchema(flowFileFields); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java deleted file mode 100644 index 5fe4889..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.controller.repository.schema; - -import org.apache.nifi.controller.queue.FlowFileQueue; -import org.apache.nifi.controller.repository.FlowFileRecord; -import org.apache.nifi.controller.repository.RepositoryRecord; -import org.apache.nifi.repository.schema.Record; -import org.apache.nifi.repository.schema.RecordSchema; - -public class RepositoryRecordFieldMap implements Record { - private final RepositoryRecord record; - private final FlowFileRecord flowFile; - private final RecordSchema schema; - private final RecordSchema contentClaimSchema; - - public RepositoryRecordFieldMap(final RepositoryRecord record, final RecordSchema repoRecordSchema, final RecordSchema contentClaimSchema) { - this.schema = repoRecordSchema; - this.contentClaimSchema = contentClaimSchema; - this.record = record; - this.flowFile = record.getCurrent(); - } - - @Override - public Object getFieldValue(final String fieldName) { - switch (fieldName) { - case RepositoryRecordSchema.ACTION_TYPE: - return record.getType().name(); - case RepositoryRecordSchema.RECORD_ID: - return record.getCurrent().getId(); - case RepositoryRecordSchema.SWAP_LOCATION: - return record.getSwapLocation(); - case FlowFileSchema.ATTRIBUTES: - return flowFile.getAttributes(); - case FlowFileSchema.ENTRY_DATE: - return flowFile.getEntryDate(); - case FlowFileSchema.FLOWFILE_SIZE: - return flowFile.getSize(); - case FlowFileSchema.LINEAGE_START_DATE: - return flowFile.getLineageStartDate(); - case FlowFileSchema.LINEAGE_START_INDEX: - return flowFile.getLineageStartIndex(); - case FlowFileSchema.QUEUE_DATE: - return flowFile.getLastQueueDate(); - case FlowFileSchema.QUEUE_DATE_INDEX: - return flowFile.getQueueDateIndex(); - case FlowFileSchema.CONTENT_CLAIM: - final ContentClaimFieldMap contentClaimFieldMap = record.getCurrentClaim() == null ? null - : new ContentClaimFieldMap(record.getCurrentClaim(), record.getCurrentClaimOffset(), contentClaimSchema); - return contentClaimFieldMap; - case RepositoryRecordSchema.QUEUE_IDENTIFIER: - final FlowFileQueue queue = record.getDestination() == null ? record.getOriginalQueue() : record.getDestination(); - return queue == null ? null : queue.getIdentifier(); - default: - return null; - } - } - - @Override - public RecordSchema getSchema() { - return schema; - } - - @Override - public String toString() { - return "RepositoryRecordFieldMap[" + record + "]"; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java deleted file mode 100644 index db77c8b..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.controller.repository.schema; - -import org.apache.nifi.repository.schema.ComplexRecordField; -import org.apache.nifi.repository.schema.FieldType; -import org.apache.nifi.repository.schema.RecordField; -import org.apache.nifi.repository.schema.RecordSchema; -import org.apache.nifi.repository.schema.Repetition; -import org.apache.nifi.repository.schema.SimpleRecordField; -import org.apache.nifi.repository.schema.UnionRecordField; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -public class RepositoryRecordSchema { - public static final String REPOSITORY_RECORD_UPDATE_V1 = "Repository Record Update"; // top level field name - public static final String REPOSITORY_RECORD_UPDATE_V2 = "Repository Record Update"; // top level field name - - // repository record fields - public static final String ACTION_TYPE = "Action"; - public static final String RECORD_ID = "Record ID"; - public static final String QUEUE_IDENTIFIER = "Queue Identifier"; - public static final String SWAP_LOCATION = "Swap Location"; - - // Update types - public static final String CREATE_OR_UPDATE_ACTION = "Create or Update"; - public static final String DELETE_ACTION = "Delete"; - public static final String SWAP_IN_ACTION = "Swap In"; - public static final String SWAP_OUT_ACTION = "Swap Out"; - - public static final RecordSchema REPOSITORY_RECORD_SCHEMA_V1; - public static final RecordSchema CREATE_OR_UPDATE_SCHEMA_V1; - public static final RecordSchema DELETE_SCHEMA_V1; - public static final RecordSchema SWAP_IN_SCHEMA_V1; - public static final RecordSchema SWAP_OUT_SCHEMA_V1; - - public static final RecordSchema REPOSITORY_RECORD_SCHEMA_V2; - public static final RecordSchema CREATE_OR_UPDATE_SCHEMA_V2; - public static final RecordSchema DELETE_SCHEMA_V2; - public static final RecordSchema SWAP_IN_SCHEMA_V2; - public static final RecordSchema SWAP_OUT_SCHEMA_V2; - - public static final RecordField ACTION_TYPE_FIELD = new SimpleRecordField(ACTION_TYPE, FieldType.STRING, Repetition.EXACTLY_ONE); - public static final RecordField RECORD_ID_FIELD = new SimpleRecordField(RECORD_ID, FieldType.LONG, Repetition.EXACTLY_ONE); - - static { - // Fields for "Create" or "Update" records - final List<RecordField> createOrUpdateFields = new ArrayList<>(); - createOrUpdateFields.add(ACTION_TYPE_FIELD); - createOrUpdateFields.addAll(FlowFileSchema.FLOWFILE_SCHEMA_V1.getFields()); - - createOrUpdateFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE)); - createOrUpdateFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.ZERO_OR_ONE)); - final ComplexRecordField createOrUpdate = new ComplexRecordField(CREATE_OR_UPDATE_ACTION, Repetition.EXACTLY_ONE, createOrUpdateFields); - CREATE_OR_UPDATE_SCHEMA_V1 = new RecordSchema(createOrUpdateFields); - - // Fields for "Delete" records - final List<RecordField> deleteFields = new ArrayList<>(); - deleteFields.add(ACTION_TYPE_FIELD); - deleteFields.add(RECORD_ID_FIELD); - final ComplexRecordField delete = new ComplexRecordField(DELETE_ACTION, Repetition.EXACTLY_ONE, deleteFields); - DELETE_SCHEMA_V1 = new RecordSchema(deleteFields); - - // Fields for "Swap Out" records - final List<RecordField> swapOutFields = new ArrayList<>(); - swapOutFields.add(ACTION_TYPE_FIELD); - swapOutFields.add(RECORD_ID_FIELD); - swapOutFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE)); - swapOutFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE)); - final ComplexRecordField swapOut = new ComplexRecordField(SWAP_OUT_ACTION, Repetition.EXACTLY_ONE, swapOutFields); - SWAP_OUT_SCHEMA_V1 = new RecordSchema(swapOutFields); - - // Fields for "Swap In" records - final List<RecordField> swapInFields = new ArrayList<>(createOrUpdateFields); - swapInFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE)); - final ComplexRecordField swapIn = new ComplexRecordField(SWAP_IN_ACTION, Repetition.EXACTLY_ONE, swapInFields); - SWAP_IN_SCHEMA_V1 = new RecordSchema(swapInFields); - - // Union Field that creates the top-level field type - final UnionRecordField repoUpdateField = new UnionRecordField(REPOSITORY_RECORD_UPDATE_V1, Repetition.EXACTLY_ONE, createOrUpdate, delete, swapOut, swapIn); - REPOSITORY_RECORD_SCHEMA_V1 = new RecordSchema(Collections.singletonList(repoUpdateField)); - } - - static { - // Fields for "Create" or "Update" records - final List<RecordField> createOrUpdateFields = new ArrayList<>(); - createOrUpdateFields.add(ACTION_TYPE_FIELD); - createOrUpdateFields.addAll(FlowFileSchema.FLOWFILE_SCHEMA_V2.getFields()); - - createOrUpdateFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE)); - createOrUpdateFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.ZERO_OR_ONE)); - final ComplexRecordField createOrUpdate = new ComplexRecordField(CREATE_OR_UPDATE_ACTION, Repetition.EXACTLY_ONE, createOrUpdateFields); - CREATE_OR_UPDATE_SCHEMA_V2 = new RecordSchema(createOrUpdateFields); - - // Fields for "Delete" records - final List<RecordField> deleteFields = new ArrayList<>(); - deleteFields.add(ACTION_TYPE_FIELD); - deleteFields.add(RECORD_ID_FIELD); - final ComplexRecordField delete = new ComplexRecordField(DELETE_ACTION, Repetition.EXACTLY_ONE, deleteFields); - DELETE_SCHEMA_V2 = new RecordSchema(deleteFields); - - // Fields for "Swap Out" records - final List<RecordField> swapOutFields = new ArrayList<>(); - swapOutFields.add(ACTION_TYPE_FIELD); - swapOutFields.add(RECORD_ID_FIELD); - swapOutFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE)); - swapOutFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE)); - final ComplexRecordField swapOut = new ComplexRecordField(SWAP_OUT_ACTION, Repetition.EXACTLY_ONE, swapOutFields); - SWAP_OUT_SCHEMA_V2 = new RecordSchema(swapOutFields); - - // Fields for "Swap In" records - final List<RecordField> swapInFields = new ArrayList<>(createOrUpdateFields); - swapInFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE)); - final ComplexRecordField swapIn = new ComplexRecordField(SWAP_IN_ACTION, Repetition.EXACTLY_ONE, swapInFields); - SWAP_IN_SCHEMA_V2 = new RecordSchema(swapInFields); - - // Union Field that creates the top-level field type - final UnionRecordField repoUpdateField = new UnionRecordField(REPOSITORY_RECORD_UPDATE_V2, Repetition.EXACTLY_ONE, createOrUpdate, delete, swapOut, swapIn); - REPOSITORY_RECORD_SCHEMA_V2 = new RecordSchema(Collections.singletonList(repoUpdateField)); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java deleted file mode 100644 index 93fa4e4..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.controller.repository.schema; - -import org.apache.nifi.controller.repository.RepositoryRecordType; -import org.apache.nifi.repository.schema.NamedValue; -import org.apache.nifi.repository.schema.Record; -import org.apache.nifi.repository.schema.RecordSchema; -import org.wali.UpdateType; - -public class RepositoryRecordUpdate implements Record { - private final RecordSchema schema; - private final RepositoryRecordFieldMap fieldMap; - - public RepositoryRecordUpdate(final RepositoryRecordFieldMap fieldMap, final RecordSchema schema) { - this.schema = schema; - this.fieldMap = fieldMap; - } - - @Override - public RecordSchema getSchema() { - return schema; - } - - @Override - public Object getFieldValue(final String fieldName) { - if (RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V2.equals(fieldName)) { - String actionType = (String) fieldMap.getFieldValue(RepositoryRecordSchema.ACTION_TYPE); - if (RepositoryRecordType.CONTENTMISSING.name().equals(actionType)) { - actionType = RepositoryRecordType.DELETE.name(); - } - final UpdateType updateType = UpdateType.valueOf(actionType); - - final String actionName; - switch (updateType) { - case CREATE: - case UPDATE: - actionName = RepositoryRecordSchema.CREATE_OR_UPDATE_ACTION; - break; - case DELETE: - actionName = RepositoryRecordSchema.DELETE_ACTION; - break; - case SWAP_IN: - actionName = RepositoryRecordSchema.SWAP_IN_ACTION; - break; - case SWAP_OUT: - actionName = RepositoryRecordSchema.SWAP_OUT_ACTION; - break; - default: - return null; - } - - return new NamedValue(actionName, fieldMap); - } - return null; - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java deleted file mode 100644 index afa19ea..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.controller.repository.schema; - -import org.apache.nifi.controller.repository.claim.ResourceClaim; -import org.apache.nifi.controller.repository.claim.ResourceClaimManager; -import org.apache.nifi.repository.schema.Record; -import org.apache.nifi.repository.schema.RecordSchema; - -public class ResourceClaimFieldMap implements Record { - private final ResourceClaim resourceClaim; - private final RecordSchema schema; - - public ResourceClaimFieldMap(final ResourceClaim resourceClaim, final RecordSchema schema) { - this.resourceClaim = resourceClaim; - this.schema = schema; - } - - @Override - public RecordSchema getSchema() { - return schema; - } - - @Override - public Object getFieldValue(final String fieldName) { - switch (fieldName) { - case ContentClaimSchema.CLAIM_CONTAINER: - return resourceClaim.getContainer(); - case ContentClaimSchema.CLAIM_SECTION: - return resourceClaim.getSection(); - case ContentClaimSchema.CLAIM_IDENTIFIER: - return resourceClaim.getId(); - case ContentClaimSchema.LOSS_TOLERANT: - return resourceClaim.isLossTolerant(); - } - - return null; - } - - public static ResourceClaim getResourceClaim(final Record record, final ResourceClaimManager claimManager) { - final String container = (String) record.getFieldValue(ContentClaimSchema.CLAIM_CONTAINER); - final String section = (String) record.getFieldValue(ContentClaimSchema.CLAIM_SECTION); - final String identifier = (String) record.getFieldValue(ContentClaimSchema.CLAIM_IDENTIFIER); - final Boolean lossTolerant = (Boolean) record.getFieldValue(ContentClaimSchema.LOSS_TOLERANT); - - return claimManager.newResourceClaim(container, section, identifier, lossTolerant, false); - } - - @Override - public int hashCode() { - return 41 + 91 * resourceClaim.hashCode(); - } - - @Override - public boolean equals(final Object obj) { - if (obj == null) { - return false; - } - if (obj == this) { - return true; - } - - if (obj.getClass() != ResourceClaimFieldMap.class) { - return false; - } - - final ResourceClaimFieldMap other = (ResourceClaimFieldMap) obj; - return resourceClaim.equals(other.resourceClaim); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/pom.xml new file mode 100644 index 0000000..519b95a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/pom.xml @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-framework</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-repository-models</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-framework-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-schema-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java new file mode 100644 index 0000000..a1d5173 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.repository; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.builder.CompareToBuilder; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; + +/** + * <p> + * A flow file is a logical notion of an item in a flow with its associated attributes and identity which can be used as a reference for its actual content. + * </p> + * + * <b>Immutable - Thread Safe</b> + * + */ +public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { + + private final long id; + private final long entryDate; + private final long lineageStartDate; + private final long lineageStartIndex; + private final long size; + private final long penaltyExpirationMs; + private final Map<String, String> attributes; + private final ContentClaim claim; + private final long claimOffset; + private final long lastQueueDate; + private final long queueDateIndex; + + private StandardFlowFileRecord(final Builder builder) { + this.id = builder.bId; + this.attributes = builder.bAttributes == null ? Collections.emptyMap() : builder.bAttributes; + this.entryDate = builder.bEntryDate; + this.lineageStartDate = builder.bLineageStartDate; + this.lineageStartIndex = builder.bLineageStartIndex; + this.penaltyExpirationMs = builder.bPenaltyExpirationMs; + this.size = builder.bSize; + this.claim = builder.bClaim; + this.claimOffset = builder.bClaimOffset; + this.lastQueueDate = builder.bLastQueueDate; + this.queueDateIndex = builder.bQueueDateIndex; + } + + @Override + public long getId() { + return id; + } + + @Override + public long getEntryDate() { + return entryDate; + } + + @Override + public long getLineageStartDate() { + return lineageStartDate; + } + + @Override + public Long getLastQueueDate() { + return lastQueueDate; + } + + @Override + public boolean isPenalized() { + return penaltyExpirationMs > 0 ? penaltyExpirationMs > System.currentTimeMillis() : false; + } + + @Override + public String getAttribute(final String key) { + return attributes.get(key); + } + + @Override + public long getSize() { + return size; + } + + @Override + public Map<String, String> getAttributes() { + return Collections.unmodifiableMap(this.attributes); + } + + @Override + public ContentClaim getContentClaim() { + return this.claim; + } + + @Override + public long getContentClaimOffset() { + return this.claimOffset; + } + + @Override + public long getLineageStartIndex() { + return lineageStartIndex; + } + + @Override + public long getQueueDateIndex() { + return queueDateIndex; + } + + /** + * Provides the natural ordering for FlowFile objects which is based on their identifier. + * + * @param other other + * @return standard compare contract + */ + @Override + public int compareTo(final FlowFile other) { + return new CompareToBuilder().append(id, other.getId()).toComparison(); + } + + @Override + public boolean equals(final Object other) { + if (this == other) { + return true; + } + if (!(other instanceof FlowFile)) { + return false; + } + final FlowFile otherRecord = (FlowFile) other; + return new EqualsBuilder().append(id, otherRecord.getId()).isEquals(); + } + + @Override + public String toString() { + final ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE); + builder.append("uuid", getAttribute(CoreAttributes.UUID.key())); + builder.append("claim", claim == null ? "" : claim.toString()); + builder.append("offset", claimOffset); + builder.append("name", getAttribute(CoreAttributes.FILENAME.key())).append("size", size); + return builder.toString(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(7, 13).append(id).toHashCode(); + } + + public static final class Builder { + + private long bId; + private long bEntryDate = System.currentTimeMillis(); + private long bLineageStartDate = bEntryDate; + private long bLineageStartIndex = 0L; + private final Set<String> bLineageIdentifiers = new HashSet<>(); + private long bPenaltyExpirationMs = -1L; + private long bSize = 0L; + private ContentClaim bClaim = null; + private long bClaimOffset = 0L; + private long bLastQueueDate = System.currentTimeMillis(); + private long bQueueDateIndex = 0L; + private Map<String, String> bAttributes; + private boolean bAttributesCopied = false; + + public Builder id(final long id) { + bId = id; + return this; + } + + public Builder entryDate(final long epochMs) { + bEntryDate = epochMs; + return this; + } + + public Builder lineageStart(final long lineageStartDate, final long lineageStartIndex) { + bLineageStartDate = lineageStartDate; + bLineageStartIndex = lineageStartIndex; + return this; + } + + public Builder penaltyExpirationTime(final long epochMilliseconds) { + bPenaltyExpirationMs = epochMilliseconds; + return this; + } + + public Builder size(final long bytes) { + if (bytes >= 0) { + bSize = bytes; + } + return this; + } + + private Map<String, String> initializeAttributes() { + if (bAttributes == null) { + bAttributes = new HashMap<>(); + bAttributesCopied = true; + } else if (!bAttributesCopied) { + bAttributes = new HashMap<>(bAttributes); + bAttributesCopied = true; + } + + return bAttributes; + } + + public Builder addAttribute(final String key, final String value) { + if (key != null && value != null) { + initializeAttributes().put(FlowFile.KeyValidator.validateKey(key), value); + } + return this; + } + + public Builder addAttributes(final Map<String, String> attributes) { + final Map<String, String> initializedAttributes = initializeAttributes(); + + if (null != attributes) { + for (final String key : attributes.keySet()) { + FlowFile.KeyValidator.validateKey(key); + } + for (final Map.Entry<String, String> entry : attributes.entrySet()) { + final String key = entry.getKey(); + final String value = entry.getValue(); + if (key != null && value != null) { + initializedAttributes.put(key, value); + } + } + } + return this; + } + + public Builder removeAttributes(final String... keys) { + if (keys != null) { + for (final String key : keys) { + if (CoreAttributes.UUID.key().equals(key)) { + continue; + } + + initializeAttributes().remove(key); + } + } + return this; + } + + public Builder removeAttributes(final Set<String> keys) { + if (keys != null) { + for (final String key : keys) { + if (CoreAttributes.UUID.key().equals(key)) { + continue; + } + + initializeAttributes().remove(key); + } + } + return this; + } + + public Builder removeAttributes(final Pattern keyPattern) { + if (keyPattern != null) { + final Iterator<String> iterator = initializeAttributes().keySet().iterator(); + while (iterator.hasNext()) { + final String key = iterator.next(); + + if (CoreAttributes.UUID.key().equals(key)) { + continue; + } + + if (keyPattern.matcher(key).matches()) { + iterator.remove(); + } + } + } + return this; + } + + public Builder contentClaim(final ContentClaim claim) { + this.bClaim = claim; + return this; + } + + public Builder contentClaimOffset(final long offset) { + this.bClaimOffset = offset; + return this; + } + + public Builder lastQueued(final long lastQueueDate, final long queueDateIndex) { + this.bLastQueueDate = lastQueueDate; + this.bQueueDateIndex = queueDateIndex; + return this; + } + + public Builder fromFlowFile(final FlowFileRecord specFlowFile) { + if (specFlowFile == null) { + return this; + } + bId = specFlowFile.getId(); + bEntryDate = specFlowFile.getEntryDate(); + bLineageStartDate = specFlowFile.getLineageStartDate(); + bLineageStartIndex = specFlowFile.getLineageStartIndex(); + bLineageIdentifiers.clear(); + bPenaltyExpirationMs = specFlowFile.getPenaltyExpirationMillis(); + bSize = specFlowFile.getSize(); + bAttributes = specFlowFile.getAttributes(); + bAttributesCopied = false; + bClaim = specFlowFile.getContentClaim(); + bClaimOffset = specFlowFile.getContentClaimOffset(); + bLastQueueDate = specFlowFile.getLastQueueDate(); + bQueueDateIndex = specFlowFile.getQueueDateIndex(); + + return this; + } + + public FlowFileRecord build() { + return new StandardFlowFileRecord(this); + } + } + + @Override + public long getPenaltyExpirationMillis() { + return penaltyExpirationMs; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java new file mode 100644 index 0000000..8aa1caf --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.repository; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.processor.Relationship; + +public class StandardRepositoryRecord implements RepositoryRecord { + + private RepositoryRecordType type = null; + private FlowFileRecord workingFlowFileRecord = null; + private Relationship transferRelationship = null; + private FlowFileQueue destination = null; + private final FlowFileRecord originalFlowFileRecord; + private final FlowFileQueue originalQueue; + private String swapLocation; + private final Map<String, String> updatedAttributes = new HashMap<>(); + private final Map<String, String> originalAttributes; + private List<ContentClaim> transientClaims; + + /** + * Creates a new record which has no original claim or flow file - it is entirely new + * + * @param originalQueue queue + */ + public StandardRepositoryRecord(final FlowFileQueue originalQueue) { + this(originalQueue, null); + this.type = RepositoryRecordType.CREATE; + } + + /** + * Creates a record based on given original items + * + * @param originalQueue queue + * @param originalFlowFileRecord record + */ + public StandardRepositoryRecord(final FlowFileQueue originalQueue, final FlowFileRecord originalFlowFileRecord) { + this(originalQueue, originalFlowFileRecord, null); + this.type = RepositoryRecordType.UPDATE; + } + + public StandardRepositoryRecord(final FlowFileQueue originalQueue, final FlowFileRecord originalFlowFileRecord, final String swapLocation) { + this.originalQueue = originalQueue; + this.originalFlowFileRecord = originalFlowFileRecord; + this.type = RepositoryRecordType.SWAP_OUT; + this.swapLocation = swapLocation; + this.originalAttributes = originalFlowFileRecord == null ? Collections.<String, String>emptyMap() : originalFlowFileRecord.getAttributes(); + } + + @Override + public FlowFileQueue getDestination() { + return destination; + } + + public void setDestination(final FlowFileQueue destination) { + this.destination = destination; + } + + @Override + public RepositoryRecordType getType() { + return type; + } + + FlowFileRecord getOriginal() { + return originalFlowFileRecord; + } + + @Override + public String getSwapLocation() { + return swapLocation; + } + + public void setSwapLocation(final String swapLocation) { + this.swapLocation = swapLocation; + if (type != RepositoryRecordType.SWAP_OUT) { + type = RepositoryRecordType.SWAP_IN; // we are swapping in a new record + } + } + + @Override + public ContentClaim getOriginalClaim() { + return (originalFlowFileRecord == null) ? null : originalFlowFileRecord.getContentClaim(); + } + + @Override + public FlowFileQueue getOriginalQueue() { + return originalQueue; + } + + public void setWorking(final FlowFileRecord flowFile) { + workingFlowFileRecord = flowFile; + } + + public void setWorking(final FlowFileRecord flowFile, final String attributeKey, final String attributeValue) { + workingFlowFileRecord = flowFile; + + // If setting attribute to same value as original, don't add to updated attributes + final String currentValue = originalAttributes.get(attributeKey); + if (currentValue == null || !currentValue.equals(attributeValue)) { + updatedAttributes.put(attributeKey, attributeValue); + } + } + + public void setWorking(final FlowFileRecord flowFile, final Map<String, String> updatedAttribs) { + workingFlowFileRecord = flowFile; + + for (final Map.Entry<String, String> entry : updatedAttribs.entrySet()) { + final String currentValue = originalAttributes.get(entry.getKey()); + if (currentValue == null || !currentValue.equals(entry.getValue())) { + updatedAttributes.put(entry.getKey(), entry.getValue()); + } + } + } + + @Override + public boolean isAttributesChanged() { + return !updatedAttributes.isEmpty(); + } + + public void markForAbort() { + type = RepositoryRecordType.CONTENTMISSING; + } + + @Override + public boolean isMarkedForAbort() { + return RepositoryRecordType.CONTENTMISSING.equals(type); + } + + public void markForDelete() { + type = RepositoryRecordType.DELETE; + } + + public boolean isMarkedForDelete() { + return RepositoryRecordType.DELETE.equals(type); + } + + public void setTransferRelationship(final Relationship relationship) { + transferRelationship = relationship; + } + + public Relationship getTransferRelationship() { + return transferRelationship; + } + + FlowFileRecord getWorking() { + return workingFlowFileRecord; + } + + ContentClaim getWorkingClaim() { + return (workingFlowFileRecord == null) ? null : workingFlowFileRecord.getContentClaim(); + } + + @Override + public FlowFileRecord getCurrent() { + return (workingFlowFileRecord == null) ? originalFlowFileRecord : workingFlowFileRecord; + } + + @Override + public ContentClaim getCurrentClaim() { + return (getCurrent() == null) ? null : getCurrent().getContentClaim(); + } + + @Override + public long getCurrentClaimOffset() { + return (getCurrent() == null) ? 0L : getCurrent().getContentClaimOffset(); + } + + boolean isWorking() { + return (workingFlowFileRecord != null); + } + + Map<String, String> getOriginalAttributes() { + return originalAttributes; + } + + Map<String, String> getUpdatedAttributes() { + return updatedAttributes; + } + + @Override + public String toString() { + return "StandardRepositoryRecord[UpdateType=" + getType() + ",Record=" + getCurrent() + "]"; + } + + @Override + public List<ContentClaim> getTransientClaims() { + return transientClaims == null ? Collections.<ContentClaim> emptyList() : Collections.unmodifiableList(transientClaims); + } + + void addTransientClaim(final ContentClaim claim) { + if (claim == null) { + return; + } + + if (transientClaims == null) { + transientClaims = new ArrayList<>(); + } + transientClaims.add(claim); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java new file mode 100644 index 0000000..39a2591 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.repository.claim; + + +/** + * <p> + * A ContentClaim is a reference to a given flow file's content. Multiple flow files may reference the same content by both having the same content claim.</p> + * + * <p> + * Must be thread safe</p> + * + */ +public final class StandardContentClaim implements ContentClaim, Comparable<ContentClaim> { + + private final ResourceClaim resourceClaim; + private final long offset; + private volatile long length; + + public StandardContentClaim(final ResourceClaim resourceClaim, final long offset) { + this.resourceClaim = resourceClaim; + this.offset = offset; + this.length = -1L; + } + + public void setLength(final long length) { + this.length = length; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result; + result = prime * result + (int) (offset ^ offset >>> 32); + result = prime * result + (resourceClaim == null ? 0 : resourceClaim.hashCode()); + return result; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + + if (obj == null) { + return false; + } + + if (!(obj instanceof ContentClaim)) { + return false; + } + + final ContentClaim other = (ContentClaim) obj; + if (offset != other.getOffset()) { + return false; + } + + return resourceClaim.equals(other.getResourceClaim()); + } + + @Override + public int compareTo(final ContentClaim o) { + final int resourceComp = resourceClaim.compareTo(o.getResourceClaim()); + if (resourceComp != 0) { + return resourceComp; + } + + return Long.compare(offset, o.getOffset()); + } + + @Override + public ResourceClaim getResourceClaim() { + return resourceClaim; + } + + @Override + public long getOffset() { + return offset; + } + + @Override + public long getLength() { + return length; + } + + @Override + public String toString() { + return "StandardContentClaim [resourceClaim=" + resourceClaim + ", offset=" + offset + ", length=" + length + "]"; + } +}
