NIFI-744: Refactored ContentClaim into ContentClaim and ResourceClaim and allowed resource claim to be reused across sessions
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/68d94cc0 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/68d94cc0 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/68d94cc0 Branch: refs/heads/master Commit: 68d94cc01b59187e71f32624c5251d332d39e8ab Parents: 706edeb Author: Mark Payne <[email protected]> Authored: Tue Aug 18 19:55:33 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Fri Aug 21 11:08:34 2015 -0400 ---------------------------------------------------------------------- .../repository/ContentRepository.java | 14 +- .../repository/FlowFileRepository.java | 4 +- .../repository/FlowFileSwapManager.java | 6 +- .../repository/claim/ContentClaim.java | 31 +- .../repository/claim/ContentClaimManager.java | 142 ----- .../repository/claim/ResourceClaim.java | 54 ++ .../repository/claim/ResourceClaimManager.java | 135 +++++ .../stream/io/ByteCountingOutputStream.java | 4 + .../SynchronizedByteCountingOutputStream.java | 66 +++ .../nifi-framework/nifi-framework-core/pom.xml | 1 + .../nifi/controller/FileSystemSwapManager.java | 63 ++- .../apache/nifi/controller/FlowController.java | 168 +++--- .../repository/FileSystemRepository.java | 563 ++++++++++++++----- .../repository/StandardFlowFileRecord.java | 6 +- .../repository/StandardProcessSession.java | 389 ++++--------- .../repository/VolatileContentRepository.java | 49 +- .../repository/VolatileFlowFileRepository.java | 46 +- .../WriteAheadFlowFileRepository.java | 101 +++- .../repository/claim/StandardContentClaim.java | 129 ++--- .../claim/StandardContentClaimManager.java | 145 ----- .../repository/claim/StandardResourceClaim.java | 134 +++++ .../claim/StandardResourceClaimManager.java | 145 +++++ .../controller/TestFileSystemSwapManager.java | 24 +- .../repository/TestFileSystemRepository.java | 116 +++- .../repository/TestStandardProcessSession.java | 378 ++++--------- .../TestVolatileContentRepository.java | 17 +- .../TestWriteAheadFlowFileRepository.java | 6 +- 27 files changed, 1645 insertions(+), 1291 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java index ee3ead9..8d0bdb3 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java @@ -24,7 +24,7 @@ import java.util.Collection; import java.util.Set; import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.controller.repository.claim.ContentClaimManager; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; /** * Defines the capabilities of a content repository. Append options are not @@ -42,7 +42,7 @@ public interface ContentRepository { * @param claimManager to handle claims * @throws java.io.IOException if unable to init */ - void initialize(ContentClaimManager claimManager) throws IOException; + void initialize(ResourceClaimManager claimManager) throws IOException; /** * Shuts down the Content Repository, freeing any resources that may be @@ -173,9 +173,13 @@ public interface ContentRepository { * @param content to import from * @param claim the claim to write imported content to * @param append if true, the content will be appended to the claim; if - * false, the content will replace the contents of the claim + * false, the content will replace the contents of the claim * @throws IOException if unable to read content + * + * @deprecated if needing to append to a content claim, the contents of the claim should be + * copied to a new claim and then the data to append should be written to that new claim. */ + @Deprecated long importFrom(Path content, ContentClaim claim, boolean append) throws IOException; /** @@ -198,7 +202,11 @@ public interface ContentRepository { * @param append whether to append or replace * @return length of data imported in bytes * @throws IOException if failure to read or write stream + * + * @deprecated if needing to append to a content claim, the contents of the claim should be + * copied to a new claim and then the data to append should be written to that new claim. */ + @Deprecated long importFrom(InputStream content, ContentClaim claim, boolean append) throws IOException; /** http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java index 5e59e04..58fc6b3 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java @@ -22,7 +22,7 @@ import java.util.Collection; import java.util.List; import org.apache.nifi.controller.FlowFileQueue; -import org.apache.nifi.controller.repository.claim.ContentClaimManager; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; /** * Implementations must be thread safe @@ -38,7 +38,7 @@ public interface FlowFileRepository extends Closeable { * @param claimManager for handling claims * @throws java.io.IOException if unable to initialize repository */ - void initialize(ContentClaimManager claimManager) throws IOException; + void initialize(ResourceClaimManager claimManager) throws IOException; /** * @return the maximum number of bytes that can be stored in the underlying http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java index 869e2b3..2e5be11 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java @@ -16,7 +16,7 @@ */ package org.apache.nifi.controller.repository; -import org.apache.nifi.controller.repository.claim.ContentClaimManager; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.events.EventReporter; /** @@ -38,7 +38,7 @@ public interface FlowFileSwapManager { * @param reporter the EventReporter that can be used for notifying users of * important events */ - void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ContentClaimManager claimManager, EventReporter reporter); + void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ResourceClaimManager claimManager, EventReporter reporter); /** * Shuts down the manager @@ -59,5 +59,5 @@ public interface FlowFileSwapManager { * @param claimManager manager * @return how many flowfiles have been recovered */ - long recoverSwappedFlowFiles(QueueProvider connectionProvider, ContentClaimManager claimManager); + long recoverSwappedFlowFiles(QueueProvider connectionProvider, ResourceClaimManager claimManager); } http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java index 53cc44f..5c1d76b 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java @@ -18,35 +18,30 @@ package org.apache.nifi.controller.repository.claim; /** * <p> - * A ContentClaim is a reference to a given flow file's content. Multiple flow - * files may reference the same content by both having the same content - * claim.</p> + * A reference to a section of a {@link ResourceClaim}, which may or may not encompass + * the entire ResourceClaim. Multiple FlowFiles may reference the same content by both + * having the same content claim. + * </p> * * <p> - * Must be thread safe</p> - * + * Must be thread safe + * </p> */ public interface ContentClaim extends Comparable<ContentClaim> { /** - * @return the unique identifier for this claim - */ - String getId(); - - /** - * @return the container identifier in which this claim is held + * @return the ResourceClaim that this ContentClaim references */ - String getContainer(); + ResourceClaim getResourceClaim(); /** - * @return the section within a given container the claim is held + * @return the offset into the ResourceClaim where the content for this + * claim begins */ - String getSection(); + long getOffset(); /** - * @return Indicates whether or not the Claim is loss-tolerant. If so, we will - * attempt to keep the content but will not sacrifice a great deal of - * performance to do so + * @return the length of this ContentClaim */ - boolean isLossTolerant(); + long getLength(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java deleted file mode 100644 index bffcec3..0000000 --- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.repository.claim; - -import java.util.Collection; -import java.util.concurrent.TimeUnit; - -/** - * Responsible for managing all ContentClaims that are used in the application - */ -public interface ContentClaimManager { - - /** - * Creates a new Content Claim with the given id, container, section, and - * loss tolerance. - * - * @param id of claim - * @param container of claim - * @param section of claim - * @param lossTolerant of claim - * @return new claim - */ - ContentClaim newContentClaim(String container, String section, String id, boolean lossTolerant); - - /** - * @param claim to obtain reference count for - * @return the number of FlowFiles that hold a claim to a particular piece - * of FlowFile content - */ - int getClaimantCount(ContentClaim claim); - - /** - * Decreases by 1 the count of how many FlowFiles hold a claim to a - * particular piece of FlowFile content and returns the new count - * - * @param claim to decrement claimants on - * @return new claimaint count - */ - int decrementClaimantCount(ContentClaim claim); - - /** - * Increases by 1 the count of how many FlowFiles hold a claim to a - * particular piece of FlowFile content and returns the new count - * - * @param claim to increment claims on - * @return new claimant count - */ - int incrementClaimantCount(ContentClaim claim); - - /** - * Increases by 1 the count of how many FlowFiles hold a claim to a - * particular piece of FlowFile content and returns the new count. - * - * If it is known that the Content Claim whose count is being incremented is - * a newly created ContentClaim, this method should be called with a value - * of {@code true} as the second argument, as it may allow the manager to - * optimize its tasks, knowing that the Content Claim cannot be referenced - * by any other component - * - * @param claim to increment - * @param newClaim provides a hint that no other process can have access to this - * claim right now - * @return new claim count - */ - int incrementClaimantCount(ContentClaim claim, boolean newClaim); - - /** - * Indicates that the given ContentClaim can now be destroyed by the - * appropriate Content Repository. This should be done only after it is - * guaranteed that the FlowFile Repository has been synchronized with its - * underlying storage component. This way, we avoid the following sequence - * of events: - * <ul> - * <li>FlowFile Repository is updated to indicate that FlowFile F no longer - * depends on ContentClaim C</li> - * <li>ContentClaim C is no longer needed and is destroyed</li> - * <li>The Operating System crashes or there is a power failure</li> - * <li>Upon restart, the FlowFile Repository was not synchronized with its - * underlying storage mechanism and as such indicates that FlowFile F needs - * ContentClaim C.</li> - * <li>Since ContentClaim C has already been destroyed, it is inaccessible, - * and FlowFile F's Content is not found, so the FlowFile is removed, - * resulting in data loss.</li> - * </ul> - * - * <p> - * Using this method of marking the ContentClaim as destructable only when - * the FlowFile repository has been synced with the underlying storage - * mechanism, we can ensure that on restart, we will not point to this - * unneeded claim. As such, it is now safe to destroy the contents. - * </p> - * - * @param claim to mark as now destructable - */ - void markDestructable(ContentClaim claim); - - /** - * Drains up to {@code maxElements} Content Claims from the internal queue - * of destructable content claims to the given {@code destination} so that - * they can be destroyed. - * - * @param destination to drain to - * @param maxElements max items to drain - */ - void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements); - - /** - * Drains up to {@code maxElements} Content Claims from the internal queue - * of destructable content claims to the given {@code destination} so that - * they can be destroyed. If no ContentClaim is ready to be destroyed at - * this time, will wait up to the specified amount of time before returning. - * If, after the specified amount of time, there is still no ContentClaim - * ready to be destroyed, the method will return without having added - * anything to the given {@code destination}. - * - * @param destination to drain to - * @param maxElements max items to drain - * @param timeout maximum time to wait - * @param unit unit of time to wait - */ - void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements, long timeout, TimeUnit unit); - - /** - * Clears the manager's memory of any and all ContentClaims that it knows - * about - */ - void purge(); -} http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java new file mode 100644 index 0000000..d448632 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.repository.claim; + +import org.apache.nifi.controller.repository.ContentRepository; + +/** + * <p> + * Represents a resource that can be provided by a {@link ContentRepository} + * </p> + * + * <p> + * MUST BE THREAD-SAFE! + * </p> + */ +public interface ResourceClaim extends Comparable<ResourceClaim> { + + /** + * @return the unique identifier for this claim + */ + String getId(); + + /** + * @return the container identifier in which this claim is held + */ + String getContainer(); + + /** + * @return the section within a given container the claim is held + */ + String getSection(); + + /** + * @return Indicates whether or not the Claim is loss-tolerant. If so, we will + * attempt to keep the content but will not sacrifice a great deal of + * performance to do so + */ + boolean isLossTolerant(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java new file mode 100644 index 0000000..01f4c65 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.repository.claim; + +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +/** + * Responsible for managing all ResourceClaims that are used in the application + */ +public interface ResourceClaimManager { + + /** + * Creates a new Resource Claim with the given id, container, section, and + * loss tolerance. + * + * @param id of claim + * @param container of claim + * @param section of claim + * @param lossTolerant of claim + * @return new claim + */ + ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant); + + /** + * @param claim to obtain reference count for + * @return the number of FlowFiles that hold a claim to a particular piece + * of FlowFile content + */ + int getClaimantCount(ResourceClaim claim); + + /** + * Decreases by 1 the count of how many FlowFiles hold a claim to a + * particular piece of FlowFile content and returns the new count + * + * @param claim to decrement claimants on + * @return new claimaint count + */ + int decrementClaimantCount(ResourceClaim claim); + + /** + * Increases by 1 the count of how many FlowFiles hold a claim to a + * particular piece of FlowFile content and returns the new count + * + * @param claim to increment claims on + * @return new claimant count + */ + int incrementClaimantCount(ResourceClaim claim); + + /** + * Increases by 1 the count of how many FlowFiles hold a claim to a + * particular piece of FlowFile content and returns the new count. + * + * If it is known that the Content Claim whose count is being incremented is + * a newly created ResourceClaim, this method should be called with a value + * of {@code true} as the second argument, as it may allow the manager to + * optimize its tasks, knowing that the Content Claim cannot be referenced + * by any other component + * + * @param claim to increment + * @param newClaim provides a hint that no other process can have access to this + * claim right now + * @return new claim count + */ + int incrementClaimantCount(ResourceClaim claim, boolean newClaim); + + /** + * Indicates that the given ResourceClaim can now be destroyed by the + * appropriate Content Repository. This should be done only after it is + * guaranteed that the FlowFile Repository has been synchronized with its + * underlying storage component. This way, we avoid the following sequence + * of events: + * <ul> + * <li>FlowFile Repository is updated to indicate that FlowFile F no longer depends on ResourceClaim C</li> + * <li>ResourceClaim C is no longer needed and is destroyed</li> + * <li>The Operating System crashes or there is a power failure</li> + * <li>Upon restart, the FlowFile Repository was not synchronized with its underlying storage mechanism and as such indicates that FlowFile F needs ResourceClaim C.</li> + * <li>Since ResourceClaim C has already been destroyed, it is inaccessible, and FlowFile F's Content is not found, so the FlowFile is removed, resulting in data loss.</li> + * </ul> + * + * <p> + * Using this method of marking the ResourceClaim as destructable only when the FlowFile repository has been synced with the underlying storage mechanism, we can ensure that on restart, we will + * not point to this unneeded claim. As such, it is now safe to destroy the contents. + * </p> + * + * @param claim to mark as now destructable + */ + void markDestructable(ResourceClaim claim); + + /** + * Drains up to {@code maxElements} Content Claims from the internal queue + * of destructable content claims to the given {@code destination} so that + * they can be destroyed. + * + * @param destination to drain to + * @param maxElements max items to drain + */ + void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements); + + /** + * Drains up to {@code maxElements} Content Claims from the internal queue + * of destructable content claims to the given {@code destination} so that + * they can be destroyed. If no ResourceClaim is ready to be destroyed at + * this time, will wait up to the specified amount of time before returning. + * If, after the specified amount of time, there is still no ResourceClaim + * ready to be destroyed, the method will return without having added + * anything to the given {@code destination}. + * + * @param destination to drain to + * @param maxElements max items to drain + * @param timeout maximum time to wait + * @param unit unit of time to wait + */ + void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements, long timeout, TimeUnit unit); + + /** + * Clears the manager's memory of any and all ResourceClaims that it knows + * about + */ + void purge(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java index 9bbd45e..47f236d 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java @@ -63,4 +63,8 @@ public class ByteCountingOutputStream extends OutputStream { public void close() throws IOException { out.close(); } + + public OutputStream getWrappedStream() { + return out; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/SynchronizedByteCountingOutputStream.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/SynchronizedByteCountingOutputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/SynchronizedByteCountingOutputStream.java new file mode 100644 index 0000000..e617829 --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/SynchronizedByteCountingOutputStream.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.stream.io; + +import java.io.IOException; +import java.io.OutputStream; + +public class SynchronizedByteCountingOutputStream extends ByteCountingOutputStream { + + public SynchronizedByteCountingOutputStream(final OutputStream out) { + super(out); + } + + public SynchronizedByteCountingOutputStream(final OutputStream out, final long byteCount) { + super(out, byteCount); + } + + @Override + public synchronized void flush() throws IOException { + super.flush(); + } + + @Override + public synchronized void close() throws IOException { + super.close(); + } + + @Override + public synchronized long getBytesWritten() { + return super.getBytesWritten(); + } + + @Override + public synchronized OutputStream getWrappedStream() { + return super.getWrappedStream(); + } + + @Override + public synchronized void write(final byte[] b) throws IOException { + super.write(b); + } + + @Override + public synchronized void write(final int b) throws IOException { + super.write(b); + } + + @Override + public synchronized void write(final byte[] b, final int off, final int len) throws IOException { + super.write(b, off, len); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml index 8d64143..f48988a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml @@ -135,6 +135,7 @@ <exclude>src/test/resources/conf/0bytes.xml</exclude> <exclude>src/test/resources/conf/termination-only.xml</exclude> <exclude>src/test/resources/hello.txt</exclude> + <exclude>src/test/resources/bye.txt</exclude> <exclude>src/test/resources/old-swap-file.swap</exclude> </excludes> </configuration> http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java index 604dba9..c829566 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java @@ -61,7 +61,9 @@ import org.apache.nifi.controller.repository.FlowFileSwapManager; import org.apache.nifi.controller.repository.QueueProvider; import org.apache.nifi.controller.repository.StandardFlowFileRecord; import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.controller.repository.claim.ContentClaimManager; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.events.EventReporter; import org.apache.nifi.stream.io.BufferedOutputStream; @@ -83,7 +85,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap"); private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part"); - public static final int SWAP_ENCODING_VERSION = 6; + public static final int SWAP_ENCODING_VERSION = 7; public static final String EVENT_CATEGORY = "Swap FlowFiles"; private final ScheduledExecutorService swapQueueIdentifierExecutor; @@ -98,7 +100,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { private final long swapOutMillis; private final int swapOutThreadCount; - private ContentClaimManager claimManager; // effectively final + private ResourceClaimManager claimManager; // effectively final private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class); @@ -138,7 +140,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } @Override - public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager, final EventReporter eventReporter) { + public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ResourceClaimManager claimManager, final EventReporter eventReporter) { this.claimManager = claimManager; this.flowFileRepository = flowFileRepository; this.eventReporter = eventReporter; @@ -184,11 +186,14 @@ public class FileSystemSwapManager implements FlowFileSwapManager { out.writeBoolean(false); } else { out.writeBoolean(true); - out.writeUTF(claim.getId()); - out.writeUTF(claim.getContainer()); - out.writeUTF(claim.getSection()); + final ResourceClaim resourceClaim = claim.getResourceClaim(); + out.writeUTF(resourceClaim.getId()); + out.writeUTF(resourceClaim.getContainer()); + out.writeUTF(resourceClaim.getSection()); + out.writeLong(claim.getOffset()); + out.writeLong(claim.getLength()); out.writeLong(flowFile.getContentClaimOffset()); - out.writeBoolean(claim.isLossTolerant()); + out.writeBoolean(resourceClaim.isLossTolerant()); } final Map<String, String> attributes = flowFile.getAttributes(); @@ -226,7 +231,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } } - static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final FlowFileQueue queue, final ContentClaimManager claimManager) throws IOException { + static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException { final int swapEncodingVersion = in.readInt(); if (swapEncodingVersion > SWAP_ENCODING_VERSION) { throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is " @@ -245,7 +250,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles, final FlowFileQueue queue, - final int serializationVersion, final boolean incrementContentClaims, final ContentClaimManager claimManager) throws IOException { + final int serializationVersion, final boolean incrementContentClaims, final ResourceClaimManager claimManager) throws IOException { final List<FlowFileRecord> flowFiles = new ArrayList<>(); for (int i = 0; i < numFlowFiles; i++) { // legacy encoding had an "action" because it used to be couple with FlowFile Repository code @@ -292,6 +297,17 @@ public class FileSystemSwapManager implements FlowFileSwapManager { final String container = in.readUTF(); final String section = in.readUTF(); + + final long resourceOffset; + final long resourceLength; + if (serializationVersion < 6) { + resourceOffset = 0L; + resourceLength = -1L; + } else { + resourceOffset = in.readLong(); + resourceLength = in.readLong(); + } + final long claimOffset = in.readLong(); final boolean lossTolerant; @@ -301,10 +317,12 @@ public class FileSystemSwapManager implements FlowFileSwapManager { lossTolerant = false; } - final ContentClaim claim = claimManager.newContentClaim(container, section, claimId, lossTolerant); + final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant); + final StandardContentClaim claim = new StandardContentClaim(resourceClaim, resourceOffset); + claim.setLength(resourceLength); if (incrementContentClaims) { - claimManager.incrementClaimantCount(claim); + claimManager.incrementClaimantCount(resourceClaim); } ffBuilder.contentClaim(claim); @@ -353,16 +371,16 @@ public class FileSystemSwapManager implements FlowFileSwapManager { throw new EOFException(); } if (firstValue == 0xff && secondValue == 0xff) { - int ch1 = in.read(); - int ch2 = in.read(); - int ch3 = in.read(); - int ch4 = in.read(); + final int ch1 = in.read(); + final int ch2 = in.read(); + final int ch3 = in.read(); + final int ch4 = in.read(); if ((ch1 | ch2 | ch3 | ch4) < 0) { throw new EOFException(); } - return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4)); + return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4; } else { - return ((firstValue << 8) + (secondValue)); + return (firstValue << 8) + secondValue; } } @@ -422,7 +440,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { final FlowFileQueue flowFileQueue = entry.getKey(); // if queue is more than 60% of its swap threshold, don't swap flowfiles in - if (flowFileQueue.unswappedSize() >= ((float) flowFileQueue.getSwapThreshold() * 0.6F)) { + if (flowFileQueue.unswappedSize() >= flowFileQueue.getSwapThreshold() * 0.6F) { continue; } @@ -432,7 +450,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { final Queue<File> queue = queueLockWrapper.getQueue(); // Swap FlowFiles in until we hit 90% of the threshold, or until we're out of files. - while (flowFileQueue.unswappedSize() < ((float) flowFileQueue.getSwapThreshold() * 0.9F)) { + while (flowFileQueue.unswappedSize() < flowFileQueue.getSwapThreshold() * 0.9F) { File swapFile = null; try { swapFile = queue.poll(); @@ -545,7 +563,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { QueueLockWrapper swapQueue = swapMap.get(flowFileQueue); if (swapQueue == null) { swapQueue = new QueueLockWrapper(new LinkedBlockingQueue<File>()); - QueueLockWrapper oldQueue = swapMap.putIfAbsent(flowFileQueue, swapQueue); + final QueueLockWrapper oldQueue = swapMap.putIfAbsent(flowFileQueue, swapQueue); if (oldQueue != null) { swapQueue = oldQueue; } @@ -567,7 +585,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { * @return the largest FlowFile ID that was recovered */ @Override - public long recoverSwappedFlowFiles(final QueueProvider queueProvider, final ContentClaimManager claimManager) { + public long recoverSwappedFlowFiles(final QueueProvider queueProvider, final ResourceClaimManager claimManager) { final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() { @Override public boolean accept(final File dir, final String name) { @@ -680,6 +698,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } } + @Override public void shutdown() { swapQueueIdentifierExecutor.shutdownNow(); swapInExecutor.shutdownNow(); http://git-wip-us.apache.org/repos/asf/nifi/blob/68d94cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 3d78b3a..af99d50 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -98,9 +98,12 @@ import org.apache.nifi.controller.repository.StandardCounterRepository; import org.apache.nifi.controller.repository.StandardFlowFileRecord; import org.apache.nifi.controller.repository.StandardRepositoryRecord; import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.controller.repository.claim.ContentClaimManager; import org.apache.nifi.controller.repository.claim.ContentDirection; -import org.apache.nifi.controller.repository.claim.StandardContentClaimManager; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.controller.repository.claim.StandardContentClaim; +import org.apache.nifi.controller.repository.claim.StandardResourceClaim; +import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.controller.repository.io.LimitedInputStream; import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent; import org.apache.nifi.controller.scheduling.ProcessContextFactory; @@ -176,7 +179,6 @@ import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.reporting.Severity; import org.apache.nifi.scheduling.SchedulingStrategy; -import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ReflectionUtils; @@ -282,7 +284,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final NodeProtocolSender protocolSender; private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks"); - private final ContentClaimManager contentClaimManager = new StandardContentClaimManager(); + private final ResourceClaimManager contentClaimManager = new StandardResourceClaimManager(); // guarded by rwLock /** @@ -495,7 +497,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, false)); } - private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ContentClaimManager contentClaimManager) { + private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ResourceClaimManager contentClaimManager) { final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, DEFAULT_FLOWFILE_REPO_IMPLEMENTATION); if (implementationClassName == null) { throw new RuntimeException("Cannot create FlowFile Repository because the NiFi Properties is missing the following property: " @@ -3108,7 +3110,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R @Override public boolean isInputAvailable() { try { - return contentRepository.isAccessible(createClaim(event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier())); + return contentRepository.isAccessible(createClaim(event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(), + event.getPreviousContentClaimIdentifier(), event.getPreviousContentClaimOffset())); } catch (final IOException e) { return false; } @@ -3117,43 +3120,20 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R @Override public boolean isOutputAvailable() { try { - return contentRepository.isAccessible(createClaim(event.getContentClaimContainer(), event.getContentClaimSection(), event.getContentClaimIdentifier())); + return contentRepository.isAccessible(createClaim(event.getContentClaimContainer(), event.getContentClaimSection(), + event.getContentClaimIdentifier(), event.getContentClaimOffset())); } catch (final IOException e) { return false; } } - private ContentClaim createClaim(final String container, final String section, final String identifier) { + private ContentClaim createClaim(final String container, final String section, final String identifier, final Long offset) { if (container == null || section == null || identifier == null) { return null; } - return new ContentClaim() { - @Override - public int compareTo(final ContentClaim o) { - return 0; - } - - @Override - public String getId() { - return identifier; - } - - @Override - public String getContainer() { - return container; - } - - @Override - public String getSection() { - return section; - } - - @Override - public boolean isLossTolerant() { - return false; - } - }; + final StandardResourceClaim resourceClaim = new StandardResourceClaim(container, section, identifier, false); + return new StandardContentClaim(resourceClaim, offset == null ? 0L : offset.longValue()); } @Override @@ -3170,45 +3150,48 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R requireNonNull(requestUri); final ContentClaim claim; - final Long offset; final long size; + final long offset; if (direction == ContentDirection.INPUT) { if (provEvent.getPreviousContentClaimContainer() == null || provEvent.getPreviousContentClaimSection() == null || provEvent.getPreviousContentClaimIdentifier() == null) { throw new IllegalArgumentException("Input Content Claim not specified"); } - claim = contentClaimManager.newContentClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(), provEvent.getPreviousContentClaimIdentifier(), false); - offset = provEvent.getPreviousContentClaimOffset(); + final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(), + provEvent.getPreviousContentClaimIdentifier(), false); + claim = new StandardContentClaim(resourceClaim, provEvent.getPreviousContentClaimOffset()); + offset = provEvent.getPreviousContentClaimOffset() == null ? 0L : provEvent.getPreviousContentClaimOffset(); size = provEvent.getPreviousFileSize(); } else { if (provEvent.getContentClaimContainer() == null || provEvent.getContentClaimSection() == null || provEvent.getContentClaimIdentifier() == null) { throw new IllegalArgumentException("Output Content Claim not specified"); } - claim = contentClaimManager.newContentClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(), provEvent.getContentClaimIdentifier(), false); - offset = provEvent.getContentClaimOffset(); + final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(), + provEvent.getContentClaimIdentifier(), false); + + claim = new StandardContentClaim(resourceClaim, provEvent.getContentClaimOffset()); + offset = provEvent.getContentClaimOffset() == null ? 0L : provEvent.getContentClaimOffset(); size = provEvent.getFileSize(); } final InputStream rawStream = contentRepository.read(claim); - if (offset != null) { - StreamUtils.skip(rawStream, offset.longValue()); - } + final ResourceClaim resourceClaim = claim.getResourceClaim(); // Register a Provenance Event to indicate that we replayed the data. final ProvenanceEventRecord sendEvent = new StandardProvenanceEventRecord.Builder() - .setEventType(ProvenanceEventType.SEND) - .setFlowFileUUID(provEvent.getFlowFileUuid()) - .setAttributes(provEvent.getAttributes(), Collections.<String, String>emptyMap()) - .setCurrentContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), offset, size) - .setTransitUri(requestUri) - .setEventTime(System.currentTimeMillis()) - .setFlowFileEntryDate(provEvent.getFlowFileEntryDate()) - .setLineageStartDate(provEvent.getLineageStartDate()) - .setComponentType(getName()) - .setComponentId(getRootGroupId()) - .setDetails("Download of " + (direction == ContentDirection.INPUT ? "Input" : "Output") + " Content requested by " + requestor + " for Provenance Event " + provEvent.getEventId()) - .build(); + .setEventType(ProvenanceEventType.SEND) + .setFlowFileUUID(provEvent.getFlowFileUuid()) + .setAttributes(provEvent.getAttributes(), Collections.<String, String> emptyMap()) + .setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), offset, size) + .setTransitUri(requestUri) + .setEventTime(System.currentTimeMillis()) + .setFlowFileEntryDate(provEvent.getFlowFileEntryDate()) + .setLineageStartDate(provEvent.getLineageStartDate()) + .setComponentType(getName()) + .setComponentId(getRootGroupId()) + .setDetails("Download of " + (direction == ContentDirection.INPUT ? "Input" : "Output") + " Content requested by " + requestor + " for Provenance Event " + provEvent.getEventId()) + .build(); provenanceEventRepository.registerEvent(sendEvent); @@ -3233,7 +3216,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } try { - if (!contentRepository.isAccessible(contentClaimManager.newContentClaim(contentClaimContainer, contentClaimSection, contentClaimId, false))) { + final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false); + final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, event.getPreviousContentClaimOffset()); + + if (!contentRepository.isAccessible(contentClaim)) { return "Content is no longer available in Content Repository"; } } catch (final IOException ioe) { @@ -3310,18 +3296,20 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } // Create the ContentClaim - final ContentClaim claim = contentClaimManager.newContentClaim(event.getPreviousContentClaimContainer(), + final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false); // Increment Claimant Count, since we will now be referencing the Content Claim - contentClaimManager.incrementClaimantCount(claim); + contentClaimManager.incrementClaimantCount(resourceClaim); + final long claimOffset = event.getPreviousContentClaimOffset() == null ? 0L : event.getPreviousContentClaimOffset().longValue(); + final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, claimOffset); + contentClaim.setLength(event.getPreviousFileSize() == null ? -1L : event.getPreviousFileSize()); - if (!contentRepository.isAccessible(claim)) { - contentClaimManager.decrementClaimantCount(claim); + if (!contentRepository.isAccessible(contentClaim)) { + contentClaimManager.decrementClaimantCount(resourceClaim); throw new IllegalStateException("Cannot replay data from Provenance Event because the data is no longer available in the Content Repository"); } - final long claimOffset = event.getPreviousContentClaimOffset() == null ? 0L : event.getPreviousContentClaimOffset().longValue(); final String parentUUID = event.getFlowFileUuid(); // Create the FlowFile Record @@ -3331,39 +3319,39 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final String newFlowFileUUID = UUID.randomUUID().toString(); final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - // Copy relevant info from source FlowFile - .addAttributes(event.getPreviousAttributes()) - .contentClaim(claim) - .contentClaimOffset(claimOffset) - .entryDate(System.currentTimeMillis()) - .id(flowFileRepository.getNextFlowFileSequence()) - .lineageIdentifiers(lineageIdentifiers) - .lineageStartDate(event.getLineageStartDate()) - .size(contentSize.longValue()) - // Create a new UUID and add attributes indicating that this is a replay - .addAttribute("flowfile.replay", "true") - .addAttribute("flowfile.replay.timestamp", String.valueOf(new Date())) - .addAttribute(CoreAttributes.UUID.key(), newFlowFileUUID) - // remove attributes that may have existed on the source FlowFile that we don't want to exist on the new FlowFile - .removeAttributes(CoreAttributes.DISCARD_REASON.key(), CoreAttributes.ALTERNATE_IDENTIFIER.key()) - // build the record - .build(); + // Copy relevant info from source FlowFile + .addAttributes(event.getPreviousAttributes()) + .contentClaim(contentClaim) + .contentClaimOffset(0L) // use 0 because we used the content claim offset in the Content Claim itself + .entryDate(System.currentTimeMillis()) + .id(flowFileRepository.getNextFlowFileSequence()) + .lineageIdentifiers(lineageIdentifiers) + .lineageStartDate(event.getLineageStartDate()) + .size(contentSize.longValue()) + // Create a new UUID and add attributes indicating that this is a replay + .addAttribute("flowfile.replay", "true") + .addAttribute("flowfile.replay.timestamp", String.valueOf(new Date())) + .addAttribute(CoreAttributes.UUID.key(), newFlowFileUUID) + // remove attributes that may have existed on the source FlowFile that we don't want to exist on the new FlowFile + .removeAttributes(CoreAttributes.DISCARD_REASON.key(), CoreAttributes.ALTERNATE_IDENTIFIER.key()) + // build the record + .build(); // Register a Provenance Event to indicate that we replayed the data. final ProvenanceEventRecord replayEvent = new StandardProvenanceEventRecord.Builder() - .setEventType(ProvenanceEventType.REPLAY) - .addChildUuid(newFlowFileUUID) - .addParentUuid(parentUUID) - .setFlowFileUUID(parentUUID) - .setAttributes(Collections.<String, String>emptyMap(), flowFileRecord.getAttributes()) - .setCurrentContentClaim(event.getContentClaimSection(), event.getContentClaimContainer(), event.getContentClaimIdentifier(), event.getContentClaimOffset(), event.getFileSize()) - .setDetails("Replay requested by " + requestor) - .setEventTime(System.currentTimeMillis()) - .setFlowFileEntryDate(System.currentTimeMillis()) - .setLineageStartDate(event.getLineageStartDate()) - .setComponentType(event.getComponentType()) - .setComponentId(event.getComponentId()) - .build(); + .setEventType(ProvenanceEventType.REPLAY) + .addChildUuid(newFlowFileUUID) + .addParentUuid(parentUUID) + .setFlowFileUUID(parentUUID) + .setAttributes(Collections.<String, String> emptyMap(), flowFileRecord.getAttributes()) + .setCurrentContentClaim(event.getContentClaimSection(), event.getContentClaimContainer(), event.getContentClaimIdentifier(), event.getContentClaimOffset(), event.getFileSize()) + .setDetails("Replay requested by " + requestor) + .setEventTime(System.currentTimeMillis()) + .setFlowFileEntryDate(System.currentTimeMillis()) + .setLineageStartDate(event.getLineageStartDate()) + .setComponentType(event.getComponentType()) + .setComponentId(event.getComponentId()) + .build(); provenanceEventRepository.registerEvent(replayEvent); // Update the FlowFile Repository to indicate that we have added the FlowFile to the flow
