Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2158#discussion_r140448400
--- Diff:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/adls/ListADLSFile.java
---
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.adls;
+
+import com.microsoft.azure.datalake.store.ADLStoreClient;
+import com.microsoft.azure.datalake.store.DirectoryEntry;
+import com.microsoft.azure.datalake.store.DirectoryEntryType;
+import org.apache.nifi.annotation.behavior.*;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.adls.ADLSConstants.ACCOUNT_NAME;
+import static org.apache.nifi.processors.adls.ADLSConstants.REL_SUCCESS;
+
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"hadoop", "ADLS", "get", "list", "ingest", "ingress", "source",
"filesystem"})
+@CapabilityDescription("Retrieves a listing of files from ADLS. For each
file that is "
+ + "listed in ADLS, this processor creates a FlowFile that
represents the ADLS file to be fetched in conjunction with FetchADLSFile. This
Processor is "
+ + "designed to run on Primary Node only in a cluster. If the
primary node changes, the new Primary Node will pick up where the previous node
left "
+ + "off without duplicating all of the data.")
+@WritesAttributes({
+ @WritesAttribute(attribute="filename", description="The name of
the file that was read from ADLS"),
+ @WritesAttribute(attribute="path", description="The path is set to
the path of the file on ADLS"),
+ @WritesAttribute(attribute="adls.owner", description="The user
that owns the file"),
+ @WritesAttribute(attribute="adls.group", description="The group
that owns the file"),
+ @WritesAttribute(attribute="adls.lastModified", description="The
timestamp of when the file was last modified, as milliseconds since midnight
Jan 1, 1970 UTC"),
+ @WritesAttribute(attribute="adls.length", description="The number
of bytes in the file"),
+ @WritesAttribute(attribute="adls.expirytime", description="The
number of bytes in the file"),
+ @WritesAttribute(attribute="adls.permissions", description="The
permissions for the file in ADLS. This is formatted as 3 characters for the
owner, "
+ + "3 for the group, and 3 for other users. For example
rw-rw-r--")
+})
+@Stateful(scopes = Scope.CLUSTER, description = "After performing a
listing of ADLS files, the latest timestamp of all the files transferred is
stored. "
+ + "This allows the Processor to list only files that have been
added or modified after "
+ + "this date the next time that the Processor is run, without
having to store all of the actual filenames/paths which could lead to
performance "
+ + "problems. State is stored across the cluster so that this
Processor can be run on Primary Node only and if a new Primary "
+ + "Node is selected, the new node can pick up where the previous
node left off, without duplicating the data.")
+public class ListADLSFile extends ADLSAbstractProcessor {
+
+ public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new
PropertyDescriptor.Builder()
+ .name("Distributed Cache Service")
+ .description("Specifies the Controller Service that should be
used to maintain state about what has been pulled from ADLS so that if a new
node "
+ + "begins pulling data, it won't duplicate all of the
work that has been done.")
+ .required(false)
+ .identifiesControllerService(DistributedMapCacheClient.class)
+ .build();
+
+ public static final PropertyDescriptor RECURSE_SUBDIRS = new
PropertyDescriptor.Builder()
+ .name("Recurse Subdirectories")
+ .description("Indicates whether to list files from
subdirectories of the ADLS directory")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
+
+ public static final PropertyDescriptor FILE_FILTER = new
PropertyDescriptor.Builder()
+ .name("File Filter")
+ .description("Only files whose names match the given regular
expression will be picked up")
+ .required(true)
+ .defaultValue("[^\\.].*")
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor DIRECTORY = new
PropertyDescriptor.Builder()
+ .name("Directory")
+ .description("The ADLS directory from which files should be
listed")
+ .required(true)
+
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ // increase this factor from test case to test minimum duration
between runs functionality
+ protected long testingMinimumDurationMultiplicationFactor = 1;
+
+ private volatile long latestTimestampListed = 0;
+ private volatile long lastRunTimestamp = 0;
+
+ protected static final String LISTING_TIMESTAMP_KEY =
"adls.listing.timestamp";
+
+ protected static final long MINIMUM_DURATON_BW_RUNS_NANOS =
TimeUnit.MILLISECONDS.toNanos(100L);
+
+ @Override
+ protected void init(ProcessorInitializationContext context) {
+ super.init(context);
+
+ super.descriptors.add(DISTRIBUTED_CACHE_SERVICE);
+ super.descriptors.add(RECURSE_SUBDIRS);
+ super.descriptors.add(FILE_FILTER);
+ super.descriptors.add(DIRECTORY);
+ }
+
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor,
final String oldValue, final String newValue) {
+ super.onPropertyModified(descriptor, oldValue, newValue);
+ if (isConfigurationRestored()
+ && (descriptor.equals(DIRECTORY)
+ || descriptor.equals(FILE_FILTER)
+ || descriptor.equals(ACCOUNT_NAME))) {
+ latestTimestampListed = 0;
+ }
+ }
+
+ @Override
+ public void onTrigger(ProcessContext processContext, ProcessSession
processSession) throws ProcessException {
+ final long now = System.nanoTime();
+ if (now - lastRunTimestamp < MINIMUM_DURATON_BW_RUNS_NANOS *
testingMinimumDurationMultiplicationFactor) {
+ processContext.yield();
+ return;
+ }
+ lastRunTimestamp = now;
+
+ final String fileFilter =
processContext.getProperty(FILE_FILTER).getValue();
+ final String directoryValue =
processContext.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
+ final String directory =
Paths.get(directoryValue).normalize().toString();
+ final Date latestTimestampListedDate;
+ try {
+ latestTimestampListed =
getLatestTimestampListedFromState(processContext);
+ latestTimestampListedDate = new Date(latestTimestampListed);
+ } catch (final IOException ioe) {
+ getLogger().error("Failed to retrieve timestamp of last
listing from Distributed Cache Service. Will not perform listing until this is
accomplished.");
+ processContext.yield();
+ return;
+ }
+
+ ADLStoreClient adlStoreClient = getAdlStoreClient();
+ if(adlStoreClient == null) {
+ createADLSClient(processContext);
+ adlStoreClient = getAdlStoreClient();
+ }
+ final boolean recursive =
processContext.getProperty(RECURSE_SUBDIRS).asBoolean();
+
+ List<DirectoryEntry> directoryEntries = null;
+ try {
+ if(directory == null || directory.isEmpty())
+ throw new IOException("The ADLS directory from which files
should be listed is empty");
+ directoryEntries = getDirectoryListing(adlStoreClient,
directory, recursive);
+ } catch (IOException e) {
+ getLogger().error("Failed to perform listing of ADLS due to
{}", new Object[] {e});
+ return;
+ }
+
+ if(directoryEntries.size() == 0) {
+ getLogger().debug("There is no data to list. Yielding.");
+ processContext.yield();
+ return;
+ }
+
+ Optional<DirectoryEntry> optionalDirectoryEntry = directoryEntries
+ .stream()
+ .filter(directoryEntry ->
+ directoryEntry.type.equals(DirectoryEntryType.FILE)
+ )
+ .filter(directoryEntry ->
+
directoryEntry.lastModifiedTime.after(latestTimestampListedDate)
+ )
+ .filter(directoryEntry ->
+ directoryEntry.name.matches(fileFilter)
+ )
+ .map(directoryEntry -> {
+ createAndTransferFlowFile(processSession,
directoryEntry);
+ return directoryEntry;
+ })
+ .max(Comparator.comparing(de -> de.lastModifiedTime));
+
+ Date currentRunLatestModifiedTimestamp;
+ try {
+ currentRunLatestModifiedTimestamp =
optionalDirectoryEntry.get().lastModifiedTime;
+ } catch (NoSuchElementException e) {
+ getLogger().debug("There is no data to list. Yielding.");
+ processContext.yield();
+ return;
+ }
+
+
if(currentRunLatestModifiedTimestamp.after(latestTimestampListedDate)) {
+ processSession.commit();
+ getLogger().debug("Session commited");
+ try {
+ saveState(processContext,
currentRunLatestModifiedTimestamp);
+ } catch (IOException e) {
+ getLogger().warn("Failed to save cluster-wide state. If
NiFi is restarted, data duplication may occur", e);
+ }
+ }
+ }
+
+ private void saveState(ProcessContext processContext, final Date
listedTimestamp) throws IOException {
+ final Map<String, String> updatedState = new HashMap<>(1);
+ updatedState.put(LISTING_TIMESTAMP_KEY,
String.valueOf(listedTimestamp.getTime()));
+ processContext.getStateManager().setState(updatedState,
Scope.CLUSTER);
+ getLogger().debug("New state saved with listed time: {}", new
Object[] {listedTimestamp.toString()});
+ }
+
+ private void createAndTransferFlowFile(ProcessSession processSession,
DirectoryEntry directoryEntry) {
+ final Map<String, String> attributes =
createAttributes(directoryEntry);
+ FlowFile flowFile = processSession.create();
+ flowFile = processSession.putAllAttributes(flowFile, attributes);
+ processSession.transfer(flowFile, REL_SUCCESS);
+ }
+
+ private Map<String, String> createAttributes(final DirectoryEntry
directoryEntry) {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), directoryEntry.name);
+ Path fullNamePath = Paths.get(directoryEntry.fullName);
+ fullNamePath.toAbsolutePath().getParent().toString();
+ attributes.put(CoreAttributes.PATH.key(),
fullNamePath.getParent().toString());
+ attributes.put(CoreAttributes.ABSOLUTE_PATH.key(),
fullNamePath.toAbsolutePath().getParent().toString());
+ attributes.put("adls.owner", directoryEntry.user);
+ attributes.put("adls.group", directoryEntry.group);
+ attributes.put("adls.lastModified",
String.valueOf(directoryEntry.lastModifiedTime));
+ attributes.put("adls.length",
String.valueOf(directoryEntry.length));
+ attributes.put("adls.permissions", directoryEntry.permission);
+ if(directoryEntry.expiryTime != null)
--- End diff --
You need brackets. Please confirm the build is passing with
``-Pcontrib-check`` profile. Right now, you have the following checkstyle
issues:
````
[WARNING]
src/main/java/org/apache/nifi/processors/adls/ADLSAbstractProcessor.java[35]
(imports) AvoidStarImport: Using the '.*' form of import should be avoided -
org.apache.nifi.processors.adls.ADLSConstants.*.
[WARNING]
src/main/java/org/apache/nifi/processors/adls/ListADLSFile.java[22] (imports)
AvoidStarImport: Using the '.*' form of import should be avoided -
org.apache.nifi.annotation.behavior.*.
[WARNING]
src/main/java/org/apache/nifi/processors/adls/ListADLSFile.java[40] (imports)
AvoidStarImport: Using the '.*' form of import should be avoided - java.util.*.
[WARNING]
src/main/java/org/apache/nifi/processors/adls/PutADLSFile.java[23] (imports)
AvoidStarImport: Using the '.*' form of import should be avoided -
org.apache.nifi.annotation.behavior.*.
[WARNING]
src/main/java/org/apache/nifi/processors/adls/PutADLSFile.java[48] (imports)
AvoidStarImport: Using the '.*' form of import should be avoided -
org.apache.nifi.processors.adls.ADLSConstants.*.
[WARNING]
src/main/java/org/apache/nifi/processors/adls/PutADLSFile.java[223:9] (blocks)
RightCurly: '}' should be on the same line.
[WARNING]
src/test/java/org/apache/nifi/processors/adls/TestFetchADLSFile.java[27:8]
(imports) UnusedImports: Unused import - org.apache.nifi.util.MockFlowFile.
[WARNING]
src/test/java/org/apache/nifi/processors/adls/TestFetchADLSFile.java[34:8]
(imports) UnusedImports: Unused import - java.io.File.
[WARNING]
src/test/java/org/apache/nifi/processors/adls/TestFetchADLSFile.java[35:8]
(imports) UnusedImports: Unused import - java.io.FileInputStream.
[WARNING]
src/test/java/org/apache/nifi/processors/adls/TestFetchADLSFile.java[49]
(sizes) LineLength: Line is longer than 200 characters (found 342).
[WARNING]
src/test/java/org/apache/nifi/processors/adls/TestFetchADLSFile.java[53]
(sizes) LineLength: Line is longer than 200 characters (found 337).
[WARNING]
src/test/java/org/apache/nifi/processors/adls/TestFetchADLSFile.java[54]
(sizes) LineLength: Line is longer than 200 characters (found 333).
[WARNING]
src/test/java/org/apache/nifi/processors/adls/TestListADLSFile.java[57] (sizes)
LineLength: Line is longer than 200 characters (found 910).
[WARNING]
src/test/java/org/apache/nifi/processors/adls/TestListADLSFile.java[58] (sizes)
LineLength: Line is longer than 200 characters (found 1,934).
[WARNING]
src/test/java/org/apache/nifi/processors/adls/TestListADLSFile.java[59] (sizes)
LineLength: Line is longer than 200 characters (found 936).
[WARNING]
src/test/java/org/apache/nifi/processors/adls/TestListADLSFile.java[60] (sizes)
LineLength: Line is longer than 200 characters (found 1,569).
[WARNING]
src/test/java/org/apache/nifi/processors/adls/TestListADLSFile.java[61] (sizes)
LineLength: Line is longer than 200 characters (found 922).
[WARNING]
src/test/java/org/apache/nifi/processors/adls/TestPutADLSFile.java[34:8]
(imports) UnusedImports: Unused import - java.io.ByteArrayInputStream.
[WARNING]
src/test/java/org/apache/nifi/processors/adls/TestPutADLSFile.java[36:8]
(imports) UnusedImports: Unused import - java.io.InputStream.
[WARNING]
src/test/java/org/apache/nifi/processors/adls/TestPutADLSFile.java[37:8]
(imports) UnusedImports: Unused import - java.nio.file.Paths.
````
---