exceptionfactory commented on code in PR #7122:
URL: https://github.com/apache/nifi/pull/7122#discussion_r1177940619


##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx-processors/src/main/java/org/apache/nifi/processors/adx/QueryAzureDataExplorer.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.adx;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.KustoOperationResult;
+import com.microsoft.azure.kusto.data.exceptions.DataClientException;
+import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.nifi.adx.AdxSourceConnectionService;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.adx.enums.AzureAdxSourceProcessorParameter;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"azure", "adx", "microsoft", "data", "explorer", "source"})
+@CapabilityDescription("This Processor acts as a ADX source connector which 
queries data from Azure Data Explorer."+
+        "This connector can act only as a start of the data pipeline getting 
data from ADX."+
+        "The queries which can be used further details can be found here 
https://learn.microsoft.com/en-us/azure/data-explorer/kusto/concepts/querylimits";)
+@WritesAttributes({
+        @WritesAttribute(attribute = "ADX_QUERY_ERROR_MESSAGE", description = 
"Azure Data Explorer error message."),
+        @WritesAttribute(attribute = "ADX_EXECUTED_QUERY", description = 
"Azure Data Explorer executed query.")

Review Comment:
   FlowFile attribute names generally follow the convention of lowercase with 
`.` separators. Recommend the following:
   ```suggestion
           @WritesAttribute(attribute = "adx.query.error.message", description 
= "Azure Data Explorer error message."),
           @WritesAttribute(attribute = "adx.query.executed", description = 
"Azure Data Explorer executed query.")
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx-processors/src/main/java/org/apache/nifi/processors/adx/QueryAzureDataExplorer.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.adx;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.KustoOperationResult;
+import com.microsoft.azure.kusto.data.exceptions.DataClientException;
+import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.nifi.adx.AdxSourceConnectionService;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.adx.enums.AzureAdxSourceProcessorParameter;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"azure", "adx", "microsoft", "data", "explorer", "source"})
+@CapabilityDescription("This Processor acts as a ADX source connector which 
queries data from Azure Data Explorer."+
+        "This connector can act only as a start of the data pipeline getting 
data from ADX."+
+        "The queries which can be used further details can be found here 
https://learn.microsoft.com/en-us/azure/data-explorer/kusto/concepts/querylimits";)
+@WritesAttributes({
+        @WritesAttribute(attribute = "ADX_QUERY_ERROR_MESSAGE", description = 
"Azure Data Explorer error message."),
+        @WritesAttribute(attribute = "ADX_EXECUTED_QUERY", description = 
"Azure Data Explorer executed query.")
+})
+public class QueryAzureDataExplorer extends AbstractProcessor {
+    public static final String ADX_QUERY_ERROR_MESSAGE = 
"adx.query.error.message";
+    public static final String ADX_EXECUTED_QUERY = "adx.executed.query";

Review Comment:
   These two values can be referenced in the `WritesAttribute` annotations.



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx-processors/src/main/java/org/apache/nifi/processors/adx/QueryAzureDataExplorer.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.adx;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.KustoOperationResult;
+import com.microsoft.azure.kusto.data.exceptions.DataClientException;
+import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.nifi.adx.AdxSourceConnectionService;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.adx.enums.AzureAdxSourceProcessorParameter;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"azure", "adx", "microsoft", "data", "explorer", "source"})
+@CapabilityDescription("This Processor acts as a ADX source connector which 
queries data from Azure Data Explorer."+
+        "This connector can act only as a start of the data pipeline getting 
data from ADX."+
+        "The queries which can be used further details can be found here 
https://learn.microsoft.com/en-us/azure/data-explorer/kusto/concepts/querylimits";)
+@WritesAttributes({
+        @WritesAttribute(attribute = "ADX_QUERY_ERROR_MESSAGE", description = 
"Azure Data Explorer error message."),
+        @WritesAttribute(attribute = "ADX_EXECUTED_QUERY", description = 
"Azure Data Explorer executed query.")
+})
+public class QueryAzureDataExplorer extends AbstractProcessor {
+    public static final String ADX_QUERY_ERROR_MESSAGE = 
"adx.query.error.message";
+    public static final String ADX_EXECUTED_QUERY = "adx.executed.query";
+    public static final String RELATIONSHIP_SUCCESS = "SUCCESS";
+
+    public static final String RELATIONSHIP_FAILED = "FAILED";
+    public static final String RELATIONSHIP_FAILED_DESC = "Relationship for 
failure";
+    public static final String RELATIONSHIP_SUCCESS_DESC = "Relationship for 
success";
+
+    public static final Relationship SUCCESS = new Relationship.Builder()
+            .name(RELATIONSHIP_SUCCESS)
+            .description(RELATIONSHIP_SUCCESS_DESC)
+            .build();
+    public static final Relationship FAILED = new Relationship.Builder()
+            .name(RELATIONSHIP_FAILED)
+            .description(RELATIONSHIP_FAILED_DESC)
+            .build();
+    public static final PropertyDescriptor DB_NAME = new PropertyDescriptor
+            .Builder().name(AzureAdxSourceProcessorParameter.DB_NAME.name())
+            
.displayName(AzureAdxSourceProcessorParameter.DB_NAME.getParamDisplayName())
+            
.description(AzureAdxSourceProcessorParameter.DB_NAME.getParamDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor ADX_QUERY = new PropertyDescriptor
+            .Builder().name(AzureAdxSourceProcessorParameter.ADX_QUERY.name())
+            
.displayName(AzureAdxSourceProcessorParameter.ADX_QUERY.getParamDisplayName())
+            
.description(AzureAdxSourceProcessorParameter.ADX_QUERY.getParamDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor ADX_SOURCE_SERVICE = new 
PropertyDescriptor
+            
.Builder().name(AzureAdxSourceProcessorParameter.ADX_SOURCE_SERVICE.name())
+            
.displayName(AzureAdxSourceProcessorParameter.ADX_SOURCE_SERVICE.getParamDisplayName())
+            
.description(AzureAdxSourceProcessorParameter.ADX_SOURCE_SERVICE.getParamDescription())
+            .required(true)
+            .identifiesControllerService(AdxSourceConnectionService.class)
+            .build();
+    private final ObjectMapper objectMapper = new ObjectMapper();
+    private Set<Relationship> relationships;
+    private List<PropertyDescriptor> descriptors;
+    private Client executionClient;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        this.descriptors = List.of(ADX_SOURCE_SERVICE,DB_NAME,ADX_QUERY);
+        this.relationships = Set.of(SUCCESS,FAILED);

Review Comment:
   These collections can be declared statically and the `init` method can be 
removed.



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/pom.xml:
##########
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-adx-bundle</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-adx</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-adx-api</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>

Review Comment:
   This dependency is not necessary and should be removed.



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/pom.xml:
##########
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-adx-bundle</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-adx</artifactId>

Review Comment:
   Following general project conventions, the module name should include the 
word `service` indicating that it contains the Controller Service 
implementation.
   ```suggestion
       <artifactId>nifi-adx-service</artifactId>
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/AzureAdxSourceConnectionService.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.adx.model.ADXConnectionParams;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.ClientFactory;
+import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
+
+import java.util.List;
+import static org.apache.nifi.adx.NiFiVersion.NIFI_SOURCE;
+
+@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"})
+@CapabilityDescription("Sends batches of flow file content or stream flow file 
content to an Azure ADX cluster.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The 
strategy/method to authenticate against Azure Active Directory, either 
'application' or 'managed_identity'."),
+        @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure 
application id for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure 
application key for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_TENANT", description = "Azure 
application tenant for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of 
ADX cluster. This is required only when streaming data to ADX cluster is 
enabled."),
+})

Review Comment:
   Controller Services do not read FlowFile attributes, so these annotations 
should be removed.
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/AzureAdxSourceConnectionService.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.adx.model.ADXConnectionParams;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.ClientFactory;
+import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
+
+import java.util.List;
+import static org.apache.nifi.adx.NiFiVersion.NIFI_SOURCE;
+
+@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"})
+@CapabilityDescription("Sends batches of flow file content or stream flow file 
content to an Azure ADX cluster.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The 
strategy/method to authenticate against Azure Active Directory, either 
'application' or 'managed_identity'."),
+        @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure 
application id for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure 
application key for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_TENANT", description = "Azure 
application tenant for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of 
ADX cluster. This is required only when streaming data to ADX cluster is 
enabled."),
+})
+public class AzureAdxSourceConnectionService extends AbstractControllerService 
implements AdxSourceConnectionService {
+
+    private static final String KUSTO_STRATEGY_APPLICATION = "application";
+
+    private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = 
"managed_identity";
+
+    public static final PropertyDescriptor KUSTO_AUTH_STRATEGY = new 
PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.name())
+            
.displayName(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getDescription())
+            .required(false)
+            .defaultValue(KUSTO_STRATEGY_APPLICATION)
+            .allowableValues(KUSTO_STRATEGY_APPLICATION, 
KUSTO_STRATEGY_MANAGED_IDENTITY)
+            .build();
+
+    public static final PropertyDescriptor APP_ID = new PropertyDescriptor
+            .Builder().name(AzureAdxConnectionServiceParameter.APP_ID.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_ID.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_ID.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor APP_KEY = new PropertyDescriptor
+            .Builder().name(AzureAdxConnectionServiceParameter.APP_KEY.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_KEY.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_KEY.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor APP_TENANT = new PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.APP_TENANT.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_TENANT.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_TENANT.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CLUSTER_URL = new PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.CLUSTER_URL.name())
+            
.displayName(AzureAdxConnectionServiceParameter.CLUSTER_URL.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.CLUSTER_URL.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(KUSTO_AUTH_STRATEGY,APP_ID,APP_KEY,APP_TENANT,CLUSTER_URL);
+
+    private Client executionClient;
+
+    private ADXConnectionParams adxConnectionParams;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    /**
+     * @param context the configuration context
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws 
ProcessException {
+        getLogger().info("Starting Azure ADX Source Connection Service...");
+        adxConnectionParams = new ADXConnectionParams();
+        
adxConnectionParams.setKustoAuthStrategy(context.getProperty(KUSTO_AUTH_STRATEGY).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppId(context.getProperty(APP_ID).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppKey(context.getProperty(APP_KEY).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppTenant(context.getProperty(APP_TENANT).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setKustoEngineURL(context.getProperty(CLUSTER_URL).evaluateAttributeExpressions().getValue());
+        if (this.executionClient != null) {
+            onStopped();
+        }
+
+    }
+
+    @OnStopped
+    public final void onStopped() {
+        if(this.executionClient!=null){

Review Comment:
   ```suggestion
           if (this.executionClient != null) {
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/NiFiVersion.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NiFiVersion {
+    public static final String CLIENT_NAME = "Kusto.Nifi";
+
+    public static final Pair<String,String> NIFI_SOURCE = Pair.of("processor", 
"nifi-source");

Review Comment:
   Is this value necessary?



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/test/java/org/apache/nifi/adx/MockAzureAdxSourceProcessor.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MockAzureAdxSourceProcessor extends AbstractProcessor {

Review Comment:
   This class can be removed in favor of a mock created using Mockito.



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx-processors/src/main/java/org/apache/nifi/processors/adx/enums/AzureAdxSourceProcessorParameter.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.adx.enums;
+
+public enum AzureAdxSourceProcessorParameter {
+    DB_NAME("Database name", "The name of the database where the query will be 
executed."),

Review Comment:
   Display Names should follow the "Title Case" convention, capitalizing most 
of the words.
   ```suggestion
       DB_NAME("Database Name", "The name of the database where the query will 
be executed."),
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx-processors/src/main/java/org/apache/nifi/processors/adx/QueryAzureDataExplorer.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.adx;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.KustoOperationResult;
+import com.microsoft.azure.kusto.data.exceptions.DataClientException;
+import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.nifi.adx.AdxSourceConnectionService;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.adx.enums.AzureAdxSourceProcessorParameter;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"azure", "adx", "microsoft", "data", "explorer", "source"})
+@CapabilityDescription("This Processor acts as a ADX source connector which 
queries data from Azure Data Explorer."+
+        "This connector can act only as a start of the data pipeline getting 
data from ADX."+
+        "The queries which can be used further details can be found here 
https://learn.microsoft.com/en-us/azure/data-explorer/kusto/concepts/querylimits";)
+@WritesAttributes({
+        @WritesAttribute(attribute = "ADX_QUERY_ERROR_MESSAGE", description = 
"Azure Data Explorer error message."),
+        @WritesAttribute(attribute = "ADX_EXECUTED_QUERY", description = 
"Azure Data Explorer executed query.")
+})
+public class QueryAzureDataExplorer extends AbstractProcessor {
+    public static final String ADX_QUERY_ERROR_MESSAGE = 
"adx.query.error.message";
+    public static final String ADX_EXECUTED_QUERY = "adx.executed.query";
+    public static final String RELATIONSHIP_SUCCESS = "SUCCESS";
+
+    public static final String RELATIONSHIP_FAILED = "FAILED";
+    public static final String RELATIONSHIP_FAILED_DESC = "Relationship for 
failure";
+    public static final String RELATIONSHIP_SUCCESS_DESC = "Relationship for 
success";

Review Comment:
   These descriptions should not be public static variables, instead they can 
be specified directly in the Relationship Builder.



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx-processors/src/main/java/org/apache/nifi/processors/adx/enums/AzureAdxSourceProcessorParameter.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.adx.enums;
+
+public enum AzureAdxSourceProcessorParameter {
+    DB_NAME("Database name", "The name of the database where the query will be 
executed."),
+
+    ADX_QUERY("ADX query", "The query which needs to be executed in Azure Data 
Explorer."),

Review Comment:
   The `ADX` prefix on the property name is not necessary.
   ```suggestion
       ADX_QUERY("Query", "The query which needs to be executed in Azure Data 
Explorer."),
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/pom.xml:
##########
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-adx-bundle</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-adx</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-adx-api</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>com.microsoft.azure.kusto</groupId>
+            <artifactId>kusto-ingest</artifactId>
+            <version>${azure-kusto-java-sdk-version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.code.findbugs</groupId>
+                    <artifactId>jsr305</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+            <scope>compile</scope>
+        </dependency>

Review Comment:
   Compile dependencies should be listed before test dependencies.



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/AzureAdxSourceConnectionService.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.adx.model.ADXConnectionParams;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.ClientFactory;
+import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
+
+import java.util.List;
+import static org.apache.nifi.adx.NiFiVersion.NIFI_SOURCE;
+
+@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"})
+@CapabilityDescription("Sends batches of flow file content or stream flow file 
content to an Azure ADX cluster.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The 
strategy/method to authenticate against Azure Active Directory, either 
'application' or 'managed_identity'."),
+        @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure 
application id for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure 
application key for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_TENANT", description = "Azure 
application tenant for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of 
ADX cluster. This is required only when streaming data to ADX cluster is 
enabled."),
+})
+public class AzureAdxSourceConnectionService extends AbstractControllerService 
implements AdxSourceConnectionService {
+
+    private static final String KUSTO_STRATEGY_APPLICATION = "application";
+
+    private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = 
"managed_identity";

Review Comment:
   These static string values should be replaced with an `enum` that defines 
the allowed values. The `enum` should implement the `DescribedValue` interface 
to provide description information.



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/AzureAdxSourceConnectionService.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx;

Review Comment:
   ```suggestion
   package org.apache.nifi.adx.service;
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/AzureAdxSourceConnectionService.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.adx.model.ADXConnectionParams;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.ClientFactory;
+import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
+
+import java.util.List;
+import static org.apache.nifi.adx.NiFiVersion.NIFI_SOURCE;
+
+@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"})
+@CapabilityDescription("Sends batches of flow file content or stream flow file 
content to an Azure ADX cluster.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The 
strategy/method to authenticate against Azure Active Directory, either 
'application' or 'managed_identity'."),
+        @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure 
application id for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure 
application key for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_TENANT", description = "Azure 
application tenant for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of 
ADX cluster. This is required only when streaming data to ADX cluster is 
enabled."),
+})
+public class AzureAdxSourceConnectionService extends AbstractControllerService 
implements AdxSourceConnectionService {
+
+    private static final String KUSTO_STRATEGY_APPLICATION = "application";
+
+    private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = 
"managed_identity";
+
+    public static final PropertyDescriptor KUSTO_AUTH_STRATEGY = new 
PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.name())
+            
.displayName(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getDescription())
+            .required(false)
+            .defaultValue(KUSTO_STRATEGY_APPLICATION)
+            .allowableValues(KUSTO_STRATEGY_APPLICATION, 
KUSTO_STRATEGY_MANAGED_IDENTITY)
+            .build();
+
+    public static final PropertyDescriptor APP_ID = new PropertyDescriptor
+            .Builder().name(AzureAdxConnectionServiceParameter.APP_ID.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_ID.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_ID.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor APP_KEY = new PropertyDescriptor
+            .Builder().name(AzureAdxConnectionServiceParameter.APP_KEY.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_KEY.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_KEY.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor APP_TENANT = new PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.APP_TENANT.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_TENANT.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_TENANT.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CLUSTER_URL = new PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.CLUSTER_URL.name())
+            
.displayName(AzureAdxConnectionServiceParameter.CLUSTER_URL.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.CLUSTER_URL.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(KUSTO_AUTH_STRATEGY,APP_ID,APP_KEY,APP_TENANT,CLUSTER_URL);
+
+    private Client executionClient;
+
+    private ADXConnectionParams adxConnectionParams;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    /**
+     * @param context the configuration context
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws 
ProcessException {
+        getLogger().info("Starting Azure ADX Source Connection Service...");
+        adxConnectionParams = new ADXConnectionParams();
+        
adxConnectionParams.setKustoAuthStrategy(context.getProperty(KUSTO_AUTH_STRATEGY).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppId(context.getProperty(APP_ID).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppKey(context.getProperty(APP_KEY).evaluateAttributeExpressions().getValue());

Review Comment:
   Sensitive properties should not support attribute evaluation.
   ```suggestion
           
adxConnectionParams.setAppKey(context.getProperty(APP_KEY).getValue());
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/AzureAdxSourceConnectionService.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.adx.model.ADXConnectionParams;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.ClientFactory;
+import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
+
+import java.util.List;
+import static org.apache.nifi.adx.NiFiVersion.NIFI_SOURCE;
+
+@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"})
+@CapabilityDescription("Sends batches of flow file content or stream flow file 
content to an Azure ADX cluster.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The 
strategy/method to authenticate against Azure Active Directory, either 
'application' or 'managed_identity'."),
+        @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure 
application id for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure 
application key for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_TENANT", description = "Azure 
application tenant for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of 
ADX cluster. This is required only when streaming data to ADX cluster is 
enabled."),
+})
+public class AzureAdxSourceConnectionService extends AbstractControllerService 
implements AdxSourceConnectionService {
+
+    private static final String KUSTO_STRATEGY_APPLICATION = "application";
+
+    private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = 
"managed_identity";
+
+    public static final PropertyDescriptor KUSTO_AUTH_STRATEGY = new 
PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.name())
+            
.displayName(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getDescription())
+            .required(false)
+            .defaultValue(KUSTO_STRATEGY_APPLICATION)
+            .allowableValues(KUSTO_STRATEGY_APPLICATION, 
KUSTO_STRATEGY_MANAGED_IDENTITY)
+            .build();
+
+    public static final PropertyDescriptor APP_ID = new PropertyDescriptor
+            .Builder().name(AzureAdxConnectionServiceParameter.APP_ID.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_ID.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_ID.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor APP_KEY = new PropertyDescriptor
+            .Builder().name(AzureAdxConnectionServiceParameter.APP_KEY.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_KEY.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_KEY.getDescription())
+            .required(true)

Review Comment:
   This property should be marked as sensitive.
   ```suggestion
               .required(true)
               .sensitive(true)
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx-nar/pom.xml:
##########
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-adx-bundle</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-adx-nar</artifactId>
+    <packaging>nar</packaging>
+    <properties>
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-adx-processors</artifactId>
+            <version>${nifi-release-version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <!-- test data -->
+                        <exclude>src/test/resources/*</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <systemPropertyVariables>
+                        
<java.io.tmpdir>${project.build.directory}</java.io.tmpdir>
+                    </systemPropertyVariables>
+                </configuration>
+            </plugin>
+        </plugins>

Review Comment:
   Is there a specific reason for changing the temporary directory? This seems 
unnecessary.



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx-api/src/main/java/org/apache/nifi/adx/AdxSourceConnectionService.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.ControllerService;
+
+import com.microsoft.azure.kusto.data.Client;

Review Comment:
   This `Client` references places a hard dependency on a specific version of 
the Kusto library. It looks like the use of the Client in the Processor is 
limited to return results. Instead of having the interface return the `Client`, 
it looks like this service interface could be abstract with a `query()` method 
that returns Data in the form of `List<List<Object>>`. What do you think of 
that approach? That would allow removing the dependency on the Kusto library 
from the API module, and make the service more flexible should it be necessary 
to upgrade the Kusto library for the Controller Service implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to