http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
new file mode 100644
index 0000000..66f32d8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests the swap-out and swap-in behavior of {@link StandardFlowFileQueue} using an in-memory
+ * {@link FlowFileSwapManager}.
+ *
+ * Every test runs against a queue created in {@link #setup()} with 10000 as its final constructor
+ * argument; the assertions below treat that value as the swap threshold.
+ */
+public class TestStandardFlowFileQueue {
+    private TestSwapManager swapManager = null;
+    private StandardFlowFileQueue queue = null;
+
+    /**
+     * Builds a fresh queue backed by Mockito mocks and a new {@link TestSwapManager}, and resets the
+     * {@link TestFlowFile} id generator so that each test starts numbering FlowFiles from 0.
+     */
+    @Before
+    public void setup() {
+        final Connection connection = Mockito.mock(Connection.class);
+        Mockito.when(connection.getSource()).thenReturn(Mockito.mock(Connectable.class));
+        Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+
+        final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
+        swapManager = new TestSwapManager();
+
+        final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
+        final ProvenanceEventRepository provRepo = Mockito.mock(ProvenanceEventRepository.class);
+        final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class);
+
+        queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000);
+        TestFlowFile.idGenerator.set(0L);
+    }
+
+
+    /**
+     * Verifies that no swap-out happens for the first 19,999 FlowFiles and that adding the 20,000th
+     * triggers exactly one swap-out, leaving 10,000 FlowFiles in the active queue.
+     */
+    @Test
+    public void testSwapOutOccurs() {
+        for (int i = 0; i < 10000; i++) {
+            queue.put(new TestFlowFile());
+            assertEquals(0, swapManager.swapOutCalledCount);
+            assertEquals(i + 1, queue.size().getObjectCount());
+            assertEquals(i + 1, queue.size().getByteCount());
+        }
+
+        for (int i = 0; i < 9999; i++) {
+            queue.put(new TestFlowFile());
+            assertEquals(0, swapManager.swapOutCalledCount);
+            assertEquals(i + 10001, queue.size().getObjectCount());
+            assertEquals(i + 10001, queue.size().getByteCount());
+        }
+
+        queue.put(new TestFlowFile(1000));
+        assertEquals(1, swapManager.swapOutCalledCount);
+        assertEquals(20000, queue.size().getObjectCount());
+        assertEquals(20999, queue.size().getByteCount());
+
+        assertEquals(10000, queue.getActiveQueueSize().getObjectCount());
+    }
+
+    /**
+     * Verifies that, with a size-based prioritizer installed, the 10,000 FlowFiles remaining in the active
+     * queue after a swap-out are those with sizes 0 through 9,999, polled in ascending size order.
+     */
+    @Test
+    public void testLowestPrioritySwappedOutFirst() {
+        final List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
+        prioritizers.add(new FlowFileSizePrioritizer());
+        queue.setPriorities(prioritizers);
+
+        long maxSize = 20000;
+        for (int i = 1; i <= 20000; i++) {
+            queue.put(new TestFlowFile(maxSize - i));
+        }
+
+        assertEquals(1, swapManager.swapOutCalledCount);
+        assertEquals(20000, queue.size().getObjectCount());
+
+        assertEquals(10000, queue.getActiveQueueSize().getObjectCount());
+        final List<FlowFileRecord> flowFiles = queue.poll(Integer.MAX_VALUE, new HashSet<FlowFileRecord>());
+        assertEquals(10000, flowFiles.size());
+        for (int i = 0; i < 10000; i++) {
+            assertEquals(i, flowFiles.get(i).getSize());
+        }
+    }
+
+    /**
+     * Verifies that swapped-out FlowFiles are swapped back in only once the active queue has been drained:
+     * the first 10,000 polls do not trigger a swap-in, and the next poll swaps the data back in.
+     */
+    @Test
+    public void testSwapIn() {
+        for (int i = 1; i <= 20000; i++) {
+            queue.put(new TestFlowFile());
+        }
+
+        assertEquals(1, swapManager.swappedOut.size());
+        queue.put(new TestFlowFile());
+        assertEquals(1, swapManager.swappedOut.size());
+
+        final Set<FlowFileRecord> exp = new HashSet<>();
+        for (int i = 0; i < 9999; i++) {
+            assertNotNull(queue.poll(exp));
+        }
+
+        assertEquals(0, swapManager.swapInCalledCount);
+        assertEquals(1, queue.getActiveQueueSize().getObjectCount());
+        assertNotNull(queue.poll(exp));
+
+        assertEquals(0, swapManager.swapInCalledCount);
+        assertEquals(0, queue.getActiveQueueSize().getObjectCount());
+
+        assertEquals(1, swapManager.swapOutCalledCount);
+
+        assertNotNull(queue.poll(exp)); // this should trigger a swap-in of 10,000 records, and then pull 1 off the top.
+        assertEquals(1, swapManager.swapInCalledCount);
+        assertEquals(9999, queue.getActiveQueueSize().getObjectCount());
+
+        assertTrue(swapManager.swappedOut.isEmpty());
+
+        queue.poll(exp);
+
+    }
+
+
+    /**
+     * In-memory {@link FlowFileSwapManager} that keeps swapped-out FlowFiles in a map keyed by a random
+     * UUID "location" and counts calls to {@link #swapOut} and {@link #swapIn}.
+     *
+     * Declared {@code static} because it never uses the enclosing test instance.
+     */
+    private static class TestSwapManager implements FlowFileSwapManager {
+        private final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>();
+        int swapOutCalledCount = 0;
+        int swapInCalledCount = 0;
+
+
+        @Override
+        public void initialize(final SwapManagerInitializationContext initializationContext) {
+
+        }
+
+        @Override
+        public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException {
+            swapOutCalledCount++;
+            final String location = UUID.randomUUID().toString();
+            swappedOut.put(location, new ArrayList<FlowFileRecord>(flowFiles));
+            return location;
+        }
+
+        @Override
+        public List<FlowFileRecord> peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException {
+            return new ArrayList<FlowFileRecord>(swappedOut.get(swapLocation));
+        }
+
+        @Override
+        public List<FlowFileRecord> swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
+            swapInCalledCount++;
+            return swappedOut.remove(swapLocation);
+        }
+
+        @Override
+        public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException {
+            return new ArrayList<String>(swappedOut.keySet());
+        }
+
+        @Override
+        public void dropSwappedFlowFiles(String swapLocation, final FlowFileQueue flowFileQueue, String user) {
+
+        }
+
+        @Override
+        public QueueSize getSwapSize(String swapLocation) throws IOException {
+            final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
+            if (flowFiles == null) {
+                return new QueueSize(0, 0L);
+            }
+
+            int count = 0;
+            long size = 0L;
+            for (final FlowFileRecord flowFile : flowFiles) {
+                count++;
+                size += flowFile.getSize();
+            }
+
+            return new QueueSize(count, size);
+        }
+
+        @Override
+        public Long getMaxRecordId(String swapLocation) throws IOException {
+            final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
+            if (flowFiles == null) {
+                return null;
+            }
+
+            Long max = null;
+            for (final FlowFileRecord flowFile : flowFiles) {
+                if (max == null || flowFile.getId() > max) {
+                    max = flowFile.getId();
+                }
+            }
+
+            return max;
+        }
+
+        @Override
+        public void purge() {
+            swappedOut.clear();
+        }
+    }
+
+
+    /**
+     * Minimal {@link FlowFileRecord} with a sequentially generated id, a caller-supplied size and no content claim.
+     */
+    private static class TestFlowFile implements FlowFileRecord {
+        private static final AtomicLong idGenerator = new AtomicLong(0L);
+
+        private final long id = idGenerator.getAndIncrement();
+        private final long entryDate = System.currentTimeMillis();
+        private final Map<String, String> attributes;
+        private final long size;
+
+        public TestFlowFile() {
+            this(1L);
+        }
+
+        public TestFlowFile(final long size) {
+            this(new HashMap<String, String>(), size);
+        }
+
+        public TestFlowFile(final Map<String, String> attributes, final long size) {
+            this.attributes = attributes;
+            this.size = size;
+        }
+
+
+        @Override
+        public long getId() {
+            return id;
+        }
+
+        @Override
+        public long getEntryDate() {
+            return entryDate;
+        }
+
+        @Override
+        public long getLineageStartDate() {
+            return entryDate;
+        }
+
+        @Override
+        public Long getLastQueueDate() {
+            return null;
+        }
+
+        @Override
+        public Set<String> getLineageIdentifiers() {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public boolean isPenalized() {
+            return false;
+        }
+
+        @Override
+        public String getAttribute(String key) {
+            return attributes.get(key);
+        }
+
+        @Override
+        public long getSize() {
+            return size;
+        }
+
+        @Override
+        public Map<String, String> getAttributes() {
+            return Collections.unmodifiableMap(attributes);
+        }
+
+        @Override
+        public int compareTo(final FlowFile o) {
+            return Long.compare(id, o.getId());
+        }
+
+        @Override
+        public long getPenaltyExpirationMillis() {
+            return 0;
+        }
+
+        @Override
+        public ContentClaim getContentClaim() {
+            return null;
+        }
+
+        @Override
+        public long getContentClaimOffset() {
+            return 0;
+        }
+    }
+
+    /**
+     * Compares FlowFiles by size, smaller sizes first.
+     */
+    private static class FlowFileSizePrioritizer implements FlowFilePrioritizer {
+        @Override
+        public int compare(final FlowFile o1, final FlowFile o2) {
+            return Long.compare(o1.getSize(), o2.getSize());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java index f0a6d8a..d43a3db 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java @@ -32,11 +32,14 @@ import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.StandardFlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.FlowFileSwapManager; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.events.EventReporter; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.util.NiFiProperties; /** @@ -66,7 +69,8 @@ public final class StandardConnection implements Connection { destination = new AtomicReference<>(builder.destination); relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships)); scheduler = builder.scheduler; - flowFileQueue = new StandardFlowFileQueue(id, this, scheduler, builder.swapManager, 
builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold()); + flowFileQueue = new StandardFlowFileQueue(id, this, builder.flowFileRepository, builder.provenanceRepository, builder.resourceClaimManager, + scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold()); hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode(); } @@ -262,6 +266,9 @@ public final class StandardConnection implements Connection { private Collection<Relationship> relationships; private FlowFileSwapManager swapManager; private EventReporter eventReporter; + private FlowFileRepository flowFileRepository; + private ProvenanceEventRepository provenanceRepository; + private ResourceClaimManager resourceClaimManager; public Builder(final ProcessScheduler scheduler) { this.scheduler = scheduler; @@ -318,6 +325,21 @@ public final class StandardConnection implements Connection { return this; } + public Builder flowFileRepository(final FlowFileRepository flowFileRepository) { + this.flowFileRepository = flowFileRepository; + return this; + } + + public Builder provenanceRepository(final ProvenanceEventRepository provenanceRepository) { + this.provenanceRepository = provenanceRepository; + return this; + } + + public Builder resourceClaimManager(final ResourceClaimManager resourceClaimManager) { + this.resourceClaimManager = resourceClaimManager; + return this; + } + public StandardConnection build() { if (source == null) { throw new IllegalStateException("Cannot build a Connection without a Source"); @@ -328,6 +350,15 @@ public final class StandardConnection implements Connection { if (swapManager == null) { throw new IllegalStateException("Cannot build a Connection without a FlowFileSwapManager"); } + if (flowFileRepository == null) { + throw new IllegalStateException("Cannot build a Connection without a FlowFile Repository"); + } + if (provenanceRepository == null) { + throw new IllegalStateException("Cannot build a Connection 
without a Provenance Repository"); + } + if (resourceClaimManager == null) { + throw new IllegalStateException("Cannot build a Connection without a Resource Claim Manager"); + } if (relationships == null) { relationships = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java index 7ab56ed..c4a86f2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java @@ -28,6 +28,7 @@ import java.io.FilenameFilter; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; @@ -79,7 +80,6 @@ public class FileSystemSwapManager implements FlowFileSwapManager { private EventReporter eventReporter; private ResourceClaimManager claimManager; - public FileSystemSwapManager() { final NiFiProperties properties = NiFiProperties.getInstance(); final Path flowFileRepoPath = properties.getFlowFileRepositoryPath(); @@ -111,6 +111,10 @@ public class FileSystemSwapManager implements FlowFileSwapManager { try (final FileOutputStream fos = new FileOutputStream(swapTempFile)) { serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos); fos.getFD().sync(); + } catch (final 
IOException ioe) { + // we failed to write out the entire swap file. Delete the temporary file, if we can. + swapTempFile.delete(); + throw ioe; } if (swapTempFile.renameTo(swapFile)) { @@ -133,25 +137,6 @@ public class FileSystemSwapManager implements FlowFileSwapManager { warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually"); } - // TODO: When FlowFile Queue performs this operation, it needs to take the following error handling logic into account: - - /* - * } catch (final EOFException eof) { - * error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Corrupt Swap File; will remove this Swap File: " + swapFile); - * - * if (!swapFile.delete()) { - * warn("Failed to remove corrupt Swap File " + swapFile + "; This file should be cleaned up manually"); - * } - * } catch (final FileNotFoundException fnfe) { - * error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Could not find Swap File " + swapFile); - * } catch (final Exception e) { - * error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e, e); - * - * if (swapFile != null) { - * queue.add(swapFile); - * } - * } - */ return swappedFlowFiles; } @@ -165,7 +150,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { final List<FlowFileRecord> swappedFlowFiles; try (final InputStream fis = new FileInputStream(swapFile); final DataInputStream in = new DataInputStream(fis)) { - swappedFlowFiles = deserializeFlowFiles(in, flowFileQueue, swapLocation, claimManager); + swappedFlowFiles = deserializeFlowFiles(in, swapLocation, flowFileQueue, claimManager); } return swappedFlowFiles; @@ -189,6 +174,12 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } @Override + public void dropSwappedFlowFiles(final String swapLocation, final FlowFileQueue flowFileQueue, final String user) throws IOException { + + } + + + @Override public List<String> 
recoverSwapLocations(final FlowFileQueue flowFileQueue) throws IOException { final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() { @Override @@ -322,7 +313,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } - public int serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException { + public static int serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException { if (toSwap == null || toSwap.isEmpty()) { return 0; } @@ -396,8 +387,8 @@ public class FileSystemSwapManager implements FlowFileSwapManager { return toSwap.size(); } - private void writeString(final String toWrite, final OutputStream out) throws IOException { - final byte[] bytes = toWrite.getBytes("UTF-8"); + private static void writeString(final String toWrite, final OutputStream out) throws IOException { + final byte[] bytes = toWrite.getBytes(StandardCharsets.UTF_8); final int utflen = bytes.length; if (utflen < 65535) { @@ -415,26 +406,29 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } } - static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final FlowFileQueue queue, final String swapLocation, final ResourceClaimManager claimManager) throws IOException { + static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final String swapLocation, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException { final int swapEncodingVersion = in.readInt(); if (swapEncodingVersion > SWAP_ENCODING_VERSION) { throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is " + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)"); } - final String connectionId = in.readUTF(); + final String connectionId = in.readUTF(); 
// Connection ID if (!connectionId.equals(queue.getIdentifier())) { - throw new IllegalArgumentException("Cannot restore contents from FlowFile Swap File " + swapLocation + - " because the file indicates that records belong to Connection with ID " + connectionId + " but attempted to swap those records into " + queue); + throw new IllegalArgumentException("Cannot deserialize FlowFiles from Swap File at location " + swapLocation + + " because those FlowFiles belong to Connection with ID " + connectionId + " and an attempt was made to swap them into a Connection with ID " + queue.getIdentifier()); } final int numRecords = in.readInt(); in.readLong(); // Content Size + if (swapEncodingVersion > 7) { + in.readLong(); // Max Record ID + } return deserializeFlowFiles(in, numRecords, swapEncodingVersion, false, claimManager); } - static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles, + private static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles, final int serializationVersion, final boolean incrementContentClaims, final ResourceClaimManager claimManager) throws IOException { final List<FlowFileRecord> flowFiles = new ArrayList<>(); for (int i = 0; i < numFlowFiles; i++) { @@ -543,7 +537,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } final byte[] bytes = new byte[numBytes]; fillBuffer(in, bytes, numBytes); - return new String(bytes, "UTF-8"); + return new String(bytes, StandardCharsets.UTF_8); } private static Integer readFieldLength(final InputStream in) throws IOException { http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 23746ce..20f2642 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -286,7 +286,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final NodeProtocolSender protocolSender; private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks"); - private final ResourceClaimManager contentClaimManager = new StandardResourceClaimManager(); + private final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager(); // guarded by rwLock /** @@ -393,7 +393,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process")); eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process")); - final FlowFileRepository flowFileRepo = createFlowFileRepository(properties, contentClaimManager); + final FlowFileRepository flowFileRepo = createFlowFileRepository(properties, resourceClaimManager); flowFileRepository = flowFileRepo; flowFileEventRepository = flowFileEventRepo; counterRepositoryRef = new AtomicReference<CounterRepository>(new StandardCounterRepository()); @@ -668,7 +668,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R try { final ContentRepository contentRepo = NarThreadContextClassLoader.createInstance(implementationClassName, 
ContentRepository.class); synchronized (contentRepo) { - contentRepo.initialize(contentClaimManager); + contentRepo.initialize(resourceClaimManager); } return contentRepo; } catch (final Exception e) { @@ -728,11 +728,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // Create and initialize a FlowFileSwapManager for this connection final FlowFileSwapManager swapManager = createSwapManager(properties); final EventReporter eventReporter = createEventReporter(getBulletinRepository()); + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { final SwapManagerInitializationContext initializationContext = new SwapManagerInitializationContext() { @Override public ResourceClaimManager getResourceClaimManager() { - return getResourceClaimManager(); + return resourceClaimManager; } @Override @@ -756,6 +757,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R .destination(destination) .swapManager(swapManager) .eventReporter(eventReporter) + .resourceClaimManager(resourceClaimManager) + .flowFileRepository(flowFileRepository) + .provenanceRepository(provenanceEventRepository) .build(); } @@ -3188,7 +3192,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R throw new IllegalArgumentException("Input Content Claim not specified"); } - final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(), + final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(), provEvent.getPreviousContentClaimIdentifier(), false); claim = new StandardContentClaim(resourceClaim, provEvent.getPreviousContentClaimOffset()); offset = provEvent.getPreviousContentClaimOffset() == null ? 
0L : provEvent.getPreviousContentClaimOffset(); @@ -3198,7 +3202,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R throw new IllegalArgumentException("Output Content Claim not specified"); } - final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(), + final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(), provEvent.getContentClaimIdentifier(), false); claim = new StandardContentClaim(resourceClaim, provEvent.getContentClaimOffset()); @@ -3247,7 +3251,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } try { - final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false); + final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false); final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, event.getPreviousContentClaimOffset()); if (!contentRepository.isAccessible(contentClaim)) { @@ -3327,17 +3331,17 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } // Create the ContentClaim - final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(), + final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false); // Increment Claimant Count, since we will now be referencing the Content Claim - contentClaimManager.incrementClaimantCount(resourceClaim); + resourceClaimManager.incrementClaimantCount(resourceClaim); final long claimOffset = event.getPreviousContentClaimOffset() == null ? 
0L : event.getPreviousContentClaimOffset().longValue(); final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, claimOffset); contentClaim.setLength(event.getPreviousFileSize() == null ? -1L : event.getPreviousFileSize()); if (!contentRepository.isAccessible(contentClaim)) { - contentClaimManager.decrementClaimantCount(resourceClaim); + resourceClaimManager.decrementClaimantCount(resourceClaim); throw new IllegalStateException("Cannot replay data from Provenance Event because the data is no longer available in the Content Repository"); } http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index a32a485..cfbb770 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -81,9 +81,11 @@ import org.slf4j.LoggerFactory; * <p> * Provides a ProcessSession that ensures all accesses, changes and transfers * occur in an atomic manner for all FlowFiles including their contents and - * attributes</p> + * attributes + * </p> * <p> - * NOT THREAD SAFE</p> + * NOT THREAD SAFE + * </p> * <p/> */ public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher { @@ -104,7 +106,7 @@ public final class 
StandardProcessSession implements ProcessSession, ProvenanceE private final Map<String, Long> globalCounters = new HashMap<>(); private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new HashMap<>(); private final ProcessContext context; - private final Set<FlowFile> recursionSet = new HashSet<>();//set used to track what is currently being operated on to prevent logic failures if recursive calls occurring + private final Set<FlowFile> recursionSet = new HashSet<>();// set used to track what is currently being operated on to prevent logic failures if recursive calls occurring private final Set<Path> deleteOnCommit = new HashSet<>(); private final long sessionId; private final String connectableDescription; @@ -114,7 +116,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private final StandardProvenanceReporter provenanceReporter; - private int removedCount = 0; // number of flowfiles removed in this session + private int removedCount = 0; // number of flowfiles removed in this session private long removedBytes = 0L; // size of all flowfiles removed in this session private final LongHolder bytesRead = new LongHolder(0L); private final LongHolder bytesWritten = new LongHolder(0L); @@ -169,7 +171,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } this.provenanceReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(), componentType, - context.getProvenanceRepository(), this); + context.getProvenanceRepository(), this); this.sessionId = idGenerator.getAndIncrement(); this.connectableDescription = description; @@ -196,7 +198,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // Processor-reported events. 
List<ProvenanceEventRecord> autoTerminatedEvents = null; - //validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary + // validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary final Map<FlowFileRecord, StandardRepositoryRecord> toAdd = new HashMap<>(); for (final StandardRepositoryRecord record : records.values()) { if (record.isMarkedForDelete()) { @@ -235,11 +237,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } } else { - final Connection finalDestination = destinations.remove(destinations.size() - 1); //remove last element + final Connection finalDestination = destinations.remove(destinations.size() - 1); // remove last element record.setDestination(finalDestination.getFlowFileQueue()); incrementConnectionInputCounts(finalDestination, record); - for (final Connection destination : destinations) { //iterate over remaining destinations and "clone" as needed + for (final Connection destination : destinations) { // iterate over remaining destinations and "clone" as needed incrementConnectionInputCounts(destination, record); final FlowFileRecord currRec = record.getCurrent(); final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec); @@ -256,7 +258,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE if (claim != null) { context.getContentRepository().incrementClaimaintCount(claim); } - newRecord.setWorking(clone, Collections.<String, String>emptyMap()); + newRecord.setWorking(clone, Collections.<String, String> emptyMap()); newRecord.setDestination(destination.getFlowFileQueue()); newRecord.setTransferRelationship(record.getTransferRelationship()); @@ -322,9 +324,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final long flowFileLife = System.currentTimeMillis() - 
flowFile.getEntryDate(); final Connectable connectable = context.getConnectable(); final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable; - LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife}); + LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[] {flowFile, terminator, flowFileLife}); } else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) { - //records which have been updated - remove original if exists + // records which have been updated - remove original if exists removeContent(record.getOriginalClaim()); } } @@ -356,7 +358,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap = new HashMap<>(); for (final StandardRepositoryRecord record : checkpoint.records.values()) { if (record.isMarkedForAbort() || record.isMarkedForDelete()) { - continue; //these don't need to be transferred + continue; // these don't need to be transferred } // record.getCurrent() will return null if this record was created in this session -- // in this case, we just ignore it, and it will be cleaned up by clearing the records map. 
@@ -390,7 +392,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE if (LOG.isInfoEnabled()) { final String sessionSummary = summarizeEvents(checkpoint); if (!sessionSummary.isEmpty()) { - LOG.info("{} for {}, committed the following events: {}", new Object[]{this, connectableDescription, sessionSummary}); + LOG.info("{} for {}, committed the following events: {}", new Object[] {this, connectableDescription, sessionSummary}); } } @@ -611,9 +613,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE boolean creationEventRegistered = false; if (registeredTypes != null) { if (registeredTypes.contains(ProvenanceEventType.CREATE) - || registeredTypes.contains(ProvenanceEventType.FORK) - || registeredTypes.contains(ProvenanceEventType.JOIN) - || registeredTypes.contains(ProvenanceEventType.RECEIVE)) { + || registeredTypes.contains(ProvenanceEventType.FORK) + || registeredTypes.contains(ProvenanceEventType.JOIN) + || registeredTypes.contains(ProvenanceEventType.RECEIVE)) { creationEventRegistered = true; } } @@ -747,7 +749,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } private StandardProvenanceEventRecord enrich( - final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) { + final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) { final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent); final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid()); if (eventFlowFile != null) { @@ -1039,7 +1041,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final StringBuilder sb = new StringBuilder(512); 
if (!LOG.isDebugEnabled() && (largestTransferSetSize > VERBOSE_LOG_THRESHOLD - || numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) { + || numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) { if (numCreated > 0) { sb.append("created ").append(numCreated).append(" FlowFiles, "); } @@ -1097,7 +1099,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private void formatNanos(final long nanos, final StringBuilder sb) { final long seconds = nanos > 1000000000L ? nanos / 1000000000L : 0L; - long millis = nanos > 1000000L ? nanos / 1000000L : 0L;; + long millis = nanos > 1000000L ? nanos / 1000000L : 0L; + ; final long nanosLeft = nanos % 1000000L; if (seconds > 0) { @@ -1272,7 +1275,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE int flowFileCount = 0; long byteCount = 0L; for (final Connection conn : context.getPollableConnections()) { - final QueueSize queueSize = conn.getFlowFileQueue().getActiveQueueSize(); + final QueueSize queueSize = conn.getFlowFileQueue().size(); flowFileCount += queueSize.getObjectCount(); byteCount += queueSize.getByteCount(); } @@ -1287,8 +1290,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE attrs.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence()) - .addAttributes(attrs) - .build(); + .addAttributes(attrs) + .build(); final StandardRepositoryRecord record = new StandardRepositoryRecord(null); record.setWorking(fFile, attrs); records.put(fFile, record); @@ -1324,7 +1327,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE context.getContentRepository().incrementClaimaintCount(claim); } final StandardRepositoryRecord record = new StandardRepositoryRecord(null); - 
record.setWorking(clone, Collections.<String, String>emptyMap()); + record.setWorking(clone, Collections.<String, String> emptyMap()); records.put(clone, record); if (offset == 0L && size == example.getSize()) { @@ -1637,7 +1640,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return; } - LOG.info("{} {} FlowFiles have expired and will be removed", new Object[]{this, flowFiles.size()}); + LOG.info("{} {} FlowFiles have expired and will be removed", new Object[] {this, flowFiles.size()}); final List<RepositoryRecord> expiredRecords = new ArrayList<>(flowFiles.size()); final String processorType; @@ -1650,7 +1653,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } final StandardProvenanceReporter expiredReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(), - processorType, context.getProvenanceRepository(), this); + processorType, context.getProvenanceRepository(), this); final Map<String, FlowFileRecord> recordIdMap = new HashMap<>(); for (final FlowFileRecord flowFile : flowFiles) { @@ -1664,7 +1667,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate(); final Object terminator = connectable instanceof ProcessorNode ? 
((ProcessorNode) connectable).getProcessor() : connectable; - LOG.info("{} terminated by {} due to FlowFile expiration; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife}); + LOG.info("{} terminated by {} due to FlowFile expiration; life of FlowFile = {} ms", new Object[] {flowFile, terminator, flowFileLife}); } try { @@ -1696,7 +1699,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE record.getContentClaimOffset() + claim.getOffset(), record.getSize()); } - enriched.setAttributes(record.getAttributes(), Collections.<String, String>emptyMap()); + enriched.setAttributes(record.getAttributes(), Collections.<String, String> emptyMap()); return enriched.build(); } @@ -1780,9 +1783,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset()); - final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); - final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); - final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) { + final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); + final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); + final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) { // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from // Processor code. 
As a result, as have the FlowFileAccessInputStream that catches IOException from the repository @@ -1853,7 +1856,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE try { try (final OutputStream rawOut = contentRepo.write(newClaim); - final OutputStream out = new BufferedOutputStream(rawOut)) { + final OutputStream out = new BufferedOutputStream(rawOut)) { if (header != null && header.length > 0) { out.write(header); @@ -2070,10 +2073,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // the original claim if the record is "working" but the content has not been modified // (e.g., in the case of attributes only were updated) // In other words: - // If we modify the attributes of a FlowFile, and then we call record.getWorkingClaim(), this will - // return the same claim as record.getOriginalClaim(). So we cannot just remove the working claim because - // that may decrement the original claim (because the 2 claims are the same), and that's NOT what we want to do - // because we will do that later, in the session.commit() and that would result in removing the original claim twice. + // If we modify the attributes of a FlowFile, and then we call record.getWorkingClaim(), this will + // return the same claim as record.getOriginalClaim(). So we cannot just remove the working claim because + // that may decrement the original claim (because the 2 claims are the same), and that's NOT what we want to do + // because we will do that later, in the session.commit() and that would result in removing the original claim twice. if (contentModified) { // In this case, it's ok to go ahead and destroy the content because we know that the working claim is going to be // updated and the given working claim is referenced only by FlowFiles in this session (because it's the Working Claim). 
@@ -2196,7 +2199,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public FlowFile importFrom(final Path source, final boolean keepSourceFile, final FlowFile destination) { validateRecordState(destination); - //TODO: find a better solution. With Windows 7 and Java 7 (very early update, at least), Files.isWritable(source.getParent()) returns false, even when it should be true. + // TODO: find a better solution. With Windows 7 and Java 7 (very early update, at least), Files.isWritable(source.getParent()) returns false, even when it should be true. if (!keepSourceFile && !Files.isWritable(source.getParent()) && !source.getParent().toFile().canWrite()) { // If we do NOT want to keep the file, ensure that we can delete it, or else error. throw new FlowFileAccessException("Cannot write to path " + source.getParent().toFile().getAbsolutePath() + " so cannot delete file; will not import."); @@ -2228,9 +2231,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE removeTemporaryClaim(record); final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()) - .contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize) - .addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName()) - .build(); + .contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize) + .addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName()) + .build(); record.setWorking(newFile, CoreAttributes.FILENAME.key(), source.toFile().getName()); if (!keepSourceFile) { deleteOnCommit.add(source); @@ -2370,7 +2373,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE * * @param flowFile the FlowFile to check * @return <code>true</code> if the FlowFile is known in this session, - * <code>false</code> otherwise. + * <code>false</code> otherwise. 
*/ boolean isFlowFileKnown(final FlowFile flowFile) { return records.containsKey(flowFile); @@ -2392,8 +2395,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final String key = entry.getKey(); final String value = entry.getValue(); if (CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key) - || CoreAttributes.DISCARD_REASON.key().equals(key) - || CoreAttributes.UUID.key().equals(key)) { + || CoreAttributes.DISCARD_REASON.key().equals(key) + || CoreAttributes.UUID.key().equals(key)) { continue; } newAttributes.put(key, value); @@ -2441,10 +2444,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence()) - .addAttributes(newAttributes) - .lineageIdentifiers(lineageIdentifiers) - .lineageStartDate(lineageStartDate) - .build(); + .addAttributes(newAttributes) + .lineageIdentifiers(lineageIdentifiers) + .lineageStartDate(lineageStartDate) + .build(); final StandardRepositoryRecord record = new StandardRepositoryRecord(null); record.setWorking(fFile, newAttributes); @@ -2465,7 +2468,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE */ private static Map<String, String> intersectAttributes(final Collection<FlowFile> flowFileList) { final Map<String, String> result = new HashMap<>(); - //trivial cases + // trivial cases if (flowFileList == null || flowFileList.isEmpty()) { return result; } else if (flowFileList.size() == 1) { @@ -2478,8 +2481,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE */ final Map<String, String> firstMap = flowFileList.iterator().next().getAttributes(); - outer: - for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) { + outer: for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) { final String key = 
mapEntry.getKey(); final String value = mapEntry.getValue(); for (final FlowFile flowFile : flowFileList) { @@ -2539,7 +2541,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private final Set<String> removedFlowFiles = new HashSet<>(); private final Set<String> createdFlowFiles = new HashSet<>(); - private int removedCount = 0; // number of flowfiles removed in this session + private int removedCount = 0; // number of flowfiles removed in this session private long removedBytes = 0L; // size of all flowfiles removed in this session private long bytesRead = 0L; private long bytesWritten = 0L; http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java index c4d040b..3c4fcdb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java @@ -26,7 +26,7 @@ public class Connectables { public static boolean flowFilesQueued(final Connectable connectable) { for (final Connection conn : connectable.getIncomingConnections()) { - if (!conn.getFlowFileQueue().isActiveQueueEmpty()) { + if (!conn.getFlowFileQueue().isEmpty()) { return true; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java index 6eeddc5..f7191c5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java @@ -22,16 +22,26 @@ import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.file.Files; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.flowfile.FlowFile; import org.junit.Test; import org.mockito.Mockito; @@ -47,7 +57,7 @@ public class TestFileSystemSwapManager { final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class); Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4"); - final List<FlowFileRecord> records = FileSystemSwapManager.deserializeFlowFiles(in, flowFileQueue, 
"/src/test/resources/old-swap-file.swap", new NopResourceClaimManager()); + final List<FlowFileRecord> records = FileSystemSwapManager.deserializeFlowFiles(in, "/src/test/resources/old-swap-file.swap", flowFileQueue, new NopResourceClaimManager()); assertEquals(10000, records.size()); for (final FlowFileRecord record : records) { @@ -57,6 +67,53 @@ public class TestFileSystemSwapManager { } } + @Test + public void testRoundTripSerializeDeserialize() throws IOException { + final List<FlowFileRecord> toSwap = new ArrayList<>(10000); + final Map<String, String> attrs = new HashMap<>(); + for (int i = 0; i < 10000; i++) { + attrs.put("i", String.valueOf(i)); + final FlowFileRecord ff = new TestFlowFile(attrs, i); + toSwap.add(ff); + } + + final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class); + Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4"); + + final String swapLocation = "target/testRoundTrip.swap"; + final File swapFile = new File(swapLocation); + Files.deleteIfExists(swapFile.toPath()); + + try (final FileOutputStream fos = new FileOutputStream(swapFile)) { + FileSystemSwapManager.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos); + } + + final List<FlowFileRecord> swappedIn; + try (final FileInputStream fis = new FileInputStream(swapFile); + final DataInputStream dis = new DataInputStream(fis)) { + swappedIn = FileSystemSwapManager.deserializeFlowFiles(dis, swapLocation, flowFileQueue, Mockito.mock(ResourceClaimManager.class)); + } + + assertEquals(toSwap.size(), swappedIn.size()); + for (int i = 0; i < toSwap.size(); i++) { + final FlowFileRecord pre = toSwap.get(i); + final FlowFileRecord post = swappedIn.get(i); + + assertEquals(pre.getSize(), post.getSize()); + assertEquals(pre.getAttributes(), post.getAttributes()); + assertEquals(pre.getSize(), post.getSize()); + assertEquals(pre.getId(), post.getId()); + assertEquals(pre.getContentClaim(), post.getContentClaim()); + 
assertEquals(pre.getContentClaimOffset(), post.getContentClaimOffset()); + assertEquals(pre.getEntryDate(), post.getEntryDate()); + assertEquals(pre.getLastQueueDate(), post.getLastQueueDate()); + assertEquals(pre.getLineageIdentifiers(), post.getLineageIdentifiers()); + assertEquals(pre.getLineageStartDate(), post.getLineageStartDate()); + assertEquals(pre.getPenaltyExpirationMillis(), post.getPenaltyExpirationMillis()); + } + } + + public class NopResourceClaimManager implements ResourceClaimManager { @Override @@ -100,4 +157,87 @@ public class TestFileSystemSwapManager { public void purge() { } } + + + private static class TestFlowFile implements FlowFileRecord { + private static final AtomicLong idGenerator = new AtomicLong(0L); + + private final long id = idGenerator.getAndIncrement(); + private final long entryDate = System.currentTimeMillis(); + private final long lastQueueDate = System.currentTimeMillis(); + private final Map<String, String> attributes; + private final long size; + + + public TestFlowFile(final Map<String, String> attributes, final long size) { + this.attributes = attributes; + this.size = size; + } + + + @Override + public long getId() { + return id; + } + + @Override + public long getEntryDate() { + return entryDate; + } + + @Override + public long getLineageStartDate() { + return entryDate; + } + + @Override + public Long getLastQueueDate() { + return lastQueueDate; + } + + @Override + public Set<String> getLineageIdentifiers() { + return Collections.emptySet(); + } + + @Override + public boolean isPenalized() { + return false; + } + + @Override + public String getAttribute(String key) { + return attributes.get(key); + } + + @Override + public long getSize() { + return size; + } + + @Override + public Map<String, String> getAttributes() { + return Collections.unmodifiableMap(attributes); + } + + @Override + public int compareTo(final FlowFile o) { + return Long.compare(id, o.getId()); + } + + @Override + public long 
getPenaltyExpirationMillis() { + return -1L; + } + + @Override + public ContentClaim getContentClaim() { + return null; + } + + @Override + public long getContentClaimOffset() { + return 0; + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 12f8e5e..1783708 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -134,7 +134,7 @@ public class TestStandardProcessSession { final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class); final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class); - flowFileQueue = new StandardFlowFileQueue("1", connection, processScheduler, swapManager, null, 10000); + flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000); when(connection.getFlowFileQueue()).thenReturn(flowFileQueue); Mockito.doAnswer(new Answer<Object>() { http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index 8bf5553..0e3bcac 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -37,6 +37,11 @@ import java.util.concurrent.TimeUnit; import javax.ws.rs.WebApplicationException; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.ClassUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.admin.service.UserService; +import org.apache.nifi.authorization.DownloadAuthorization; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.connectable.Connectable; @@ -47,9 +52,10 @@ import org.apache.nifi.controller.ContentAvailability; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.Counter; import org.apache.nifi.controller.FlowController; -import org.apache.nifi.controller.FlowFileQueue; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.repository.ContentNotFoundException; import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.status.ProcessGroupStatus; @@ -61,8 +67,8 @@ import org.apache.nifi.groups.ProcessGroupCounts; import 
org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.Processor; -import org.apache.nifi.processor.QueueSize; import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRepository; @@ -75,7 +81,9 @@ import org.apache.nifi.provenance.search.SearchTerm; import org.apache.nifi.provenance.search.SearchTerms; import org.apache.nifi.provenance.search.SearchableField; import org.apache.nifi.remote.RootGroupPort; +import org.apache.nifi.reporting.BulletinQuery; import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.ComponentType; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.search.SearchContext; @@ -85,6 +93,7 @@ import org.apache.nifi.services.FlowService; import org.apache.nifi.user.NiFiUser; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.DownloadableContent; import org.apache.nifi.web.NiFiCoreException; import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.api.dto.DocumentedTypeDTO; @@ -104,15 +113,6 @@ import org.apache.nifi.web.api.dto.search.SearchResultsDTO; import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; -import org.apache.nifi.web.DownloadableContent; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.ClassUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.admin.service.UserService; -import org.apache.nifi.authorization.DownloadAuthorization; -import org.apache.nifi.processor.DataUnit; -import 
org.apache.nifi.reporting.BulletinQuery; -import org.apache.nifi.reporting.ComponentType; import org.apache.nifi.web.security.user.NiFiUserUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -501,7 +501,7 @@ public class ControllerFacade { * Site-to-Site communications * * @return the socket port that the Cluster Manager is listening on for - * Site-to-Site communications + * Site-to-Site communications */ public Integer getClusterManagerRemoteSiteListeningPort() { return flowController.getClusterManagerRemoteSiteListeningPort(); @@ -512,7 +512,7 @@ public class ControllerFacade { * Manager are secure * * @return whether or not Site-to-Site communications with the Cluster - * Manager are secure + * Manager are secure */ public Boolean isClusterManagerRemoteSiteCommsSecure() { return flowController.isClusterManagerRemoteSiteCommsSecure(); @@ -523,7 +523,7 @@ public class ControllerFacade { * Site-to-Site communications * * @return the socket port that the local instance is listening on for - * Site-to-Site communications + * Site-to-Site communications */ public Integer getRemoteSiteListeningPort() { return flowController.getRemoteSiteListeningPort(); @@ -534,7 +534,7 @@ public class ControllerFacade { * instance are secure * * @return whether or not Site-to-Site communications with the local - * instance are secure + * instance are secure */ public Boolean isRemoteSiteCommsSecure() { return flowController.isRemoteSiteCommsSecure();
