exceptionfactory commented on code in PR #7122:
URL: https://github.com/apache/nifi/pull/7122#discussion_r1203243558


##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx-api/pom.xml:
##########
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-adx-bundle</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-adx-api</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <!-- test data -->
+                        <exclude>src/test/resources/*</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>

Review Comment:
   This block is not necessary since the `nifi-adx-api` module does not contain 
any test sources.



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx-nar/pom.xml:
##########
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-adx-bundle</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-adx-nar</artifactId>
+    <packaging>nar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-adx-processors</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <!-- test data -->
+                        <exclude>src/test/resources/*</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>

Review Comment:
   This block is not necessary because `nifi-adx-nar` does not contain any 
tests or test resources.



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx-api/src/main/java/org/apache/nifi/adx/AdxSourceConnectionService.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx;
+
+import org.apache.nifi.adx.model.KustoQueryResponse;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.ControllerService;
+
+@Tags({"azure", "adx"})
+@CapabilityDescription("Connection-Service to Azure ADX (Kusto) cluster.")
+public interface AdxSourceConnectionService extends ControllerService {

Review Comment:
   Recommend renaming this service interface.
   ```suggestion
   public interface KustoQueryService extends ControllerService {
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/pom.xml:
##########
@@ -0,0 +1,115 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-adx-bundle</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-adx</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-adx-api</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>com.microsoft.azure.kusto</groupId>
+            <artifactId>kusto-ingest</artifactId>
+            <version>${azure-kusto-java-sdk-version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.code.findbugs</groupId>
+                    <artifactId>jsr305</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>

Review Comment:
   The license plugin execution cannot be skipped as part of standard 
execution, this plugin configuration should be removed.
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/AzureAdxConnectionServiceParameter.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx;
+
+public enum AzureAdxConnectionServiceParameter {
+
+    AUTH_STRATEGY("Kusto Authentication Method", "The strategy/method to 
authenticate against Azure Active Directory, either 'application' or 
'managed_identity'."),

Review Comment:
   ```suggestion
       AUTHENTICATION_METHOD("Kusto Authentication Method", "The strategy or 
method to authenticate against Azure Data Explorer"),
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/service/AzureAdxSourceConnectionService.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx.service;
+
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.ClientFactory;
+import com.microsoft.azure.kusto.data.KustoOperationResult;
+import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
+import com.microsoft.azure.kusto.data.exceptions.DataClientException;
+import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.nifi.adx.AdxSourceConnectionService;
+import org.apache.nifi.adx.AzureAdxConnectionServiceParameter;
+import org.apache.nifi.adx.NiFiVersion;
+import org.apache.nifi.adx.model.ADXConnectionParams;
+import org.apache.nifi.adx.model.KustoQueryResponse;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.List;
+
+@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"})
+@CapabilityDescription("Sends batches of flow file content or stream flow file 
content to an Azure ADX cluster.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The 
strategy/method to authenticate against Azure Active Directory, either 
'application' or 'managed_identity'."),
+        @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure 
application id for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure 
application key for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_TENANT", description = "Azure 
application tenant for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of 
ADX cluster. This is required only when streaming data to ADX cluster is 
enabled."),
+})
+public class AzureAdxSourceConnectionService extends AbstractControllerService 
implements AdxSourceConnectionService {
+
+    private static final String KUSTO_STRATEGY_APPLICATION = "application";
+
+    private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = 
"managed_identity";
+
+    public static final Pair<String,String> NIFI_SOURCE = Pair.of("processor", 
"nifi-source");
+
+    public static final PropertyDescriptor KUSTO_AUTH_STRATEGY = new 
PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.name())
+            
.displayName(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getDescription())
+            .required(false)
+            .defaultValue(KUSTO_STRATEGY_APPLICATION)
+            .allowableValues(KUSTO_STRATEGY_APPLICATION, 
KUSTO_STRATEGY_MANAGED_IDENTITY)
+            .build();
+
+    public static final PropertyDescriptor APP_ID = new PropertyDescriptor
+            .Builder().name(AzureAdxConnectionServiceParameter.APP_ID.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_ID.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_ID.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor APP_KEY = new PropertyDescriptor
+            .Builder().name(AzureAdxConnectionServiceParameter.APP_KEY.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_KEY.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_KEY.getDescription())
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor APP_TENANT = new PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.APP_TENANT.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_TENANT.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_TENANT.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CLUSTER_URL = new PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.CLUSTER_URL.name())
+            
.displayName(AzureAdxConnectionServiceParameter.CLUSTER_URL.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.CLUSTER_URL.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(KUSTO_AUTH_STRATEGY,APP_ID,APP_KEY,APP_TENANT,CLUSTER_URL);
+
+    private Client executionClient;
+
+    private ADXConnectionParams adxConnectionParams;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    /**
+     * @param context the configuration context
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws 
ProcessException {
+        getLogger().info("Starting Azure ADX Source Connection Service...");

Review Comment:
   This log message duplicates standard framework logging and is not necessary.
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/service/AzureAdxSourceConnectionService.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx.service;
+
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.ClientFactory;
+import com.microsoft.azure.kusto.data.KustoOperationResult;
+import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
+import com.microsoft.azure.kusto.data.exceptions.DataClientException;
+import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.nifi.adx.AdxSourceConnectionService;
+import org.apache.nifi.adx.AzureAdxConnectionServiceParameter;
+import org.apache.nifi.adx.NiFiVersion;
+import org.apache.nifi.adx.model.ADXConnectionParams;
+import org.apache.nifi.adx.model.KustoQueryResponse;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.List;
+
+@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"})
+@CapabilityDescription("Sends batches of flow file content or stream flow file 
content to an Azure ADX cluster.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The 
strategy/method to authenticate against Azure Active Directory, either 
'application' or 'managed_identity'."),
+        @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure 
application id for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure 
application key for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_TENANT", description = "Azure 
application tenant for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of 
ADX cluster. This is required only when streaming data to ADX cluster is 
enabled."),
+})
+public class AzureAdxSourceConnectionService extends AbstractControllerService 
implements AdxSourceConnectionService {
+
+    private static final String KUSTO_STRATEGY_APPLICATION = "application";
+
+    private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = 
"managed_identity";
+
+    public static final Pair<String,String> NIFI_SOURCE = Pair.of("processor", 
"nifi-source");
+
+    public static final PropertyDescriptor KUSTO_AUTH_STRATEGY = new 
PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.name())
+            
.displayName(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getDescription())
+            .required(false)
+            .defaultValue(KUSTO_STRATEGY_APPLICATION)
+            .allowableValues(KUSTO_STRATEGY_APPLICATION, 
KUSTO_STRATEGY_MANAGED_IDENTITY)
+            .build();
+
+    public static final PropertyDescriptor APP_ID = new PropertyDescriptor
+            .Builder().name(AzureAdxConnectionServiceParameter.APP_ID.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_ID.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_ID.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor APP_KEY = new PropertyDescriptor
+            .Builder().name(AzureAdxConnectionServiceParameter.APP_KEY.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_KEY.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_KEY.getDescription())
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor APP_TENANT = new PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.APP_TENANT.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_TENANT.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_TENANT.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CLUSTER_URL = new PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.CLUSTER_URL.name())
+            
.displayName(AzureAdxConnectionServiceParameter.CLUSTER_URL.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.CLUSTER_URL.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(KUSTO_AUTH_STRATEGY,APP_ID,APP_KEY,APP_TENANT,CLUSTER_URL);
+
+    private Client executionClient;

Review Comment:
   ```suggestion
       private Client kustoClient;
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/service/AzureAdxSourceConnectionService.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx.service;
+
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.ClientFactory;
+import com.microsoft.azure.kusto.data.KustoOperationResult;
+import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
+import com.microsoft.azure.kusto.data.exceptions.DataClientException;
+import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.nifi.adx.AdxSourceConnectionService;
+import org.apache.nifi.adx.AzureAdxConnectionServiceParameter;
+import org.apache.nifi.adx.NiFiVersion;
+import org.apache.nifi.adx.model.ADXConnectionParams;
+import org.apache.nifi.adx.model.KustoQueryResponse;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.List;
+
+@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"})
+@CapabilityDescription("Sends batches of flow file content or stream flow file 
content to an Azure ADX cluster.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The 
strategy/method to authenticate against Azure Active Directory, either 
'application' or 'managed_identity'."),
+        @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure 
application id for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure 
application key for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_TENANT", description = "Azure 
application tenant for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of 
ADX cluster. This is required only when streaming data to ADX cluster is 
enabled."),
+})
+public class AzureAdxSourceConnectionService extends AbstractControllerService 
implements AdxSourceConnectionService {
+
+    private static final String KUSTO_STRATEGY_APPLICATION = "application";
+
+    private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = 
"managed_identity";
+
+    public static final Pair<String,String> NIFI_SOURCE = Pair.of("processor", 
"nifi-source");
+
+    public static final PropertyDescriptor KUSTO_AUTH_STRATEGY = new 
PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.name())
+            
.displayName(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getDescription())
+            .required(false)
+            .defaultValue(KUSTO_STRATEGY_APPLICATION)
+            .allowableValues(KUSTO_STRATEGY_APPLICATION, 
KUSTO_STRATEGY_MANAGED_IDENTITY)
+            .build();
+
+    public static final PropertyDescriptor APP_ID = new PropertyDescriptor
+            .Builder().name(AzureAdxConnectionServiceParameter.APP_ID.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_ID.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_ID.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor APP_KEY = new PropertyDescriptor
+            .Builder().name(AzureAdxConnectionServiceParameter.APP_KEY.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_KEY.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_KEY.getDescription())
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor APP_TENANT = new PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.APP_TENANT.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_TENANT.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_TENANT.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CLUSTER_URL = new PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.CLUSTER_URL.name())
+            
.displayName(AzureAdxConnectionServiceParameter.CLUSTER_URL.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.CLUSTER_URL.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(KUSTO_AUTH_STRATEGY,APP_ID,APP_KEY,APP_TENANT,CLUSTER_URL);
+
+    private Client executionClient;
+
+    private ADXConnectionParams adxConnectionParams;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    /**
+     * @param context the configuration context
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws 
ProcessException {
+        getLogger().info("Starting Azure ADX Source Connection Service...");
+        adxConnectionParams = new ADXConnectionParams();
+        
adxConnectionParams.setKustoAuthStrategy(context.getProperty(KUSTO_AUTH_STRATEGY).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppId(context.getProperty(APP_ID).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppKey(context.getProperty(APP_KEY).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppTenant(context.getProperty(APP_TENANT).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setKustoEngineURL(context.getProperty(CLUSTER_URL).evaluateAttributeExpressions().getValue());
+        if (this.executionClient != null) {
+            onStopped();
+        }
+
+    }
+
+    @OnStopped
+    public final void onStopped() {
+        if(this.executionClient!=null){
+            try {
+                this.executionClient.close();
+            } catch (Exception e) {
+                getLogger().error("Error closing Kusto Execution Client", e);
+            }
+        }
+        this.executionClient = null;
+    }
+
+
+    public Client getKustoQueryClient() {
+        return 
createKustoExecutionClient(adxConnectionParams.getKustoEngineURL(),
+                adxConnectionParams.getAppId(),
+                adxConnectionParams.getAppKey(),
+                adxConnectionParams.getAppTenant(),
+                adxConnectionParams.getKustoAuthStrategy());
+    }
+
+    @Override
+    public KustoQueryResponse executeQuery(String databaseName, String query){
+        if (this.executionClient == null) {
+            this.executionClient = getKustoQueryClient();
+        }

Review Comment:
   This approach to initialization is not thread-safe. It looks like creating 
the Kusto Client could be moved to `onEnabled()`.



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx-nar/pom.xml:
##########
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-adx-bundle</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-adx-nar</artifactId>
+    <packaging>nar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-adx-processors</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <!-- test data -->
+                        <exclude>src/test/resources/*</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>

Review Comment:
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/resources/azure-kusto-nifi-version.properties:
##########
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+version=${project.version}

Review Comment:
   This file can be removed



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/test/java/org/apache/nifi/adx/TestAzureAdxSourceConnectionService.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx;
+
+import com.microsoft.azure.kusto.data.Client;
+import org.apache.nifi.adx.service.AzureAdxSourceConnectionService;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.ControllerServiceConfiguration;
+import org.apache.nifi.util.StandardProcessorTestRunner;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestAzureAdxSourceConnectionService {
+
+    private TestRunner runner;
+
+    private AzureAdxSourceConnectionService service;
+
+    private static final String MOCK_APP_ID = "mockAppId";
+
+    private static final String MOCK_APP_KEY = "mockAppKey";
+
+    private static final String MOCK_APP_TENANT = "mockAppTenant";
+
+    private static final String MOCK_CLUSTER_URL = 
"https://mockClusterUrl.com/";;
+
+    @BeforeEach
+    public void setup() throws InitializationException {
+        runner = TestRunners.newTestRunner(new AbstractProcessor() {
+            @Override
+            public void onTrigger(ProcessContext context, ProcessSession 
session) throws ProcessException {
+            }
+
+            @Override
+            protected List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
+                List<PropertyDescriptor> propDescs = new ArrayList<>();
+                propDescs.add(new PropertyDescriptor.Builder()
+                        .name("AdxService")
+                        .description("AdxService")
+                        
.identifiesControllerService(AdxSourceConnectionService.class)
+                        .required(true)
+                        .build());
+                return propDescs;
+            }
+        });
+
+        service = new AzureAdxSourceConnectionService();
+        runner.addControllerService("test-good", service);
+    }
+
+    @AfterEach
+    public void after() {
+        runner.clearProperties();
+    }
+
+    /**
+     * test successful adx connection scenario where all valid parameters are 
passed
+     */
+    @Test
+    void testAdxConnectionController() {
+        configureAppId();
+        configureAppKey();
+        configureAppTenant();
+        configureClusterURL();
+
+        runner.assertValid(service);
+    }
+
+    /**
+     * test successful adx connection scenario where all valid parameters are 
passed
+     */
+    @Test
+    void testCreateExecutionClientSuccess() {
+        configureAppId();
+        configureAppKey();
+        configureAppTenant();
+        configureClusterURL();
+        runner.assertValid(service);
+        runner.setValidateExpressionUsage(false);
+        runner.enableControllerService(service);
+        Client executionClient = service.getKustoQueryClient();
+
+        Assertions.assertNotNull(executionClient);
+    }
+
+    @Test
+    void testPropertyDescriptor() {
+        configureAppId();
+        configureAppKey();
+        configureAppTenant();
+        configureClusterURL();
+        List<PropertyDescriptor> pd = 
service.getSupportedPropertyDescriptors();
+
+        assertTrue(pd.contains(AzureAdxSourceConnectionService.APP_ID));
+        assertTrue(pd.contains(AzureAdxSourceConnectionService.APP_KEY));
+        assertTrue(pd.contains(AzureAdxSourceConnectionService.APP_TENANT));
+        assertTrue(pd.contains(AzureAdxSourceConnectionService.CLUSTER_URL));
+    }
+
+
+    @Test
+    void testInvalidConnectionMissingProperty() {
+        configureAppId();
+        configureAppKey();
+        configureAppTenant();
+        runner.assertNotValid(service);
+        runner.setValidateExpressionUsage(false);
+        Assertions.assertNull(
+                ((ControllerServiceConfiguration)
+                        ((Map.Entry<?, ?>) 
(((StandardProcessorTestRunner)runner).getProcessContext().getControllerServices()).entrySet().toArray()[0])
+                                
.getValue()).getProperty(AzureAdxSourceConnectionService.CLUSTER_URL));

Review Comment:
   This deep nesting should be refactored to multiple interim declared 
variables.



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/AzureAdxConnectionServiceParameter.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx;
+
+public enum AzureAdxConnectionServiceParameter {

Review Comment:
   Recommend shortening this name.
   ```suggestion
   public enum AzureDataExplorerParameter {
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx-api/pom.xml:
##########
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-adx-bundle</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-adx-api</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <!-- test data -->
+                        <exclude>src/test/resources/*</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>

Review Comment:
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx-api/src/main/java/org/apache/nifi/adx/AdxSourceConnectionService.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx;
+
+import org.apache.nifi.adx.model.KustoQueryResponse;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.ControllerService;
+
+@Tags({"azure", "adx"})

Review Comment:
   ```suggestion
   @Tags({"azure", "adx", "explorer", "kusto"})
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/pom.xml:
##########
@@ -0,0 +1,115 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-adx-bundle</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-adx</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-adx-api</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>com.microsoft.azure.kusto</groupId>
+            <artifactId>kusto-ingest</artifactId>
+            <version>${azure-kusto-java-sdk-version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.code.findbugs</groupId>
+                    <artifactId>jsr305</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>3.1.2</version>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <packageName>org.apache.nifi.adx</packageName>
+                        </manifest>
+                        <manifestEntries>
+                            
<implementation-title>nifi-adx</implementation-title>
+                            
<implementation-version>${project.version}</implementation-version>
+                        </manifestEntries>

Review Comment:
   Instead of using these custom entries, the standard 
`addDefaultImplementationEntries` element should be set in the `manifest` 
section.



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/NiFiVersion.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NiFiVersion {
+    public static final String CLIENT_NAME = "Kusto.Nifi";
+    private static final Logger log = 
LoggerFactory.getLogger(NiFiVersion.class);
+    private static String version = "unknown";
+
+    private NiFiVersion() {
+    }
+
+    static {
+        try {
+            version = 
NiFiVersion.class.getPackage().getImplementationVersion();
+            log.info("Loaded version: {}", version);
+        } catch (Exception e) {
+            log.warn("Error while loading version:", e);

Review Comment:
   This logging is not necessary and should be removed.



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/model/ADXConnectionParams.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx.model;
+
+public class ADXConnectionParams {

Review Comment:
   ```suggestion
   public class AzureDataExplorerConnectionParameters {
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/service/AzureAdxSourceConnectionService.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx.service;
+
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.ClientFactory;
+import com.microsoft.azure.kusto.data.KustoOperationResult;
+import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
+import com.microsoft.azure.kusto.data.exceptions.DataClientException;
+import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.nifi.adx.AdxSourceConnectionService;
+import org.apache.nifi.adx.AzureAdxConnectionServiceParameter;
+import org.apache.nifi.adx.NiFiVersion;
+import org.apache.nifi.adx.model.ADXConnectionParams;
+import org.apache.nifi.adx.model.KustoQueryResponse;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.List;
+
+@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"})
+@CapabilityDescription("Sends batches of flow file content or stream flow file 
content to an Azure ADX cluster.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The 
strategy/method to authenticate against Azure Active Directory, either 
'application' or 'managed_identity'."),
+        @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure 
application id for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure 
application key for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_TENANT", description = "Azure 
application tenant for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of 
ADX cluster. This is required only when streaming data to ADX cluster is 
enabled."),
+})
+public class AzureAdxSourceConnectionService extends AbstractControllerService 
implements AdxSourceConnectionService {

Review Comment:
   Following project conventions, recommend the following name aligned with the 
suggestion for the interface name.
   ```suggestion
   public class StandardKustoQueryService extends AbstractControllerService 
implements AdxSourceConnectionService {
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/AzureAdxConnectionServiceParameter.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx;
+
+public enum AzureAdxConnectionServiceParameter {
+
+    AUTH_STRATEGY("Kusto Authentication Method", "The strategy/method to 
authenticate against Azure Active Directory, either 'application' or 
'managed_identity'."),
+    APP_ID("Application ID", "Azure application ID for accessing the 
ADX-Cluster"),
+    APP_KEY("Application KEY", "Azure application Key for accessing the 
ADX-Cluster"),
+    APP_TENANT("Application Tenant", "Azure application tenant for accessing 
the ADX-Cluster"),
+    CLUSTER_URL("Cluster URL", "Endpoint of ADX cluster. This is required only 
when streaming data to ADX cluster is enabled.");
+
+    private final String paramDisplayName;
+    private final String description;
+
+
+    AzureAdxConnectionServiceParameter(String paramDisplayName, String 
description) {
+        this.paramDisplayName = paramDisplayName;
+        this.description = description;
+    }
+
+    public String getParamDisplayName() {

Review Comment:
   ```suggestion
       public String getDisplayName() {
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/service/AzureAdxSourceConnectionService.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx.service;
+
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.ClientFactory;
+import com.microsoft.azure.kusto.data.KustoOperationResult;
+import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
+import com.microsoft.azure.kusto.data.exceptions.DataClientException;
+import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.nifi.adx.AdxSourceConnectionService;
+import org.apache.nifi.adx.AzureAdxConnectionServiceParameter;
+import org.apache.nifi.adx.NiFiVersion;
+import org.apache.nifi.adx.model.ADXConnectionParams;
+import org.apache.nifi.adx.model.KustoQueryResponse;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.List;
+
+@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"})
+@CapabilityDescription("Sends batches of flow file content or stream flow file 
content to an Azure ADX cluster.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The 
strategy/method to authenticate against Azure Active Directory, either 
'application' or 'managed_identity'."),
+        @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure 
application id for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure 
application key for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_TENANT", description = "Azure 
application tenant for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of 
ADX cluster. This is required only when streaming data to ADX cluster is 
enabled."),
+})
+public class AzureAdxSourceConnectionService extends AbstractControllerService 
implements AdxSourceConnectionService {
+
+    private static final String KUSTO_STRATEGY_APPLICATION = "application";
+
+    private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = 
"managed_identity";
+
+    public static final Pair<String,String> NIFI_SOURCE = Pair.of("processor", 
"nifi-source");
+
+    public static final PropertyDescriptor KUSTO_AUTH_STRATEGY = new 
PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.name())
+            
.displayName(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getDescription())
+            .required(false)
+            .defaultValue(KUSTO_STRATEGY_APPLICATION)
+            .allowableValues(KUSTO_STRATEGY_APPLICATION, 
KUSTO_STRATEGY_MANAGED_IDENTITY)
+            .build();
+
+    public static final PropertyDescriptor APP_ID = new PropertyDescriptor
+            .Builder().name(AzureAdxConnectionServiceParameter.APP_ID.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_ID.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_ID.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor APP_KEY = new PropertyDescriptor
+            .Builder().name(AzureAdxConnectionServiceParameter.APP_KEY.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_KEY.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_KEY.getDescription())
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor APP_TENANT = new PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.APP_TENANT.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_TENANT.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_TENANT.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CLUSTER_URL = new PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.CLUSTER_URL.name())
+            
.displayName(AzureAdxConnectionServiceParameter.CLUSTER_URL.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.CLUSTER_URL.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(KUSTO_AUTH_STRATEGY,APP_ID,APP_KEY,APP_TENANT,CLUSTER_URL);
+
+    private Client executionClient;
+
+    private ADXConnectionParams adxConnectionParams;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    /**
+     * @param context the configuration context
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws 
ProcessException {
+        getLogger().info("Starting Azure ADX Source Connection Service...");
+        adxConnectionParams = new ADXConnectionParams();
+        
adxConnectionParams.setKustoAuthStrategy(context.getProperty(KUSTO_AUTH_STRATEGY).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppId(context.getProperty(APP_ID).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppKey(context.getProperty(APP_KEY).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppTenant(context.getProperty(APP_TENANT).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setKustoEngineURL(context.getProperty(CLUSTER_URL).evaluateAttributeExpressions().getValue());
+        if (this.executionClient != null) {
+            onStopped();
+        }
+
+    }
+
+    @OnStopped
+    public final void onStopped() {
+        if(this.executionClient!=null){
+            try {
+                this.executionClient.close();
+            } catch (Exception e) {
+                getLogger().error("Error closing Kusto Execution Client", e);

Review Comment:
   ```suggestion
                   getLogger().error("Kusto Client close failed", e);
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/service/AzureAdxSourceConnectionService.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx.service;
+
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.ClientFactory;
+import com.microsoft.azure.kusto.data.KustoOperationResult;
+import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
+import com.microsoft.azure.kusto.data.exceptions.DataClientException;
+import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.nifi.adx.AdxSourceConnectionService;
+import org.apache.nifi.adx.AzureAdxConnectionServiceParameter;
+import org.apache.nifi.adx.NiFiVersion;
+import org.apache.nifi.adx.model.ADXConnectionParams;
+import org.apache.nifi.adx.model.KustoQueryResponse;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.List;
+
+@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"})
+@CapabilityDescription("Sends batches of flow file content or stream flow file 
content to an Azure ADX cluster.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The 
strategy/method to authenticate against Azure Active Directory, either 
'application' or 'managed_identity'."),
+        @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure 
application id for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure 
application key for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_TENANT", description = "Azure 
application tenant for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of 
ADX cluster. This is required only when streaming data to ADX cluster is 
enabled."),
+})
+public class AzureAdxSourceConnectionService extends AbstractControllerService 
implements AdxSourceConnectionService {
+
+    private static final String KUSTO_STRATEGY_APPLICATION = "application";
+
+    private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = 
"managed_identity";
+
+    public static final Pair<String,String> NIFI_SOURCE = Pair.of("processor", 
"nifi-source");
+
+    public static final PropertyDescriptor KUSTO_AUTH_STRATEGY = new 
PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.name())
+            
.displayName(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getDescription())
+            .required(false)
+            .defaultValue(KUSTO_STRATEGY_APPLICATION)
+            .allowableValues(KUSTO_STRATEGY_APPLICATION, 
KUSTO_STRATEGY_MANAGED_IDENTITY)
+            .build();
+
+    public static final PropertyDescriptor APP_ID = new PropertyDescriptor
+            .Builder().name(AzureAdxConnectionServiceParameter.APP_ID.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_ID.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_ID.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor APP_KEY = new PropertyDescriptor
+            .Builder().name(AzureAdxConnectionServiceParameter.APP_KEY.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_KEY.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_KEY.getDescription())
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor APP_TENANT = new PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.APP_TENANT.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_TENANT.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_TENANT.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CLUSTER_URL = new PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.CLUSTER_URL.name())
+            
.displayName(AzureAdxConnectionServiceParameter.CLUSTER_URL.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.CLUSTER_URL.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(KUSTO_AUTH_STRATEGY,APP_ID,APP_KEY,APP_TENANT,CLUSTER_URL);
+
+    private Client executionClient;
+
+    private ADXConnectionParams adxConnectionParams;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    /**
+     * @param context the configuration context
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws 
ProcessException {
+        getLogger().info("Starting Azure ADX Source Connection Service...");
+        adxConnectionParams = new ADXConnectionParams();
+        
adxConnectionParams.setKustoAuthStrategy(context.getProperty(KUSTO_AUTH_STRATEGY).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppId(context.getProperty(APP_ID).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppKey(context.getProperty(APP_KEY).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppTenant(context.getProperty(APP_TENANT).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setKustoEngineURL(context.getProperty(CLUSTER_URL).evaluateAttributeExpressions().getValue());

Review Comment:
   The properties do not declare support for Expression Language, which they 
should not, but the `evaluateAttributeExpression()` calls should be removed.



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/service/AzureAdxSourceConnectionService.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx.service;
+
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.ClientFactory;
+import com.microsoft.azure.kusto.data.KustoOperationResult;
+import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
+import com.microsoft.azure.kusto.data.exceptions.DataClientException;
+import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.nifi.adx.AdxSourceConnectionService;
+import org.apache.nifi.adx.AzureAdxConnectionServiceParameter;
+import org.apache.nifi.adx.NiFiVersion;
+import org.apache.nifi.adx.model.ADXConnectionParams;
+import org.apache.nifi.adx.model.KustoQueryResponse;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.List;
+
+@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"})
+@CapabilityDescription("Sends batches of flow file content or stream flow file 
content to an Azure ADX cluster.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The 
strategy/method to authenticate against Azure Active Directory, either 
'application' or 'managed_identity'."),
+        @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure 
application id for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure 
application key for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_TENANT", description = "Azure 
application tenant for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of 
ADX cluster. This is required only when streaming data to ADX cluster is 
enabled."),
+})
+public class AzureAdxSourceConnectionService extends AbstractControllerService 
implements AdxSourceConnectionService {
+
+    private static final String KUSTO_STRATEGY_APPLICATION = "application";
+
+    private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = 
"managed_identity";
+
+    public static final Pair<String,String> NIFI_SOURCE = Pair.of("processor", 
"nifi-source");
+
+    public static final PropertyDescriptor KUSTO_AUTH_STRATEGY = new 
PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.name())
+            
.displayName(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getDescription())
+            .required(false)
+            .defaultValue(KUSTO_STRATEGY_APPLICATION)
+            .allowableValues(KUSTO_STRATEGY_APPLICATION, 
KUSTO_STRATEGY_MANAGED_IDENTITY)
+            .build();
+
+    public static final PropertyDescriptor APP_ID = new PropertyDescriptor
+            .Builder().name(AzureAdxConnectionServiceParameter.APP_ID.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_ID.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_ID.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor APP_KEY = new PropertyDescriptor
+            .Builder().name(AzureAdxConnectionServiceParameter.APP_KEY.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_KEY.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_KEY.getDescription())
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor APP_TENANT = new PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.APP_TENANT.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_TENANT.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_TENANT.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CLUSTER_URL = new PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.CLUSTER_URL.name())
+            
.displayName(AzureAdxConnectionServiceParameter.CLUSTER_URL.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.CLUSTER_URL.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(KUSTO_AUTH_STRATEGY,APP_ID,APP_KEY,APP_TENANT,CLUSTER_URL);
+
+    private Client executionClient;
+
+    private ADXConnectionParams adxConnectionParams;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    /**
+     * @param context the configuration context
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws 
ProcessException {
+        getLogger().info("Starting Azure ADX Source Connection Service...");
+        adxConnectionParams = new ADXConnectionParams();
+        
adxConnectionParams.setKustoAuthStrategy(context.getProperty(KUSTO_AUTH_STRATEGY).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppId(context.getProperty(APP_ID).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppKey(context.getProperty(APP_KEY).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppTenant(context.getProperty(APP_TENANT).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setKustoEngineURL(context.getProperty(CLUSTER_URL).evaluateAttributeExpressions().getValue());
+        if (this.executionClient != null) {
+            onStopped();
+        }
+
+    }
+
+    @OnStopped
+    public final void onStopped() {
+        if(this.executionClient!=null){
+            try {
+                this.executionClient.close();
+            } catch (Exception e) {
+                getLogger().error("Error closing Kusto Execution Client", e);
+            }
+        }
+        this.executionClient = null;
+    }
+
+
+    public Client getKustoQueryClient() {
+        return 
createKustoExecutionClient(adxConnectionParams.getKustoEngineURL(),
+                adxConnectionParams.getAppId(),
+                adxConnectionParams.getAppKey(),
+                adxConnectionParams.getAppTenant(),
+                adxConnectionParams.getKustoAuthStrategy());
+    }
+
+    @Override
+    public KustoQueryResponse executeQuery(String databaseName, String query){
+        if (this.executionClient == null) {
+            this.executionClient = getKustoQueryClient();
+        }
+        List<List<Object>> tableData;
+        KustoQueryResponse kustoQueryResponse;
+        KustoOperationResult kustoOperationResult;
+        try {
+            kustoOperationResult = this.executionClient.execute(databaseName, 
query);
+        } catch (DataServiceException | DataClientException e) {
+            String errorMessage;
+            
if(Arrays.stream(ExceptionUtils.getRootCauseStackTrace(e)).anyMatch(str -> 
str.contains("LimitsExceeded"))){
+                errorMessage = "Exception occurred while reading data from ADX 
: Query Limits exceeded : Please modify your query to fetch results below the 
kusto query limits";

Review Comment:
   This message can be shortened:
   ```suggestion
                   errorMessage = "Query Limits exceeded: Please modify the 
query to fetch results below Kusto limits";
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/service/AzureAdxSourceConnectionService.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx.service;
+
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.ClientFactory;
+import com.microsoft.azure.kusto.data.KustoOperationResult;
+import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
+import com.microsoft.azure.kusto.data.exceptions.DataClientException;
+import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.nifi.adx.AdxSourceConnectionService;
+import org.apache.nifi.adx.AzureAdxConnectionServiceParameter;
+import org.apache.nifi.adx.NiFiVersion;
+import org.apache.nifi.adx.model.ADXConnectionParams;
+import org.apache.nifi.adx.model.KustoQueryResponse;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.List;
+
+@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"})
+@CapabilityDescription("Sends batches of flow file content or stream flow file 
content to an Azure ADX cluster.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The 
strategy/method to authenticate against Azure Active Directory, either 
'application' or 'managed_identity'."),
+        @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure 
application id for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure 
application key for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_TENANT", description = "Azure 
application tenant for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of 
ADX cluster. This is required only when streaming data to ADX cluster is 
enabled."),
+})
+public class AzureAdxSourceConnectionService extends AbstractControllerService 
implements AdxSourceConnectionService {
+
+    private static final String KUSTO_STRATEGY_APPLICATION = "application";
+
+    private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = 
"managed_identity";
+
+    public static final Pair<String,String> NIFI_SOURCE = Pair.of("processor", 
"nifi-source");
+
+    public static final PropertyDescriptor KUSTO_AUTH_STRATEGY = new 
PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.name())
+            
.displayName(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getDescription())
+            .required(false)
+            .defaultValue(KUSTO_STRATEGY_APPLICATION)
+            .allowableValues(KUSTO_STRATEGY_APPLICATION, 
KUSTO_STRATEGY_MANAGED_IDENTITY)
+            .build();
+
+    public static final PropertyDescriptor APP_ID = new PropertyDescriptor
+            .Builder().name(AzureAdxConnectionServiceParameter.APP_ID.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_ID.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_ID.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor APP_KEY = new PropertyDescriptor
+            .Builder().name(AzureAdxConnectionServiceParameter.APP_KEY.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_KEY.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_KEY.getDescription())
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor APP_TENANT = new PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.APP_TENANT.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_TENANT.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_TENANT.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CLUSTER_URL = new PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.CLUSTER_URL.name())
+            
.displayName(AzureAdxConnectionServiceParameter.CLUSTER_URL.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.CLUSTER_URL.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(KUSTO_AUTH_STRATEGY,APP_ID,APP_KEY,APP_TENANT,CLUSTER_URL);
+
+    private Client executionClient;
+
+    private ADXConnectionParams adxConnectionParams;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    /**
+     * @param context the configuration context
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws 
ProcessException {
+        getLogger().info("Starting Azure ADX Source Connection Service...");
+        adxConnectionParams = new ADXConnectionParams();
+        
adxConnectionParams.setKustoAuthStrategy(context.getProperty(KUSTO_AUTH_STRATEGY).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppId(context.getProperty(APP_ID).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppKey(context.getProperty(APP_KEY).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppTenant(context.getProperty(APP_TENANT).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setKustoEngineURL(context.getProperty(CLUSTER_URL).evaluateAttributeExpressions().getValue());
+        if (this.executionClient != null) {
+            onStopped();
+        }
+
+    }
+
+    @OnStopped
+    public final void onStopped() {
+        if(this.executionClient!=null){
+            try {
+                this.executionClient.close();
+            } catch (Exception e) {
+                getLogger().error("Error closing Kusto Execution Client", e);
+            }
+        }
+        this.executionClient = null;
+    }
+
+
+    public Client getKustoQueryClient() {
+        return 
createKustoExecutionClient(adxConnectionParams.getKustoEngineURL(),
+                adxConnectionParams.getAppId(),
+                adxConnectionParams.getAppKey(),
+                adxConnectionParams.getAppTenant(),
+                adxConnectionParams.getKustoAuthStrategy());
+    }
+
+    @Override
+    public KustoQueryResponse executeQuery(String databaseName, String query){
+        if (this.executionClient == null) {
+            this.executionClient = getKustoQueryClient();
+        }
+        List<List<Object>> tableData;
+        KustoQueryResponse kustoQueryResponse;
+        KustoOperationResult kustoOperationResult;
+        try {
+            kustoOperationResult = this.executionClient.execute(databaseName, 
query);
+        } catch (DataServiceException | DataClientException e) {
+            String errorMessage;
+            
if(Arrays.stream(ExceptionUtils.getRootCauseStackTrace(e)).anyMatch(str -> 
str.contains("LimitsExceeded"))){
+                errorMessage = "Exception occurred while reading data from ADX 
: Query Limits exceeded : Please modify your query to fetch results below the 
kusto query limits";
+            }else {
+                errorMessage = "Exception occurred while reading data from 
ADX";
+            }
+            getLogger().error(errorMessage, e);
+            kustoQueryResponse = new KustoQueryResponse(true,errorMessage);

Review Comment:
   ```suggestion
               kustoQueryResponse = new KustoQueryResponse(true, errorMessage);
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/service/AzureAdxSourceConnectionService.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx.service;
+
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.ClientFactory;
+import com.microsoft.azure.kusto.data.KustoOperationResult;
+import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
+import com.microsoft.azure.kusto.data.exceptions.DataClientException;
+import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.nifi.adx.AdxSourceConnectionService;
+import org.apache.nifi.adx.AzureAdxConnectionServiceParameter;
+import org.apache.nifi.adx.NiFiVersion;
+import org.apache.nifi.adx.model.ADXConnectionParams;
+import org.apache.nifi.adx.model.KustoQueryResponse;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.List;
+
+@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"})
+@CapabilityDescription("Sends batches of flow file content or stream flow file 
content to an Azure ADX cluster.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The 
strategy/method to authenticate against Azure Active Directory, either 
'application' or 'managed_identity'."),
+        @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure 
application id for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure 
application key for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "APP_TENANT", description = "Azure 
application tenant for accessing the ADX-Cluster."),
+        @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of 
ADX cluster. This is required only when streaming data to ADX cluster is 
enabled."),
+})
+public class AzureAdxSourceConnectionService extends AbstractControllerService 
implements AdxSourceConnectionService {
+
+    private static final String KUSTO_STRATEGY_APPLICATION = "application";
+
+    private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = 
"managed_identity";
+
+    public static final Pair<String,String> NIFI_SOURCE = Pair.of("processor", 
"nifi-source");
+
+    public static final PropertyDescriptor KUSTO_AUTH_STRATEGY = new 
PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.name())
+            
.displayName(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getDescription())
+            .required(false)
+            .defaultValue(KUSTO_STRATEGY_APPLICATION)
+            .allowableValues(KUSTO_STRATEGY_APPLICATION, 
KUSTO_STRATEGY_MANAGED_IDENTITY)
+            .build();
+
+    public static final PropertyDescriptor APP_ID = new PropertyDescriptor
+            .Builder().name(AzureAdxConnectionServiceParameter.APP_ID.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_ID.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_ID.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor APP_KEY = new PropertyDescriptor
+            .Builder().name(AzureAdxConnectionServiceParameter.APP_KEY.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_KEY.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_KEY.getDescription())
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor APP_TENANT = new PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.APP_TENANT.name())
+            
.displayName(AzureAdxConnectionServiceParameter.APP_TENANT.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.APP_TENANT.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CLUSTER_URL = new PropertyDescriptor
+            
.Builder().name(AzureAdxConnectionServiceParameter.CLUSTER_URL.name())
+            
.displayName(AzureAdxConnectionServiceParameter.CLUSTER_URL.getParamDisplayName())
+            
.description(AzureAdxConnectionServiceParameter.CLUSTER_URL.getDescription())
+            .required(true)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(KUSTO_AUTH_STRATEGY,APP_ID,APP_KEY,APP_TENANT,CLUSTER_URL);
+
+    private Client executionClient;
+
+    private ADXConnectionParams adxConnectionParams;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    /**
+     * @param context the configuration context
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws 
ProcessException {
+        getLogger().info("Starting Azure ADX Source Connection Service...");
+        adxConnectionParams = new ADXConnectionParams();
+        
adxConnectionParams.setKustoAuthStrategy(context.getProperty(KUSTO_AUTH_STRATEGY).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppId(context.getProperty(APP_ID).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppKey(context.getProperty(APP_KEY).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setAppTenant(context.getProperty(APP_TENANT).evaluateAttributeExpressions().getValue());
+        
adxConnectionParams.setKustoEngineURL(context.getProperty(CLUSTER_URL).evaluateAttributeExpressions().getValue());
+        if (this.executionClient != null) {
+            onStopped();
+        }
+
+    }
+
+    @OnStopped
+    public final void onStopped() {
+        if(this.executionClient!=null){

Review Comment:
   ```suggestion
           if (this.executionClient != null) {
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/test/java/org/apache/nifi/adx/TestAzureAdxSourceConnectionService.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.adx;
+
+import com.microsoft.azure.kusto.data.Client;
+import org.apache.nifi.adx.service.AzureAdxSourceConnectionService;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.ControllerServiceConfiguration;
+import org.apache.nifi.util.StandardProcessorTestRunner;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestAzureAdxSourceConnectionService {
+
+    private TestRunner runner;
+
+    private AzureAdxSourceConnectionService service;
+
+    private static final String MOCK_APP_ID = "mockAppId";
+
+    private static final String MOCK_APP_KEY = "mockAppKey";
+
+    private static final String MOCK_APP_TENANT = "mockAppTenant";
+
+    private static final String MOCK_CLUSTER_URL = 
"https://mockClusterUrl.com/";;
+
+    @BeforeEach
+    public void setup() throws InitializationException {
+        runner = TestRunners.newTestRunner(new AbstractProcessor() {
+            @Override
+            public void onTrigger(ProcessContext context, ProcessSession 
session) throws ProcessException {
+            }
+
+            @Override
+            protected List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
+                List<PropertyDescriptor> propDescs = new ArrayList<>();
+                propDescs.add(new PropertyDescriptor.Builder()
+                        .name("AdxService")
+                        .description("AdxService")
+                        
.identifiesControllerService(AdxSourceConnectionService.class)
+                        .required(true)
+                        .build());
+                return propDescs;
+            }
+        });

Review Comment:
   This can be simplified:
   ```suggestion
           runner = TestRunners.newTestRunner(NoOpProcessor.class);
   ```



##########
nifi-nar-bundles/nifi-adx-bundle/nifi-adx-processors/src/main/java/org/apache/nifi/processors/adx/QueryAzureDataExplorer.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.adx;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.adx.AdxSourceConnectionService;
+import org.apache.nifi.adx.model.KustoQueryResponse;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.adx.enums.AzureAdxSourceProcessorParameter;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Set;
+
+@Tags({"azure", "adx", "microsoft", "data", "explorer", "source"})
+@CapabilityDescription("This Processor acts as a ADX source connector which 
queries data from Azure Data Explorer."+
+        "This connector can act only as a start of the data pipeline getting 
data from ADX."+
+        "The queries which can be used further details can be found here 
https://learn.microsoft.com/en-us/azure/data-explorer/kusto/concepts/querylimits";)
+@WritesAttributes({
+        @WritesAttribute(attribute = 
QueryAzureDataExplorer.ADX_QUERY_ERROR_MESSAGE, description = "Azure Data 
Explorer error message."),
+        @WritesAttribute(attribute = 
QueryAzureDataExplorer.ADX_EXECUTED_QUERY, description = "Azure Data Explorer 
executed query.")
+})
+public class QueryAzureDataExplorer extends AbstractProcessor {
+    public static final String ADX_QUERY_ERROR_MESSAGE = 
"adx.query.error.message";
+    public static final String ADX_EXECUTED_QUERY = "adx.executed.query";
+
+    public static final Relationship SUCCESS = new Relationship.Builder()
+            .name("SUCCESS")
+            .description("Relationship for success")
+            .build();
+    public static final Relationship FAILED = new Relationship.Builder()
+            .name("FAILED")
+            .description("Relationship for failure")
+            .build();
+    public static final PropertyDescriptor DB_NAME = new PropertyDescriptor
+            .Builder().name(AzureAdxSourceProcessorParameter.DB_NAME.name())
+            
.displayName(AzureAdxSourceProcessorParameter.DB_NAME.getParamDisplayName())
+            
.description(AzureAdxSourceProcessorParameter.DB_NAME.getParamDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor ADX_QUERY = new PropertyDescriptor
+            .Builder().name(AzureAdxSourceProcessorParameter.ADX_QUERY.name())
+            
.displayName(AzureAdxSourceProcessorParameter.ADX_QUERY.getParamDisplayName())
+            
.description(AzureAdxSourceProcessorParameter.ADX_QUERY.getParamDescription())
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor ADX_SOURCE_SERVICE = new 
PropertyDescriptor
+            
.Builder().name(AzureAdxSourceProcessorParameter.ADX_SOURCE_SERVICE.name())
+            
.displayName(AzureAdxSourceProcessorParameter.ADX_SOURCE_SERVICE.getParamDisplayName())
+            
.description(AzureAdxSourceProcessorParameter.ADX_SOURCE_SERVICE.getParamDescription())
+            .required(true)
+            .identifiesControllerService(AdxSourceConnectionService.class)
+            .build();
+    private final ObjectMapper objectMapper = new ObjectMapper();
+    private final Set<Relationship> relationships = Set.of(SUCCESS,FAILED);
+    private final List<PropertyDescriptor> descriptors = 
List.of(ADX_SOURCE_SERVICE,DB_NAME,ADX_QUERY);
+    private AdxSourceConnectionService service;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        service = 
context.getProperty(ADX_SOURCE_SERVICE).asControllerService(AdxSourceConnectionService.class);
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile outgoingFlowFile;
+        String databaseName = context.getProperty(DB_NAME).getValue();
+        String adxQuery;
+        KustoQueryResponse kustoQueryResponse;
+
+        //checks if this processor has any preceding connection, if yes 
retrieve
+        if (context.hasIncomingConnection()) {
+            FlowFile incomingFlowFile = session.get();
+            //incoming connection exists but the incoming flowfile is null
+            if (incomingFlowFile == null && context.hasNonLoopConnection()) {
+                return;
+            }
+            //incoming connection exists and retrieve adxQuery from context
+            if (incomingFlowFile != null && incomingFlowFile.getSize() == 0) {
+                if (context.getProperty(ADX_QUERY).isSet()) {
+                    adxQuery = 
context.getProperty(ADX_QUERY).evaluateAttributeExpressions(incomingFlowFile).getValue();
+                } else {
+                    String message = "FlowFile query is empty and no scheduled 
query is set";
+                    getLogger().error(message);
+                    incomingFlowFile = session.putAttribute(incomingFlowFile, 
ADX_QUERY_ERROR_MESSAGE, message);
+                    session.transfer(incomingFlowFile, FAILED);
+                    return;
+                }
+            } else {
+                try {
+                    adxQuery = getQuery(session, incomingFlowFile);
+                } catch(IOException ioe) {
+                    throw new ProcessException("Failed to read Query from 
FlowFile",ioe);
+                }
+            }
+            outgoingFlowFile = incomingFlowFile;
+        } else {
+            outgoingFlowFile = session.create();
+            adxQuery = 
context.getProperty(ADX_QUERY).evaluateAttributeExpressions(outgoingFlowFile).getValue();
+        }
+
+        try {
+            //execute Query
+            kustoQueryResponse = executeQuery(databaseName,adxQuery);
+            if(!kustoQueryResponse.isError()){
+                try(ByteArrayInputStream bais = new 
ByteArrayInputStream(objectMapper.writeValueAsBytes(kustoQueryResponse.getTableData()))){
+                    session.importFrom(bais, outgoingFlowFile);
+                }

Review Comment:
   This approach is very memory-intensive and needs to be changed. Serializing 
the entire result set into a byte array can easily lead to memory exhaustion 
for large numbers of results. The Kusto ClientFactory also includes a 
`StreamingClient` that returns an `InputStream`. Based on that option, both the 
Controller Service interface and this Processor implementation should be 
changed. The Service should return a streaming query result, which can be 
transferred directly to the FlowFile OutputStream using one of the methods on 
ProcessSession.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to