This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.16 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 2cfef542e65a35249cdcd66d4d88392a8e8b1633 Author: Nathan Gough <[email protected]> AuthorDate: Mon Mar 21 10:32:16 2022 -0400 NIFI-9701 - Corrected No Tracking strategy to create one flow file when using a Record Writer - Corrected No Tracking strategy Record Writer handling for ListSFTP - Updated temporary test files to have last modified time of epoch to avoid intermittent issue with Minimum Age filtering - Refactored MockCacheService to separate reusable class This closes #5885 Signed-off-by: David Handermann <[email protected]> --- .../processor/util/list/AbstractListProcessor.java | 20 ++-- .../nifi/processors/standard/MockCacheService.java | 74 +++++++++++++ .../processors/standard/TestDeduplicateRecord.java | 51 --------- .../nifi/processors/standard/TestListSFTP.java | 118 +++++++++++++++++---- 4 files changed, 183 insertions(+), 80 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java index 094ba8bf24..17e14a47dc 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java @@ -576,17 +576,17 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab entitiesForTimestamp.add(entity); } - if (orderedEntries.size() > 0) { - for (Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) { - List<T> entities = timestampEntities.getValue(); - for (T entity : entities) { - // Create the FlowFile for this path. 
- final Map<String, String> attributes = createAttributes(entity, context); - FlowFile flowFile = session.create(); - flowFile = session.putAllAttributes(flowFile, attributes); - session.transfer(flowFile, REL_SUCCESS); - } + final boolean writerSet = context.getProperty(RECORD_WRITER).isSet(); + if (writerSet) { + try { + createRecordsForEntities(context, session, orderedEntries); + } catch (final IOException | SchemaNotFoundException e) { + getLogger().error("Failed to write listing to FlowFile", e); + context.yield(); + return; } + } else { + createFlowFilesForEntities(context, session, orderedEntries); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/MockCacheService.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/MockCacheService.java new file mode 100644 index 0000000000..c2f231efe5 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/MockCacheService.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +final class MockCacheService<K, V> extends AbstractControllerService implements DistributedMapCacheClient { + private Map storage; + + public MockCacheService() { + storage = new HashMap<>(); + } + + @Override + public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException { + return false; + } + + @Override + public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException { + return null; + } + + @Override + public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException { + return storage.containsKey(key); + } + + @Override + public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException { + storage.put(key, value); + } + + @Override + public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException { + return null; + } + + @Override + public void close() throws IOException { + + } + + @Override + public <K> boolean remove(K key, Serializer<K> serializer) throws IOException { + return false; + } + + @Override + public long removeByPattern(String regex) throws IOException { + return 0; + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeduplicateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeduplicateRecord.java index 
9de152feeb..9f90c3186c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeduplicateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeduplicateRecord.java @@ -17,10 +17,7 @@ package org.apache.nifi.processors.standard; import org.apache.commons.codec.digest.DigestUtils; -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; -import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.record.MockRecordParser; import org.apache.nifi.serialization.record.MockRecordWriter; @@ -31,7 +28,6 @@ import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -271,51 +267,4 @@ public class TestDeduplicateRecord { } } - private static final class MockCacheService<K, V> extends AbstractControllerService implements DistributedMapCacheClient { - private Map storage; - - public MockCacheService() { - storage = new HashMap<>(); - } - - @Override - public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException { - return false; - } - - @Override - public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException { - return null; - } - - @Override - public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException { - return storage.containsKey(key); - } - - @Override - public <K, V> void put(K key, V value, Serializer<K> keySerializer, 
Serializer<V> valueSerializer) throws IOException { - storage.put(key, value); - } - - @Override - public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException { - return null; - } - - @Override - public void close() throws IOException { - - } - - @Override - public <K> boolean remove(K key, Serializer<K> serializer) throws IOException { - return false; - } - - @Override - public long removeByPattern(String regex) throws IOException { - return 0; - } - } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java index 18ec6375dd..9cae5f905b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java @@ -21,17 +21,24 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.UUID; import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.ConfigVerificationResult.Outcome; -import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; import org.apache.nifi.processor.VerifiableProcessor; +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processor.util.list.ListedEntityTracker; import org.apache.nifi.processors.standard.util.FTPTransfer; import org.apache.nifi.processors.standard.util.SFTPTransfer; import 
org.apache.nifi.processors.standard.util.SSHTestServer; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -50,15 +57,14 @@ public class TestListSFTP { private SSHTestServer sshServer; - private String tempFileName; + private List<File> testFileNames; @Before public void setUp() throws Exception { sshServer = new SSHTestServer(); sshServer.startServer(); - - writeTempFile(); - + testFileNames = new ArrayList<File>(); + writeTempFile(3); runner = TestRunners.newTestRunner(ListSFTP.class); runner.setProperty(ListSFTP.HOSTNAME, sshServer.getHost()); runner.setProperty(ListSFTP.USERNAME, sshServer.getUsername()); @@ -66,7 +72,6 @@ public class TestListSFTP { runner.setProperty(FTPTransfer.PORT, Integer.toString(sshServer.getSSHPort())); runner.setProperty(ListSFTP.REMOTE_PATH, REMOTE_DIRECTORY); runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS); - runner.assertValid(); assertVerificationSuccess(); } @@ -74,13 +79,13 @@ public class TestListSFTP { @After public void tearDown() throws Exception { sshServer.stopServer(); + Files.deleteIfExists(Paths.get(sshServer.getVirtualFileSystemPath())); } @Test - public void testRunFileFound() { - runner.run(); - - runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1); + public void testRunFileFound() throws InterruptedException { + runner.run(1); + runner.assertTransferCount(ListSFTP.REL_SUCCESS, 3); runner.assertAllFlowFilesContainAttribute("sftp.remote.host"); runner.assertAllFlowFilesContainAttribute("sftp.remote.port"); runner.assertAllFlowFilesContainAttribute("sftp.listing.user"); @@ -93,14 +98,81 @@ public class TestListSFTP { final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0); 
retrievedFile.assertAttributeEquals("sftp.listing.user", sshServer.getUsername()); - retrievedFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), tempFileName); + } + + @Test + public void testRunWithRecordWriter() throws InitializationException, InterruptedException { + RecordSetWriterFactory recordWriter = getCsvRecordWriter(); + runner.addControllerService("csv-record-writer", recordWriter); + runner.setProperty(AbstractListProcessor.RECORD_WRITER, "csv-record-writer"); + runner.enableControllerService(recordWriter); + runner.assertValid(recordWriter); + runner.run(2); + runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1); + } + + @Test + public void testRunWithRecordWriterNoTracking() throws InitializationException, InterruptedException { + RecordSetWriterFactory recordWriter = getCsvRecordWriter(); + runner.addControllerService("csv-record-writer", recordWriter); + runner.setProperty(AbstractListProcessor.RECORD_WRITER, "csv-record-writer"); + runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.NO_TRACKING); + runner.enableControllerService(recordWriter); + runner.assertValid(recordWriter); + runner.run(2); + runner.assertTransferCount(ListSFTP.REL_SUCCESS, 2); + } + + @Test + public void testRunWithRecordWriterByTimestamps() throws InitializationException, InterruptedException { + RecordSetWriterFactory recordWriter = getCsvRecordWriter(); + runner.addControllerService("csv-record-writer", recordWriter); + runner.setProperty(AbstractListProcessor.RECORD_WRITER, "csv-record-writer"); + runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIMESTAMPS); + runner.enableControllerService(recordWriter); + runner.assertValid(recordWriter); + runner.run(2); + runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1); + } + + @Test + public void testRunWithRecordWriterByEntities() throws InitializationException, InterruptedException { + RecordSetWriterFactory recordWriter = getCsvRecordWriter(); + 
runner.addControllerService("csv-record-writer", recordWriter); + runner.setProperty(AbstractListProcessor.RECORD_WRITER, "csv-record-writer"); + runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_ENTITIES); + runner.enableControllerService(recordWriter); + DistributedMapCacheClient dmc = new MockCacheService<>(); + runner.addControllerService("dmc", dmc); + runner.setProperty(ListedEntityTracker.TRACKING_STATE_CACHE, "dmc"); + runner.enableControllerService(dmc); + runner.assertValid(dmc); + runner.assertValid(recordWriter); + runner.run(2); + runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1); + } + + @Test + public void testFilesWithRestart() throws InitializationException, InterruptedException { + RecordSetWriterFactory recordWriter = getCsvRecordWriter(); + runner.addControllerService("csv-record-writer", recordWriter); + runner.setProperty(AbstractListProcessor.RECORD_WRITER, "csv-record-writer"); + runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_ENTITIES); + runner.enableControllerService(recordWriter); + DistributedMapCacheClient dmc = new MockCacheService<>(); + runner.addControllerService("dmc", dmc); + runner.setProperty(ListedEntityTracker.TRACKING_STATE_CACHE, "dmc"); + runner.enableControllerService(dmc); + runner.assertValid(); + runner.run(2); + runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1); } @Test public void testRunFileNotFoundMinSizeFiltered() { runner.setProperty(ListFile.MIN_SIZE, "1KB"); - runner.run(); + runner.run(2); runner.assertTransferCount(ListSFTP.REL_SUCCESS, 0); } @@ -113,13 +185,21 @@ public class TestListSFTP { assertEquals(Outcome.SUCCESSFUL, result.getOutcome()); } - private void writeTempFile() { - final File file = new File(sshServer.getVirtualFileSystemPath(), String.format("%s-%s", getClass().getSimpleName(), UUID.randomUUID())); - try { - Files.write(file.toPath(), FILE_CONTENTS); - tempFileName = file.getName(); - } catch (final IOException e) { - 
throw new UncheckedIOException(e); + private void writeTempFile(final int count) { + for (int i = 0; i < count; i++) { + final File file = new File(sshServer.getVirtualFileSystemPath(), String.format("%s-%s", getClass().getSimpleName(), UUID.randomUUID())); + try { + Files.write(file.toPath(), FILE_CONTENTS); + file.setLastModified(0); + testFileNames.add(file); + } catch (final IOException e) { + throw new UncheckedIOException(e); + } } + assert(new File(sshServer.getVirtualFileSystemPath()).listFiles().length == count); + } + + private RecordSetWriterFactory getCsvRecordWriter() { + return new MockRecordWriter("name, age"); } }
