Github user jzonthemtn commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2158#discussion_r140665864
--- Diff:
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/adls/PutADLSFile.java
---
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.adls;
+
+import com.microsoft.azure.datalake.store.ADLFileOutputStream;
+import com.microsoft.azure.datalake.store.ADLStoreClient;
+import com.microsoft.azure.datalake.store.IfExists;
+import com.microsoft.azure.datalake.store.acl.AclEntry;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.adls.ADLSConstants.ERR_ACL_ENTRY;
+import static
org.apache.nifi.processors.adls.ADLSConstants.ERR_FLOWFILE_CORE_ATTR_FILENAME;
+import static org.apache.nifi.processors.adls.ADLSConstants.REL_FAILURE;
+import static
org.apache.nifi.processors.adls.ADLSConstants.FILE_NAME_ATTRIBUTE;
+import static
org.apache.nifi.processors.adls.ADLSConstants.CHUNK_SIZE_IN_BYTES;
+import static
org.apache.nifi.processors.adls.ADLSConstants.ADLS_FILE_PATH_ATTRIBUTE;
+import static org.apache.nifi.processors.adls.ADLSConstants.REL_SUCCESS;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"azure", "hadoop", "ADLS", "put", "egress", "copy", "filesystem",
"restricted"})
+@CapabilityDescription("Write FlowFile data to Azure Data Lake Store
(ADLS)")
+@SeeAlso({ListADLSFile.class, FetchADLSFile.class})
+@ReadsAttributes({
+ @ReadsAttribute(attribute = "filename", description = "The name of
the file written to ADLS comes from the value of this attribute."),
+})
+@WritesAttributes({
+ @WritesAttribute(attribute = "filename", description = "The name
of the file written to ADLS is stored in this attribute."),
+ @WritesAttribute(attribute = "absolute.adls.path", description =
"The absolute path to the file on ADLS is stored in this attribute.")
+})
+@Restricted("Provides operator the ability to write to any file that NiFi
has access to in ADLS.")
+public class PutADLSFile extends ADLSAbstractProcessor {
+
+ private static final String PART_FILE_EXTENSION = ".nifipart";
+ private static final String REPLACE_RESOLUTION = "replace";
+ private static final String FAIL_RESOLUTION = "fail";
+ private static final String APPEND_RESOLUTION = "append";
+
+ protected static final AllowableValue REPLACE_RESOLUTION_AV = new
AllowableValue(REPLACE_RESOLUTION,
+ REPLACE_RESOLUTION, "Replaces the existing file if any.");
+ protected static final AllowableValue FAIL_RESOLUTION_AV = new
AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION,
+ "Penalizes the flow file and routes it to failure.");
+ protected static final AllowableValue APPEND_RESOLUTION_AV = new
AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION,
+ "Appends to the existing file if any, creates a new file
otherwise.");
+
+ // properties
+ protected static final PropertyDescriptor CONFLICT_RESOLUTION = new
PropertyDescriptor.Builder()
+ .name("adls-put-conflict-resolution-strategy")
+ .displayName("Conflict Resolution Strategy")
+ .description("Indicates what should happen when a file " +
+ "with the same name already exists in the output
directory")
+ .required(true)
+ .defaultValue(FAIL_RESOLUTION_AV.getValue())
+ .allowableValues(REPLACE_RESOLUTION_AV, FAIL_RESOLUTION_AV,
APPEND_RESOLUTION_AV)
+ .build();
+
+ protected static final PropertyDescriptor DIRECTORY = new
PropertyDescriptor.Builder()
+ .name("adls-put-directory")
+ .displayName("Directory")
+ .description("The directory where files will be written")
+ .required(true)
+
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ protected static final PropertyDescriptor UMASK = new
PropertyDescriptor.Builder()
+ .name("adls-put-permissions-umask")
+ .displayName("Permissions umask")
+ .description(
+ "A umask represented as an octal number which
determines the permissions of files written to ADLS.")
+ .addValidator(ADLSValidators.UMASK_VALIDATOR)
+ .build();
+
+ protected static final PropertyDescriptor ACL = new
PropertyDescriptor.Builder()
+ .name("adls-put-access-control-list")
+ .displayName("Access Control List")
+ .description(
+ "Comma separated list of ACL entry. An ACL entry
consists of a scope (access or default)" +
+ " the type of the ACL (user, group, other or
mask), the name of the user or group" +
+ " associated with this ACL (can be blank to
specify the default permissions for" +
+ " users and groups, and must be blank for mask
entries), and the action permitted" +
+ " by this ACL entry. eg:
\"default:user:bob:r-x,default:user:adam:rw\"")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ @Override
+ protected void init(ProcessorInitializationContext context) {
+ super.init(context);
+
+ super.descriptors.add(CONFLICT_RESOLUTION);
+ super.descriptors.add(DIRECTORY);
+ super.descriptors.add(UMASK);
+ super.descriptors.add(ACL);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ final FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final StopWatch dataRateStopWatch = new StopWatch();
+ final StopWatch completeProcessStopWatch = new StopWatch(true);
+
+ ADLStoreClient adlsClient = getAdlStoreClient();
+ //safe check
--- End diff --
This check for `null` is duplicated in a few places. Is it possible to move
the check to `getAdlStoreClient()`, and if it is going to return `null`, then
create the client (`createADLSClient()`) in order to get rid of the checks?
---