exceptionfactory commented on code in PR #7122: URL: https://github.com/apache/nifi/pull/7122#discussion_r1203243558
########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx-api/pom.xml: ########## @@ -0,0 +1,49 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-adx-bundle</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-adx-api</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <!-- test data --> + <exclude>src/test/resources/*</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> Review Comment: This block is not necessary since the `nifi-adx-api` module does not contain any test sources. ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx-nar/pom.xml: ########## @@ -0,0 +1,57 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-adx-bundle</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-adx-nar</artifactId> + <packaging>nar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-adx-processors</artifactId> + <version>2.0.0-SNAPSHOT</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <!-- test data --> + <exclude>src/test/resources/*</exclude> + </excludes> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + </plugins> + </build> Review Comment: This block is not necessary because `nifi-adx-nar` does not contain any tests or test resources. ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx-api/src/main/java/org/apache/nifi/adx/AdxSourceConnectionService.java: ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx; + +import org.apache.nifi.adx.model.KustoQueryResponse; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.controller.ControllerService; + +@Tags({"azure", "adx"}) +@CapabilityDescription("Connection-Service to Azure ADX (Kusto) cluster.") +public interface AdxSourceConnectionService extends ControllerService { Review Comment: Recommend renaming this service interface. ```suggestion public interface KustoQueryService extends ControllerService { ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/pom.xml: ########## @@ -0,0 +1,115 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-adx-bundle</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-adx</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-adx-api</artifactId> + <version>2.0.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <version>2.0.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-standard-services-api-nar</artifactId> + <version>2.0.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>com.microsoft.azure.kusto</groupId> + <artifactId>kusto-ingest</artifactId> + <version>${azure-kusto-java-sdk-version}</version> + <exclusions> + <exclusion> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + <version>2.0.0-SNAPSHOT</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>jcl-over-slf4j</artifactId> + <scope>compile</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> Review Comment: The license plugin execution cannot be skipped as part of standard execution, this plugin configuration should be removed. ```suggestion ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/AzureAdxConnectionServiceParameter.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx; + +public enum AzureAdxConnectionServiceParameter { + + AUTH_STRATEGY("Kusto Authentication Method", "The strategy/method to authenticate against Azure Active Directory, either 'application' or 'managed_identity'."), Review Comment: ```suggestion AUTHENTICATION_METHOD("Kusto Authentication Method", "The strategy or method to authenticate against Azure Data Explorer"), ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/service/AzureAdxSourceConnectionService.java: ########## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx.service; + +import com.microsoft.azure.kusto.data.Client; +import com.microsoft.azure.kusto.data.ClientFactory; +import com.microsoft.azure.kusto.data.KustoOperationResult; +import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; +import com.microsoft.azure.kusto.data.exceptions.DataClientException; +import com.microsoft.azure.kusto.data.exceptions.DataServiceException; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.nifi.adx.AdxSourceConnectionService; +import org.apache.nifi.adx.AzureAdxConnectionServiceParameter; +import org.apache.nifi.adx.NiFiVersion; +import org.apache.nifi.adx.model.ADXConnectionParams; +import org.apache.nifi.adx.model.KustoQueryResponse; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Arrays; +import java.util.List; + +@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"}) +@CapabilityDescription("Sends batches of flow file content or stream flow file content to an Azure ADX cluster.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The strategy/method to authenticate against Azure Active Directory, either 'application' or 'managed_identity'."), + @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure application id for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure application key for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_TENANT", description = "Azure application tenant for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of ADX cluster. This is required only when streaming data to ADX cluster is enabled."), +}) +public class AzureAdxSourceConnectionService extends AbstractControllerService implements AdxSourceConnectionService { + + private static final String KUSTO_STRATEGY_APPLICATION = "application"; + + private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = "managed_identity"; + + public static final Pair<String,String> NIFI_SOURCE = Pair.of("processor", "nifi-source"); + + public static final PropertyDescriptor KUSTO_AUTH_STRATEGY = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.name()) + .displayName(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getDescription()) + .required(false) + .defaultValue(KUSTO_STRATEGY_APPLICATION) + .allowableValues(KUSTO_STRATEGY_APPLICATION, KUSTO_STRATEGY_MANAGED_IDENTITY) + .build(); + + public static final PropertyDescriptor APP_ID = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_ID.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_ID.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_ID.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APP_KEY = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_KEY.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_KEY.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_KEY.getDescription()) + .required(true) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APP_TENANT = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_TENANT.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_TENANT.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_TENANT.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor CLUSTER_URL = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.CLUSTER_URL.name()) + .displayName(AzureAdxConnectionServiceParameter.CLUSTER_URL.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.CLUSTER_URL.getDescription()) + .required(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(KUSTO_AUTH_STRATEGY,APP_ID,APP_KEY,APP_TENANT,CLUSTER_URL); + + private Client executionClient; + + private ADXConnectionParams adxConnectionParams; + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + /** + * @param context the configuration context + */ + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws ProcessException { + getLogger().info("Starting Azure ADX Source Connection Service..."); Review Comment: This log message duplicates standard framework logging and is not necessary. ```suggestion ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/service/AzureAdxSourceConnectionService.java: ########## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx.service; + +import com.microsoft.azure.kusto.data.Client; +import com.microsoft.azure.kusto.data.ClientFactory; +import com.microsoft.azure.kusto.data.KustoOperationResult; +import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; +import com.microsoft.azure.kusto.data.exceptions.DataClientException; +import com.microsoft.azure.kusto.data.exceptions.DataServiceException; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.nifi.adx.AdxSourceConnectionService; +import org.apache.nifi.adx.AzureAdxConnectionServiceParameter; +import org.apache.nifi.adx.NiFiVersion; +import org.apache.nifi.adx.model.ADXConnectionParams; +import org.apache.nifi.adx.model.KustoQueryResponse; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Arrays; +import java.util.List; + +@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"}) +@CapabilityDescription("Sends batches of flow file content or stream flow file content to an Azure ADX cluster.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The strategy/method to authenticate against Azure Active Directory, either 'application' or 'managed_identity'."), + @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure application id for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure application key for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_TENANT", description = "Azure application tenant for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of ADX cluster. This is required only when streaming data to ADX cluster is enabled."), +}) +public class AzureAdxSourceConnectionService extends AbstractControllerService implements AdxSourceConnectionService { + + private static final String KUSTO_STRATEGY_APPLICATION = "application"; + + private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = "managed_identity"; + + public static final Pair<String,String> NIFI_SOURCE = Pair.of("processor", "nifi-source"); + + public static final PropertyDescriptor KUSTO_AUTH_STRATEGY = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.name()) + .displayName(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getDescription()) + .required(false) + .defaultValue(KUSTO_STRATEGY_APPLICATION) + .allowableValues(KUSTO_STRATEGY_APPLICATION, KUSTO_STRATEGY_MANAGED_IDENTITY) + .build(); + + public static final PropertyDescriptor APP_ID = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_ID.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_ID.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_ID.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APP_KEY = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_KEY.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_KEY.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_KEY.getDescription()) + .required(true) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APP_TENANT = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_TENANT.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_TENANT.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_TENANT.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor CLUSTER_URL = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.CLUSTER_URL.name()) + .displayName(AzureAdxConnectionServiceParameter.CLUSTER_URL.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.CLUSTER_URL.getDescription()) + .required(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(KUSTO_AUTH_STRATEGY,APP_ID,APP_KEY,APP_TENANT,CLUSTER_URL); + + private Client executionClient; Review Comment: ```suggestion private Client kustoClient; ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/service/AzureAdxSourceConnectionService.java: ########## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx.service; + +import com.microsoft.azure.kusto.data.Client; +import com.microsoft.azure.kusto.data.ClientFactory; +import com.microsoft.azure.kusto.data.KustoOperationResult; +import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; +import com.microsoft.azure.kusto.data.exceptions.DataClientException; +import com.microsoft.azure.kusto.data.exceptions.DataServiceException; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.nifi.adx.AdxSourceConnectionService; +import org.apache.nifi.adx.AzureAdxConnectionServiceParameter; +import org.apache.nifi.adx.NiFiVersion; +import org.apache.nifi.adx.model.ADXConnectionParams; +import org.apache.nifi.adx.model.KustoQueryResponse; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Arrays; +import java.util.List; + +@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"}) +@CapabilityDescription("Sends batches of flow file content or stream flow file content to an Azure ADX cluster.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The strategy/method to authenticate against Azure Active Directory, either 'application' or 'managed_identity'."), + @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure application id for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure application key for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_TENANT", description = "Azure application tenant for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of ADX cluster. This is required only when streaming data to ADX cluster is enabled."), +}) +public class AzureAdxSourceConnectionService extends AbstractControllerService implements AdxSourceConnectionService { + + private static final String KUSTO_STRATEGY_APPLICATION = "application"; + + private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = "managed_identity"; + + public static final Pair<String,String> NIFI_SOURCE = Pair.of("processor", "nifi-source"); + + public static final PropertyDescriptor KUSTO_AUTH_STRATEGY = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.name()) + .displayName(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getDescription()) + .required(false) + .defaultValue(KUSTO_STRATEGY_APPLICATION) + .allowableValues(KUSTO_STRATEGY_APPLICATION, KUSTO_STRATEGY_MANAGED_IDENTITY) + .build(); + + public static final PropertyDescriptor APP_ID = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_ID.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_ID.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_ID.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APP_KEY = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_KEY.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_KEY.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_KEY.getDescription()) + .required(true) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APP_TENANT = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_TENANT.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_TENANT.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_TENANT.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor CLUSTER_URL = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.CLUSTER_URL.name()) + .displayName(AzureAdxConnectionServiceParameter.CLUSTER_URL.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.CLUSTER_URL.getDescription()) + .required(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(KUSTO_AUTH_STRATEGY,APP_ID,APP_KEY,APP_TENANT,CLUSTER_URL); + + private Client executionClient; + + private ADXConnectionParams adxConnectionParams; + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + /** + * @param context the configuration context + */ + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws ProcessException { + getLogger().info("Starting Azure ADX Source Connection Service..."); + adxConnectionParams = new ADXConnectionParams(); + adxConnectionParams.setKustoAuthStrategy(context.getProperty(KUSTO_AUTH_STRATEGY).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppId(context.getProperty(APP_ID).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppKey(context.getProperty(APP_KEY).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppTenant(context.getProperty(APP_TENANT).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setKustoEngineURL(context.getProperty(CLUSTER_URL).evaluateAttributeExpressions().getValue()); + if (this.executionClient != null) { + onStopped(); + } + + } + + @OnStopped + public final void onStopped() { + if(this.executionClient!=null){ + try { + this.executionClient.close(); + } catch (Exception e) { + getLogger().error("Error closing Kusto Execution Client", e); + } + } + this.executionClient = null; + } + + + public Client getKustoQueryClient() { + return createKustoExecutionClient(adxConnectionParams.getKustoEngineURL(), + adxConnectionParams.getAppId(), + adxConnectionParams.getAppKey(), + adxConnectionParams.getAppTenant(), + adxConnectionParams.getKustoAuthStrategy()); + } + + @Override + public KustoQueryResponse executeQuery(String databaseName, String query){ + if (this.executionClient == null) { + this.executionClient = getKustoQueryClient(); + } Review Comment: This approach to initialization is not thread-safe. It looks like creating the Kusto Client could be moved to `onEnabled()`. ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx-nar/pom.xml: ########## @@ -0,0 +1,57 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-adx-bundle</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-adx-nar</artifactId> + <packaging>nar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-adx-processors</artifactId> + <version>2.0.0-SNAPSHOT</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <!-- test data --> + <exclude>src/test/resources/*</exclude> + </excludes> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + </plugins> + </build> Review Comment: ```suggestion ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/resources/azure-kusto-nifi-version.properties: ########## @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +version=${project.version} Review Comment: This file can be removed ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/test/java/org/apache/nifi/adx/TestAzureAdxSourceConnectionService.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx; + +import com.microsoft.azure.kusto.data.Client; +import org.apache.nifi.adx.service.AzureAdxSourceConnectionService; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.ControllerServiceConfiguration; +import org.apache.nifi.util.StandardProcessorTestRunner; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TestAzureAdxSourceConnectionService { + + private TestRunner runner; + + private AzureAdxSourceConnectionService service; + + private static final String MOCK_APP_ID = "mockAppId"; + + private static final String MOCK_APP_KEY = "mockAppKey"; + + private static final String MOCK_APP_TENANT = "mockAppTenant"; + + private static final String MOCK_CLUSTER_URL = "https://mockClusterUrl.com/"; + + @BeforeEach + public void setup() throws InitializationException { + runner = TestRunners.newTestRunner(new AbstractProcessor() { + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + List<PropertyDescriptor> propDescs = new ArrayList<>(); + propDescs.add(new PropertyDescriptor.Builder() + .name("AdxService") + .description("AdxService") + .identifiesControllerService(AdxSourceConnectionService.class) + .required(true) + .build()); + return propDescs; + } + }); + + service = new AzureAdxSourceConnectionService(); + runner.addControllerService("test-good", service); + } + + @AfterEach + public void after() { + runner.clearProperties(); + } + + /** + * test successful adx connection scenario where all valid parameters are passed + */ + @Test + void testAdxConnectionController() { + configureAppId(); + configureAppKey(); + configureAppTenant(); + configureClusterURL(); + + runner.assertValid(service); + } + + /** + * test successful adx connection scenario where all valid parameters are passed + */ + @Test + void testCreateExecutionClientSuccess() { + configureAppId(); + configureAppKey(); + configureAppTenant(); + configureClusterURL(); + runner.assertValid(service); + runner.setValidateExpressionUsage(false); + runner.enableControllerService(service); + Client executionClient = service.getKustoQueryClient(); + + Assertions.assertNotNull(executionClient); + } + + @Test + void testPropertyDescriptor() { + configureAppId(); + configureAppKey(); + configureAppTenant(); + configureClusterURL(); + List<PropertyDescriptor> pd = service.getSupportedPropertyDescriptors(); + + assertTrue(pd.contains(AzureAdxSourceConnectionService.APP_ID)); + assertTrue(pd.contains(AzureAdxSourceConnectionService.APP_KEY)); + assertTrue(pd.contains(AzureAdxSourceConnectionService.APP_TENANT)); + assertTrue(pd.contains(AzureAdxSourceConnectionService.CLUSTER_URL)); + } + + + @Test + void testInvalidConnectionMissingProperty() { + configureAppId(); + configureAppKey(); + configureAppTenant(); + runner.assertNotValid(service); + runner.setValidateExpressionUsage(false); + Assertions.assertNull( + ((ControllerServiceConfiguration) + ((Map.Entry<?, ?>) (((StandardProcessorTestRunner)runner).getProcessContext().getControllerServices()).entrySet().toArray()[0]) + .getValue()).getProperty(AzureAdxSourceConnectionService.CLUSTER_URL)); Review Comment: This deep nesting should be refactored to multiple interim declared variables. ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/AzureAdxConnectionServiceParameter.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx; + +public enum AzureAdxConnectionServiceParameter { Review Comment: Recommend shortening this name. ```suggestion public enum AzureDataExplorerParameter { ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx-api/pom.xml: ########## @@ -0,0 +1,49 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-adx-bundle</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-adx-api</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <!-- test data --> + <exclude>src/test/resources/*</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> Review Comment: ```suggestion ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx-api/src/main/java/org/apache/nifi/adx/AdxSourceConnectionService.java: ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx; + +import org.apache.nifi.adx.model.KustoQueryResponse; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.controller.ControllerService; + +@Tags({"azure", "adx"}) Review Comment: ```suggestion @Tags({"azure", "adx", "explorer", "kusto"}) ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/pom.xml: ########## @@ -0,0 +1,115 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-adx-bundle</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-adx</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-adx-api</artifactId> + <version>2.0.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <version>2.0.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-standard-services-api-nar</artifactId> + <version>2.0.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>com.microsoft.azure.kusto</groupId> + <artifactId>kusto-ingest</artifactId> + <version>${azure-kusto-java-sdk-version}</version> + <exclusions> + <exclusion> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + <version>2.0.0-SNAPSHOT</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>jcl-over-slf4j</artifactId> + <scope>compile</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>3.1.2</version> + <configuration> + <archive> + <manifest> + <packageName>org.apache.nifi.adx</packageName> + </manifest> + <manifestEntries> + <implementation-title>nifi-adx</implementation-title> + <implementation-version>${project.version}</implementation-version> + </manifestEntries> Review Comment: Instead of using these custom entries, the standard `addDefaultImplementationEntries` element should be set in the `manifest` section. ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/NiFiVersion.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NiFiVersion { + public static final String CLIENT_NAME = "Kusto.Nifi"; + private static final Logger log = LoggerFactory.getLogger(NiFiVersion.class); + private static String version = "unknown"; + + private NiFiVersion() { + } + + static { + try { + version = NiFiVersion.class.getPackage().getImplementationVersion(); + log.info("Loaded version: {}", version); + } catch (Exception e) { + log.warn("Error while loading version:", e); Review Comment: This logging is not necessary and should be removed. ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/model/ADXConnectionParams.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx.model; + +public class ADXConnectionParams { Review Comment: ```suggestion public class AzureDataExplorerConnectionParameters { ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/service/AzureAdxSourceConnectionService.java: ########## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx.service; + +import com.microsoft.azure.kusto.data.Client; +import com.microsoft.azure.kusto.data.ClientFactory; +import com.microsoft.azure.kusto.data.KustoOperationResult; +import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; +import com.microsoft.azure.kusto.data.exceptions.DataClientException; +import com.microsoft.azure.kusto.data.exceptions.DataServiceException; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.nifi.adx.AdxSourceConnectionService; +import org.apache.nifi.adx.AzureAdxConnectionServiceParameter; +import org.apache.nifi.adx.NiFiVersion; +import org.apache.nifi.adx.model.ADXConnectionParams; +import org.apache.nifi.adx.model.KustoQueryResponse; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Arrays; +import java.util.List; + +@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"}) +@CapabilityDescription("Sends batches of flow file content or stream flow file content to an Azure ADX cluster.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The strategy/method to authenticate against Azure Active Directory, either 'application' or 'managed_identity'."), + @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure application id for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure application key for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_TENANT", description = "Azure application tenant for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of ADX cluster. This is required only when streaming data to ADX cluster is enabled."), +}) +public class AzureAdxSourceConnectionService extends AbstractControllerService implements AdxSourceConnectionService { Review Comment: Following project conventions, recommend the following name aligned with the suggestion for the interface name. ```suggestion public class StandardKustoQueryService extends AbstractControllerService implements AdxSourceConnectionService { ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/AzureAdxConnectionServiceParameter.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx; + +public enum AzureAdxConnectionServiceParameter { + + AUTH_STRATEGY("Kusto Authentication Method", "The strategy/method to authenticate against Azure Active Directory, either 'application' or 'managed_identity'."), + APP_ID("Application ID", "Azure application ID for accessing the ADX-Cluster"), + APP_KEY("Application KEY", "Azure application Key for accessing the ADX-Cluster"), + APP_TENANT("Application Tenant", "Azure application tenant for accessing the ADX-Cluster"), + CLUSTER_URL("Cluster URL", "Endpoint of ADX cluster. This is required only when streaming data to ADX cluster is enabled."); + + private final String paramDisplayName; + private final String description; + + + AzureAdxConnectionServiceParameter(String paramDisplayName, String description) { + this.paramDisplayName = paramDisplayName; + this.description = description; + } + + public String getParamDisplayName() { Review Comment: ```suggestion public String getDisplayName() { ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/service/AzureAdxSourceConnectionService.java: ########## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx.service; + +import com.microsoft.azure.kusto.data.Client; +import com.microsoft.azure.kusto.data.ClientFactory; +import com.microsoft.azure.kusto.data.KustoOperationResult; +import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; +import com.microsoft.azure.kusto.data.exceptions.DataClientException; +import com.microsoft.azure.kusto.data.exceptions.DataServiceException; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.nifi.adx.AdxSourceConnectionService; +import org.apache.nifi.adx.AzureAdxConnectionServiceParameter; +import org.apache.nifi.adx.NiFiVersion; +import org.apache.nifi.adx.model.ADXConnectionParams; +import org.apache.nifi.adx.model.KustoQueryResponse; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Arrays; +import java.util.List; + +@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"}) +@CapabilityDescription("Sends batches of flow file content or stream flow file content to an Azure ADX cluster.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The strategy/method to authenticate against Azure Active Directory, either 'application' or 'managed_identity'."), + @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure application id for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure application key for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_TENANT", description = "Azure application tenant for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of ADX cluster. This is required only when streaming data to ADX cluster is enabled."), +}) +public class AzureAdxSourceConnectionService extends AbstractControllerService implements AdxSourceConnectionService { + + private static final String KUSTO_STRATEGY_APPLICATION = "application"; + + private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = "managed_identity"; + + public static final Pair<String,String> NIFI_SOURCE = Pair.of("processor", "nifi-source"); + + public static final PropertyDescriptor KUSTO_AUTH_STRATEGY = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.name()) + .displayName(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getDescription()) + .required(false) + .defaultValue(KUSTO_STRATEGY_APPLICATION) + .allowableValues(KUSTO_STRATEGY_APPLICATION, KUSTO_STRATEGY_MANAGED_IDENTITY) + .build(); + + public static final PropertyDescriptor APP_ID = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_ID.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_ID.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_ID.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APP_KEY = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_KEY.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_KEY.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_KEY.getDescription()) + .required(true) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APP_TENANT = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_TENANT.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_TENANT.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_TENANT.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor CLUSTER_URL = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.CLUSTER_URL.name()) + .displayName(AzureAdxConnectionServiceParameter.CLUSTER_URL.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.CLUSTER_URL.getDescription()) + .required(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(KUSTO_AUTH_STRATEGY,APP_ID,APP_KEY,APP_TENANT,CLUSTER_URL); + + private Client executionClient; + + private ADXConnectionParams adxConnectionParams; + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + /** + * @param context the configuration context + */ + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws ProcessException { + getLogger().info("Starting Azure ADX Source Connection Service..."); + adxConnectionParams = new ADXConnectionParams(); + adxConnectionParams.setKustoAuthStrategy(context.getProperty(KUSTO_AUTH_STRATEGY).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppId(context.getProperty(APP_ID).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppKey(context.getProperty(APP_KEY).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppTenant(context.getProperty(APP_TENANT).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setKustoEngineURL(context.getProperty(CLUSTER_URL).evaluateAttributeExpressions().getValue()); + if (this.executionClient != null) { + onStopped(); + } + + } + + @OnStopped + public final void onStopped() { + if(this.executionClient!=null){ + try { + this.executionClient.close(); + } catch (Exception e) { + getLogger().error("Error closing Kusto Execution Client", e); Review Comment: ```suggestion getLogger().error("Kusto Client close failed", e); ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/service/AzureAdxSourceConnectionService.java: ########## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx.service; + +import com.microsoft.azure.kusto.data.Client; +import com.microsoft.azure.kusto.data.ClientFactory; +import com.microsoft.azure.kusto.data.KustoOperationResult; +import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; +import com.microsoft.azure.kusto.data.exceptions.DataClientException; +import com.microsoft.azure.kusto.data.exceptions.DataServiceException; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.nifi.adx.AdxSourceConnectionService; +import org.apache.nifi.adx.AzureAdxConnectionServiceParameter; +import org.apache.nifi.adx.NiFiVersion; +import org.apache.nifi.adx.model.ADXConnectionParams; +import org.apache.nifi.adx.model.KustoQueryResponse; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Arrays; +import java.util.List; + +@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"}) +@CapabilityDescription("Sends batches of flow file content or stream flow file content to an Azure ADX cluster.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The strategy/method to authenticate against Azure Active Directory, either 'application' or 'managed_identity'."), + @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure application id for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure application key for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_TENANT", description = "Azure application tenant for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of ADX cluster. This is required only when streaming data to ADX cluster is enabled."), +}) +public class AzureAdxSourceConnectionService extends AbstractControllerService implements AdxSourceConnectionService { + + private static final String KUSTO_STRATEGY_APPLICATION = "application"; + + private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = "managed_identity"; + + public static final Pair<String,String> NIFI_SOURCE = Pair.of("processor", "nifi-source"); + + public static final PropertyDescriptor KUSTO_AUTH_STRATEGY = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.name()) + .displayName(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getDescription()) + .required(false) + .defaultValue(KUSTO_STRATEGY_APPLICATION) + .allowableValues(KUSTO_STRATEGY_APPLICATION, KUSTO_STRATEGY_MANAGED_IDENTITY) + .build(); + + public static final PropertyDescriptor APP_ID = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_ID.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_ID.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_ID.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APP_KEY = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_KEY.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_KEY.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_KEY.getDescription()) + .required(true) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APP_TENANT = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_TENANT.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_TENANT.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_TENANT.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor CLUSTER_URL = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.CLUSTER_URL.name()) + .displayName(AzureAdxConnectionServiceParameter.CLUSTER_URL.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.CLUSTER_URL.getDescription()) + .required(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(KUSTO_AUTH_STRATEGY,APP_ID,APP_KEY,APP_TENANT,CLUSTER_URL); + + private Client executionClient; + + private ADXConnectionParams adxConnectionParams; + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + /** + * @param context the configuration context + */ + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws ProcessException { + getLogger().info("Starting Azure ADX Source Connection Service..."); + adxConnectionParams = new ADXConnectionParams(); + adxConnectionParams.setKustoAuthStrategy(context.getProperty(KUSTO_AUTH_STRATEGY).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppId(context.getProperty(APP_ID).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppKey(context.getProperty(APP_KEY).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppTenant(context.getProperty(APP_TENANT).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setKustoEngineURL(context.getProperty(CLUSTER_URL).evaluateAttributeExpressions().getValue()); Review Comment: The properties do not declare support for Expression Language, which they should not, but the `evaluateAttributeExpression()` calls should be removed. ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/service/AzureAdxSourceConnectionService.java: ########## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx.service; + +import com.microsoft.azure.kusto.data.Client; +import com.microsoft.azure.kusto.data.ClientFactory; +import com.microsoft.azure.kusto.data.KustoOperationResult; +import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; +import com.microsoft.azure.kusto.data.exceptions.DataClientException; +import com.microsoft.azure.kusto.data.exceptions.DataServiceException; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.nifi.adx.AdxSourceConnectionService; +import org.apache.nifi.adx.AzureAdxConnectionServiceParameter; +import org.apache.nifi.adx.NiFiVersion; +import org.apache.nifi.adx.model.ADXConnectionParams; +import org.apache.nifi.adx.model.KustoQueryResponse; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Arrays; +import java.util.List; + +@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"}) +@CapabilityDescription("Sends batches of flow file content or stream flow file content to an Azure ADX cluster.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The strategy/method to authenticate against Azure Active Directory, either 'application' or 'managed_identity'."), + @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure application id for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure application key for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_TENANT", description = "Azure application tenant for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of ADX cluster. This is required only when streaming data to ADX cluster is enabled."), +}) +public class AzureAdxSourceConnectionService extends AbstractControllerService implements AdxSourceConnectionService { + + private static final String KUSTO_STRATEGY_APPLICATION = "application"; + + private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = "managed_identity"; + + public static final Pair<String,String> NIFI_SOURCE = Pair.of("processor", "nifi-source"); + + public static final PropertyDescriptor KUSTO_AUTH_STRATEGY = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.name()) + .displayName(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getDescription()) + .required(false) + .defaultValue(KUSTO_STRATEGY_APPLICATION) + .allowableValues(KUSTO_STRATEGY_APPLICATION, KUSTO_STRATEGY_MANAGED_IDENTITY) + .build(); + + public static final PropertyDescriptor APP_ID = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_ID.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_ID.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_ID.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APP_KEY = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_KEY.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_KEY.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_KEY.getDescription()) + .required(true) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APP_TENANT = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_TENANT.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_TENANT.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_TENANT.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor CLUSTER_URL = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.CLUSTER_URL.name()) + .displayName(AzureAdxConnectionServiceParameter.CLUSTER_URL.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.CLUSTER_URL.getDescription()) + .required(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(KUSTO_AUTH_STRATEGY,APP_ID,APP_KEY,APP_TENANT,CLUSTER_URL); + + private Client executionClient; + + private ADXConnectionParams adxConnectionParams; + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + /** + * @param context the configuration context + */ + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws ProcessException { + getLogger().info("Starting Azure ADX Source Connection Service..."); + adxConnectionParams = new ADXConnectionParams(); + adxConnectionParams.setKustoAuthStrategy(context.getProperty(KUSTO_AUTH_STRATEGY).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppId(context.getProperty(APP_ID).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppKey(context.getProperty(APP_KEY).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppTenant(context.getProperty(APP_TENANT).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setKustoEngineURL(context.getProperty(CLUSTER_URL).evaluateAttributeExpressions().getValue()); + if (this.executionClient != null) { + onStopped(); + } + + } + + @OnStopped + public final void onStopped() { + if(this.executionClient!=null){ + try { + this.executionClient.close(); + } catch (Exception e) { + getLogger().error("Error closing Kusto Execution Client", e); + } + } + this.executionClient = null; + } + + + public Client getKustoQueryClient() { + return createKustoExecutionClient(adxConnectionParams.getKustoEngineURL(), + adxConnectionParams.getAppId(), + adxConnectionParams.getAppKey(), + adxConnectionParams.getAppTenant(), + adxConnectionParams.getKustoAuthStrategy()); + } + + @Override + public KustoQueryResponse executeQuery(String databaseName, String query){ + if (this.executionClient == null) { + this.executionClient = getKustoQueryClient(); + } + List<List<Object>> tableData; + KustoQueryResponse kustoQueryResponse; + KustoOperationResult kustoOperationResult; + try { + kustoOperationResult = this.executionClient.execute(databaseName, query); + } catch (DataServiceException | DataClientException e) { + String errorMessage; + if(Arrays.stream(ExceptionUtils.getRootCauseStackTrace(e)).anyMatch(str -> str.contains("LimitsExceeded"))){ + errorMessage = "Exception occurred while reading data from ADX : Query Limits exceeded : Please modify your query to fetch results below the kusto query limits"; Review Comment: This message can be shortened: ```suggestion errorMessage = "Query Limits exceeded: Please modify the query to fetch results below Kusto limits"; ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/service/AzureAdxSourceConnectionService.java: ########## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx.service; + +import com.microsoft.azure.kusto.data.Client; +import com.microsoft.azure.kusto.data.ClientFactory; +import com.microsoft.azure.kusto.data.KustoOperationResult; +import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; +import com.microsoft.azure.kusto.data.exceptions.DataClientException; +import com.microsoft.azure.kusto.data.exceptions.DataServiceException; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.nifi.adx.AdxSourceConnectionService; +import org.apache.nifi.adx.AzureAdxConnectionServiceParameter; +import org.apache.nifi.adx.NiFiVersion; +import org.apache.nifi.adx.model.ADXConnectionParams; +import org.apache.nifi.adx.model.KustoQueryResponse; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Arrays; +import java.util.List; + +@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"}) +@CapabilityDescription("Sends batches of flow file content or stream flow file content to an Azure ADX cluster.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The strategy/method to authenticate against Azure Active Directory, either 'application' or 'managed_identity'."), + @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure application id for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure application key for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_TENANT", description = "Azure application tenant for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of ADX cluster. This is required only when streaming data to ADX cluster is enabled."), +}) +public class AzureAdxSourceConnectionService extends AbstractControllerService implements AdxSourceConnectionService { + + private static final String KUSTO_STRATEGY_APPLICATION = "application"; + + private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = "managed_identity"; + + public static final Pair<String,String> NIFI_SOURCE = Pair.of("processor", "nifi-source"); + + public static final PropertyDescriptor KUSTO_AUTH_STRATEGY = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.name()) + .displayName(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getDescription()) + .required(false) + .defaultValue(KUSTO_STRATEGY_APPLICATION) + .allowableValues(KUSTO_STRATEGY_APPLICATION, KUSTO_STRATEGY_MANAGED_IDENTITY) + .build(); + + public static final PropertyDescriptor APP_ID = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_ID.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_ID.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_ID.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APP_KEY = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_KEY.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_KEY.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_KEY.getDescription()) + .required(true) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APP_TENANT = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_TENANT.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_TENANT.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_TENANT.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor CLUSTER_URL = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.CLUSTER_URL.name()) + .displayName(AzureAdxConnectionServiceParameter.CLUSTER_URL.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.CLUSTER_URL.getDescription()) + .required(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(KUSTO_AUTH_STRATEGY,APP_ID,APP_KEY,APP_TENANT,CLUSTER_URL); + + private Client executionClient; + + private ADXConnectionParams adxConnectionParams; + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + /** + * @param context the configuration context + */ + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws ProcessException { + getLogger().info("Starting Azure ADX Source Connection Service..."); + adxConnectionParams = new ADXConnectionParams(); + adxConnectionParams.setKustoAuthStrategy(context.getProperty(KUSTO_AUTH_STRATEGY).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppId(context.getProperty(APP_ID).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppKey(context.getProperty(APP_KEY).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppTenant(context.getProperty(APP_TENANT).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setKustoEngineURL(context.getProperty(CLUSTER_URL).evaluateAttributeExpressions().getValue()); + if (this.executionClient != null) { + onStopped(); + } + + } + + @OnStopped + public final void onStopped() { + if(this.executionClient!=null){ + try { + this.executionClient.close(); + } catch (Exception e) { + getLogger().error("Error closing Kusto Execution Client", e); + } + } + this.executionClient = null; + } + + + public Client getKustoQueryClient() { + return createKustoExecutionClient(adxConnectionParams.getKustoEngineURL(), + adxConnectionParams.getAppId(), + adxConnectionParams.getAppKey(), + adxConnectionParams.getAppTenant(), + adxConnectionParams.getKustoAuthStrategy()); + } + + @Override + public KustoQueryResponse executeQuery(String databaseName, String query){ + if (this.executionClient == null) { + this.executionClient = getKustoQueryClient(); + } + List<List<Object>> tableData; + KustoQueryResponse kustoQueryResponse; + KustoOperationResult kustoOperationResult; + try { + kustoOperationResult = this.executionClient.execute(databaseName, query); + } catch (DataServiceException | DataClientException e) { + String errorMessage; + if(Arrays.stream(ExceptionUtils.getRootCauseStackTrace(e)).anyMatch(str -> str.contains("LimitsExceeded"))){ + errorMessage = "Exception occurred while reading data from ADX : Query Limits exceeded : Please modify your query to fetch results below the kusto query limits"; + }else { + errorMessage = "Exception occurred while reading data from ADX"; + } + getLogger().error(errorMessage, e); + kustoQueryResponse = new KustoQueryResponse(true,errorMessage); Review Comment: ```suggestion kustoQueryResponse = new KustoQueryResponse(true, errorMessage); ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/service/AzureAdxSourceConnectionService.java: ########## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx.service; + +import com.microsoft.azure.kusto.data.Client; +import com.microsoft.azure.kusto.data.ClientFactory; +import com.microsoft.azure.kusto.data.KustoOperationResult; +import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; +import com.microsoft.azure.kusto.data.exceptions.DataClientException; +import com.microsoft.azure.kusto.data.exceptions.DataServiceException; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.nifi.adx.AdxSourceConnectionService; +import org.apache.nifi.adx.AzureAdxConnectionServiceParameter; +import org.apache.nifi.adx.NiFiVersion; +import org.apache.nifi.adx.model.ADXConnectionParams; +import org.apache.nifi.adx.model.KustoQueryResponse; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Arrays; +import java.util.List; + +@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"}) +@CapabilityDescription("Sends batches of flow file content or stream flow file content to an Azure ADX cluster.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The strategy/method to authenticate against Azure Active Directory, either 'application' or 'managed_identity'."), + @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure application id for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure application key for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_TENANT", description = "Azure application tenant for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of ADX cluster. This is required only when streaming data to ADX cluster is enabled."), +}) +public class AzureAdxSourceConnectionService extends AbstractControllerService implements AdxSourceConnectionService { + + private static final String KUSTO_STRATEGY_APPLICATION = "application"; + + private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = "managed_identity"; + + public static final Pair<String,String> NIFI_SOURCE = Pair.of("processor", "nifi-source"); + + public static final PropertyDescriptor KUSTO_AUTH_STRATEGY = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.name()) + .displayName(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getDescription()) + .required(false) + .defaultValue(KUSTO_STRATEGY_APPLICATION) + .allowableValues(KUSTO_STRATEGY_APPLICATION, KUSTO_STRATEGY_MANAGED_IDENTITY) + .build(); + + public static final PropertyDescriptor APP_ID = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_ID.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_ID.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_ID.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APP_KEY = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_KEY.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_KEY.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_KEY.getDescription()) + .required(true) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APP_TENANT = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_TENANT.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_TENANT.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_TENANT.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor CLUSTER_URL = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.CLUSTER_URL.name()) + .displayName(AzureAdxConnectionServiceParameter.CLUSTER_URL.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.CLUSTER_URL.getDescription()) + .required(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(KUSTO_AUTH_STRATEGY,APP_ID,APP_KEY,APP_TENANT,CLUSTER_URL); + + private Client executionClient; + + private ADXConnectionParams adxConnectionParams; + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + /** + * @param context the configuration context + */ + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws ProcessException { + getLogger().info("Starting Azure ADX Source Connection Service..."); + adxConnectionParams = new ADXConnectionParams(); + adxConnectionParams.setKustoAuthStrategy(context.getProperty(KUSTO_AUTH_STRATEGY).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppId(context.getProperty(APP_ID).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppKey(context.getProperty(APP_KEY).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppTenant(context.getProperty(APP_TENANT).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setKustoEngineURL(context.getProperty(CLUSTER_URL).evaluateAttributeExpressions().getValue()); + if (this.executionClient != null) { + onStopped(); + } + + } + + @OnStopped + public final void onStopped() { + if(this.executionClient!=null){ Review Comment: ```suggestion if (this.executionClient != null) { ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/test/java/org/apache/nifi/adx/TestAzureAdxSourceConnectionService.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx; + +import com.microsoft.azure.kusto.data.Client; +import org.apache.nifi.adx.service.AzureAdxSourceConnectionService; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.ControllerServiceConfiguration; +import org.apache.nifi.util.StandardProcessorTestRunner; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TestAzureAdxSourceConnectionService { + + private TestRunner runner; + + private AzureAdxSourceConnectionService service; + + private static final String MOCK_APP_ID = "mockAppId"; + + private static final String MOCK_APP_KEY = "mockAppKey"; + + private static final String MOCK_APP_TENANT = "mockAppTenant"; + + private static final String MOCK_CLUSTER_URL = "https://mockClusterUrl.com/"; + + @BeforeEach + public void setup() throws InitializationException { + runner = TestRunners.newTestRunner(new AbstractProcessor() { + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + List<PropertyDescriptor> propDescs = new ArrayList<>(); + propDescs.add(new PropertyDescriptor.Builder() + .name("AdxService") + .description("AdxService") + .identifiesControllerService(AdxSourceConnectionService.class) + .required(true) + .build()); + return propDescs; + } + }); Review Comment: This can be simplified: ```suggestion runner = TestRunners.newTestRunner(NoOpProcessor.class); ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx-processors/src/main/java/org/apache/nifi/processors/adx/QueryAzureDataExplorer.java: ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.adx; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.adx.AdxSourceConnectionService; +import org.apache.nifi.adx.model.KustoQueryResponse; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.adx.enums.AzureAdxSourceProcessorParameter; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Set; + +@Tags({"azure", "adx", "microsoft", "data", "explorer", "source"}) +@CapabilityDescription("This Processor acts as a ADX source connector which queries data from Azure Data Explorer."+ + "This connector can act only as a start of the data pipeline getting data from ADX."+ + "The queries which can be used further details can be found here https://learn.microsoft.com/en-us/azure/data-explorer/kusto/concepts/querylimits") +@WritesAttributes({ + @WritesAttribute(attribute = QueryAzureDataExplorer.ADX_QUERY_ERROR_MESSAGE, description = "Azure Data Explorer error message."), + @WritesAttribute(attribute = QueryAzureDataExplorer.ADX_EXECUTED_QUERY, description = "Azure Data Explorer executed query.") +}) +public class QueryAzureDataExplorer extends AbstractProcessor { + public static final String ADX_QUERY_ERROR_MESSAGE = "adx.query.error.message"; + public static final String ADX_EXECUTED_QUERY = "adx.executed.query"; + + public static final Relationship SUCCESS = new Relationship.Builder() + .name("SUCCESS") + .description("Relationship for success") + .build(); + public static final Relationship FAILED = new Relationship.Builder() + .name("FAILED") + .description("Relationship for failure") + .build(); + public static final PropertyDescriptor DB_NAME = new PropertyDescriptor + .Builder().name(AzureAdxSourceProcessorParameter.DB_NAME.name()) + .displayName(AzureAdxSourceProcessorParameter.DB_NAME.getParamDisplayName()) + .description(AzureAdxSourceProcessorParameter.DB_NAME.getParamDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor ADX_QUERY = new PropertyDescriptor + .Builder().name(AzureAdxSourceProcessorParameter.ADX_QUERY.name()) + .displayName(AzureAdxSourceProcessorParameter.ADX_QUERY.getParamDisplayName()) + .description(AzureAdxSourceProcessorParameter.ADX_QUERY.getParamDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor ADX_SOURCE_SERVICE = new PropertyDescriptor + .Builder().name(AzureAdxSourceProcessorParameter.ADX_SOURCE_SERVICE.name()) + .displayName(AzureAdxSourceProcessorParameter.ADX_SOURCE_SERVICE.getParamDisplayName()) + .description(AzureAdxSourceProcessorParameter.ADX_SOURCE_SERVICE.getParamDescription()) + .required(true) + .identifiesControllerService(AdxSourceConnectionService.class) + .build(); + private final ObjectMapper objectMapper = new ObjectMapper(); + private final Set<Relationship> relationships = Set.of(SUCCESS,FAILED); + private final List<PropertyDescriptor> descriptors = List.of(ADX_SOURCE_SERVICE,DB_NAME,ADX_QUERY); + private AdxSourceConnectionService service; + + @Override + public Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + service = context.getProperty(ADX_SOURCE_SERVICE).asControllerService(AdxSourceConnectionService.class); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile outgoingFlowFile; + String databaseName = context.getProperty(DB_NAME).getValue(); + String adxQuery; + KustoQueryResponse kustoQueryResponse; + + //checks if this processor has any preceding connection, if yes retrieve + if (context.hasIncomingConnection()) { + FlowFile incomingFlowFile = session.get(); + //incoming connection exists but the incoming flowfile is null + if (incomingFlowFile == null && context.hasNonLoopConnection()) { + return; + } + //incoming connection exists and retrieve adxQuery from context + if (incomingFlowFile != null && incomingFlowFile.getSize() == 0) { + if (context.getProperty(ADX_QUERY).isSet()) { + adxQuery = context.getProperty(ADX_QUERY).evaluateAttributeExpressions(incomingFlowFile).getValue(); + } else { + String message = "FlowFile query is empty and no scheduled query is set"; + getLogger().error(message); + incomingFlowFile = session.putAttribute(incomingFlowFile, ADX_QUERY_ERROR_MESSAGE, message); + session.transfer(incomingFlowFile, FAILED); + return; + } + } else { + try { + adxQuery = getQuery(session, incomingFlowFile); + } catch(IOException ioe) { + throw new ProcessException("Failed to read Query from FlowFile",ioe); + } + } + outgoingFlowFile = incomingFlowFile; + } else { + outgoingFlowFile = session.create(); + adxQuery = context.getProperty(ADX_QUERY).evaluateAttributeExpressions(outgoingFlowFile).getValue(); + } + + try { + //execute Query + kustoQueryResponse = executeQuery(databaseName,adxQuery); + if(!kustoQueryResponse.isError()){ + try(ByteArrayInputStream bais = new ByteArrayInputStream(objectMapper.writeValueAsBytes(kustoQueryResponse.getTableData()))){ + session.importFrom(bais, outgoingFlowFile); + } Review Comment: This approach is very memory-intensive and needs to be changed. Serializing the entire result set into a byte array can easily lead to memory exhaustion for large numbers of results. The Kusto ClientFactory also includes a `StreamingClient` that returns an `InputStream`. Based on that option, both the Controller Service interface and this Processor implementation should be changed. The Service should return a streaming query result, which can be transferred directly to the FlowFile OutputStream using one of the methods on ProcessSession. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
