exceptionfactory commented on code in PR #7122: URL: https://github.com/apache/nifi/pull/7122#discussion_r1177940619
########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx-processors/src/main/java/org/apache/nifi/processors/adx/QueryAzureDataExplorer.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.adx; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.microsoft.azure.kusto.data.Client; +import com.microsoft.azure.kusto.data.KustoOperationResult; +import com.microsoft.azure.kusto.data.exceptions.DataClientException; +import com.microsoft.azure.kusto.data.exceptions.DataServiceException; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.nifi.adx.AdxSourceConnectionService; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.adx.enums.AzureAdxSourceProcessorParameter; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +@Tags({"azure", "adx", "microsoft", "data", "explorer", "source"}) +@CapabilityDescription("This Processor acts as a ADX source connector which queries data from Azure Data Explorer."+ + "This connector can act only as a start of the data pipeline getting data from ADX."+ + "The queries which can be used further details can be found here https://learn.microsoft.com/en-us/azure/data-explorer/kusto/concepts/querylimits") +@WritesAttributes({ + @WritesAttribute(attribute = "ADX_QUERY_ERROR_MESSAGE", description = "Azure Data Explorer error message."), + @WritesAttribute(attribute = "ADX_EXECUTED_QUERY", description = "Azure Data Explorer executed query.") Review Comment: FlowFile attribute names generally follow the convention of lowercase with `.` separators. Recommend the following: ```suggestion @WritesAttribute(attribute = "adx.query.error.message", description = "Azure Data Explorer error message."), @WritesAttribute(attribute = "adx.query.executed", description = "Azure Data Explorer executed query.") ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx-processors/src/main/java/org/apache/nifi/processors/adx/QueryAzureDataExplorer.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.adx; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.microsoft.azure.kusto.data.Client; +import com.microsoft.azure.kusto.data.KustoOperationResult; +import com.microsoft.azure.kusto.data.exceptions.DataClientException; +import com.microsoft.azure.kusto.data.exceptions.DataServiceException; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.nifi.adx.AdxSourceConnectionService; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.adx.enums.AzureAdxSourceProcessorParameter; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +@Tags({"azure", "adx", "microsoft", "data", "explorer", "source"}) +@CapabilityDescription("This Processor acts as a ADX source connector which queries data from Azure Data Explorer."+ + "This connector can act only as a start of the data pipeline getting data from ADX."+ + "The queries which can be used further details can be found here https://learn.microsoft.com/en-us/azure/data-explorer/kusto/concepts/querylimits") +@WritesAttributes({ + @WritesAttribute(attribute = "ADX_QUERY_ERROR_MESSAGE", description = "Azure Data Explorer error message."), + @WritesAttribute(attribute = "ADX_EXECUTED_QUERY", description = "Azure Data Explorer executed query.") +}) +public class QueryAzureDataExplorer extends AbstractProcessor { + public static final String ADX_QUERY_ERROR_MESSAGE = "adx.query.error.message"; + public static final String ADX_EXECUTED_QUERY = "adx.executed.query"; Review Comment: These two values can be referenced in the `WritesAttribute` annotations. ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx-processors/src/main/java/org/apache/nifi/processors/adx/QueryAzureDataExplorer.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.adx; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.microsoft.azure.kusto.data.Client; +import com.microsoft.azure.kusto.data.KustoOperationResult; +import com.microsoft.azure.kusto.data.exceptions.DataClientException; +import com.microsoft.azure.kusto.data.exceptions.DataServiceException; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.nifi.adx.AdxSourceConnectionService; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.adx.enums.AzureAdxSourceProcessorParameter; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +@Tags({"azure", "adx", "microsoft", "data", "explorer", "source"}) +@CapabilityDescription("This Processor acts as a ADX source connector which queries data from Azure Data Explorer."+ + "This connector can act only as a start of the data pipeline getting data from ADX."+ + "The queries which can be used further details can be found here https://learn.microsoft.com/en-us/azure/data-explorer/kusto/concepts/querylimits") +@WritesAttributes({ + @WritesAttribute(attribute = "ADX_QUERY_ERROR_MESSAGE", description = "Azure Data Explorer error message."), + @WritesAttribute(attribute = "ADX_EXECUTED_QUERY", description = "Azure Data Explorer executed query.") +}) +public class QueryAzureDataExplorer extends AbstractProcessor { + public static final String ADX_QUERY_ERROR_MESSAGE = "adx.query.error.message"; + public static final String ADX_EXECUTED_QUERY = "adx.executed.query"; + public static final String RELATIONSHIP_SUCCESS = "SUCCESS"; + + public static final String RELATIONSHIP_FAILED = "FAILED"; + public static final String RELATIONSHIP_FAILED_DESC = "Relationship for failure"; + public static final String RELATIONSHIP_SUCCESS_DESC = "Relationship for success"; + + public static final Relationship SUCCESS = new Relationship.Builder() + .name(RELATIONSHIP_SUCCESS) + .description(RELATIONSHIP_SUCCESS_DESC) + .build(); + public static final Relationship FAILED = new Relationship.Builder() + .name(RELATIONSHIP_FAILED) + .description(RELATIONSHIP_FAILED_DESC) + .build(); + public static final PropertyDescriptor DB_NAME = new PropertyDescriptor + .Builder().name(AzureAdxSourceProcessorParameter.DB_NAME.name()) + .displayName(AzureAdxSourceProcessorParameter.DB_NAME.getParamDisplayName()) + .description(AzureAdxSourceProcessorParameter.DB_NAME.getParamDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor ADX_QUERY = new PropertyDescriptor + .Builder().name(AzureAdxSourceProcessorParameter.ADX_QUERY.name()) + .displayName(AzureAdxSourceProcessorParameter.ADX_QUERY.getParamDisplayName()) + .description(AzureAdxSourceProcessorParameter.ADX_QUERY.getParamDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor ADX_SOURCE_SERVICE = new PropertyDescriptor + .Builder().name(AzureAdxSourceProcessorParameter.ADX_SOURCE_SERVICE.name()) + .displayName(AzureAdxSourceProcessorParameter.ADX_SOURCE_SERVICE.getParamDisplayName()) + .description(AzureAdxSourceProcessorParameter.ADX_SOURCE_SERVICE.getParamDescription()) + .required(true) + .identifiesControllerService(AdxSourceConnectionService.class) + .build(); + private final ObjectMapper objectMapper = new ObjectMapper(); + private Set<Relationship> relationships; + private List<PropertyDescriptor> descriptors; + private Client executionClient; + + @Override + protected void init(final ProcessorInitializationContext context) { + this.descriptors = List.of(ADX_SOURCE_SERVICE,DB_NAME,ADX_QUERY); + this.relationships = Set.of(SUCCESS,FAILED); Review Comment: These collections can be declared statically and the `init` method can be removed. ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/pom.xml: ########## @@ -0,0 +1,118 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-adx-bundle</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-adx</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-adx-api</artifactId> + <version>2.0.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <version>2.0.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-standard-services-api-nar</artifactId> + <version>2.0.0-SNAPSHOT</version> + <type>nar</type> + </dependency> Review Comment: This dependency is not necessary and should be removed. ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/pom.xml: ########## @@ -0,0 +1,118 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-adx-bundle</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-adx</artifactId> Review Comment: Following general project conventions, the module name should include the word `service` indicating that it contains the Controller Service implementation. ```suggestion <artifactId>nifi-adx-service</artifactId> ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/AzureAdxSourceConnectionService.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.adx.model.ADXConnectionParams; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import com.microsoft.azure.kusto.data.Client; +import com.microsoft.azure.kusto.data.ClientFactory; +import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; + +import java.util.List; +import static org.apache.nifi.adx.NiFiVersion.NIFI_SOURCE; + +@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"}) +@CapabilityDescription("Sends batches of flow file content or stream flow file content to an Azure ADX cluster.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The strategy/method to authenticate against Azure Active Directory, either 'application' or 'managed_identity'."), + @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure application id for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure application key for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_TENANT", description = "Azure application tenant for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of ADX cluster. This is required only when streaming data to ADX cluster is enabled."), +}) Review Comment: Controller Services do not read FlowFile attributes, so these annotations should be removed. ```suggestion ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/AzureAdxSourceConnectionService.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.adx.model.ADXConnectionParams; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import com.microsoft.azure.kusto.data.Client; +import com.microsoft.azure.kusto.data.ClientFactory; +import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; + +import java.util.List; +import static org.apache.nifi.adx.NiFiVersion.NIFI_SOURCE; + +@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"}) +@CapabilityDescription("Sends batches of flow file content or stream flow file content to an Azure ADX cluster.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The strategy/method to authenticate against Azure Active Directory, either 'application' or 'managed_identity'."), + @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure application id for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure application key for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_TENANT", description = "Azure application tenant for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of ADX cluster. This is required only when streaming data to ADX cluster is enabled."), +}) +public class AzureAdxSourceConnectionService extends AbstractControllerService implements AdxSourceConnectionService { + + private static final String KUSTO_STRATEGY_APPLICATION = "application"; + + private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = "managed_identity"; + + public static final PropertyDescriptor KUSTO_AUTH_STRATEGY = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.name()) + .displayName(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getDescription()) + .required(false) + .defaultValue(KUSTO_STRATEGY_APPLICATION) + .allowableValues(KUSTO_STRATEGY_APPLICATION, KUSTO_STRATEGY_MANAGED_IDENTITY) + .build(); + + public static final PropertyDescriptor APP_ID = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_ID.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_ID.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_ID.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APP_KEY = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_KEY.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_KEY.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_KEY.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APP_TENANT = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_TENANT.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_TENANT.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_TENANT.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor CLUSTER_URL = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.CLUSTER_URL.name()) + .displayName(AzureAdxConnectionServiceParameter.CLUSTER_URL.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.CLUSTER_URL.getDescription()) + .required(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(KUSTO_AUTH_STRATEGY,APP_ID,APP_KEY,APP_TENANT,CLUSTER_URL); + + private Client executionClient; + + private ADXConnectionParams adxConnectionParams; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + /** + * @param context the configuration context + */ + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws ProcessException { + getLogger().info("Starting Azure ADX Source Connection Service..."); + adxConnectionParams = new ADXConnectionParams(); + adxConnectionParams.setKustoAuthStrategy(context.getProperty(KUSTO_AUTH_STRATEGY).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppId(context.getProperty(APP_ID).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppKey(context.getProperty(APP_KEY).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppTenant(context.getProperty(APP_TENANT).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setKustoEngineURL(context.getProperty(CLUSTER_URL).evaluateAttributeExpressions().getValue()); + if (this.executionClient != null) { + onStopped(); + } + + } + + @OnStopped + public final void onStopped() { + if(this.executionClient!=null){ Review Comment: ```suggestion if (this.executionClient != null) { ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/NiFiVersion.java: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx; + +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NiFiVersion { + public static final String CLIENT_NAME = "Kusto.Nifi"; + + public static final Pair<String,String> NIFI_SOURCE = Pair.of("processor", "nifi-source"); Review Comment: Is this value necessary? ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/test/java/org/apache/nifi/adx/MockAzureAdxSourceProcessor.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.ArrayList; +import java.util.List; + +public class MockAzureAdxSourceProcessor extends AbstractProcessor { Review Comment: This class can be removed in favor of a mock created using Mockito. ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx-processors/src/main/java/org/apache/nifi/processors/adx/enums/AzureAdxSourceProcessorParameter.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.adx.enums; + +public enum AzureAdxSourceProcessorParameter { + DB_NAME("Database name", "The name of the database where the query will be executed."), Review Comment: Display Names should follow the "Title Case" convention, capitalizing most of the words. ```suggestion DB_NAME("Database Name", "The name of the database where the query will be executed."), ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx-processors/src/main/java/org/apache/nifi/processors/adx/QueryAzureDataExplorer.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.adx; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.microsoft.azure.kusto.data.Client; +import com.microsoft.azure.kusto.data.KustoOperationResult; +import com.microsoft.azure.kusto.data.exceptions.DataClientException; +import com.microsoft.azure.kusto.data.exceptions.DataServiceException; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.nifi.adx.AdxSourceConnectionService; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.adx.enums.AzureAdxSourceProcessorParameter; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +@Tags({"azure", "adx", "microsoft", "data", "explorer", "source"}) +@CapabilityDescription("This Processor acts as a ADX source connector which queries data from Azure Data Explorer."+ + "This connector can act only as a start of the data pipeline getting data from ADX."+ + "The queries which can be used further details can be found here https://learn.microsoft.com/en-us/azure/data-explorer/kusto/concepts/querylimits") +@WritesAttributes({ + @WritesAttribute(attribute = "ADX_QUERY_ERROR_MESSAGE", description = "Azure Data Explorer error message."), + @WritesAttribute(attribute = "ADX_EXECUTED_QUERY", description = "Azure Data Explorer executed query.") +}) +public class QueryAzureDataExplorer extends AbstractProcessor { + public static final String ADX_QUERY_ERROR_MESSAGE = "adx.query.error.message"; + public static final String ADX_EXECUTED_QUERY = "adx.executed.query"; + public static final String RELATIONSHIP_SUCCESS = "SUCCESS"; + + public static final String RELATIONSHIP_FAILED = "FAILED"; + public static final String RELATIONSHIP_FAILED_DESC = "Relationship for failure"; + public static final String RELATIONSHIP_SUCCESS_DESC = "Relationship for success"; Review Comment: These descriptions should not be public static variables, instead they can be specified directly in the Relationship Builder. ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx-processors/src/main/java/org/apache/nifi/processors/adx/enums/AzureAdxSourceProcessorParameter.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.adx.enums; + +public enum AzureAdxSourceProcessorParameter { + DB_NAME("Database name", "The name of the database where the query will be executed."), + + ADX_QUERY("ADX query", "The query which needs to be executed in Azure Data Explorer."), Review Comment: The `ADX` prefix on the property name is not necessary. ```suggestion ADX_QUERY("Query", "The query which needs to be executed in Azure Data Explorer."), ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/pom.xml: ########## @@ -0,0 +1,118 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-adx-bundle</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-adx</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-adx-api</artifactId> + <version>2.0.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <version>2.0.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-standard-services-api-nar</artifactId> + <version>2.0.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>com.microsoft.azure.kusto</groupId> + <artifactId>kusto-ingest</artifactId> + <version>${azure-kusto-java-sdk-version}</version> + <exclusions> + <exclusion> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + <version>2.0.0-SNAPSHOT</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>jcl-over-slf4j</artifactId> + <scope>compile</scope> + </dependency> Review Comment: Compile dependencies should be listed before test dependencies. ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/AzureAdxSourceConnectionService.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.adx.model.ADXConnectionParams; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import com.microsoft.azure.kusto.data.Client; +import com.microsoft.azure.kusto.data.ClientFactory; +import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; + +import java.util.List; +import static org.apache.nifi.adx.NiFiVersion.NIFI_SOURCE; + +@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"}) +@CapabilityDescription("Sends batches of flow file content or stream flow file content to an Azure ADX cluster.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The strategy/method to authenticate against Azure Active Directory, either 'application' or 'managed_identity'."), + @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure application id for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure application key for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_TENANT", description = "Azure application tenant for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of ADX cluster. This is required only when streaming data to ADX cluster is enabled."), +}) +public class AzureAdxSourceConnectionService extends AbstractControllerService implements AdxSourceConnectionService { + + private static final String KUSTO_STRATEGY_APPLICATION = "application"; + + private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = "managed_identity"; Review Comment: These static string values should be replaced with an `enum` that defines the allowed values. The `enum` should implement the `DescribedValue` interface to provide description information. ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/AzureAdxSourceConnectionService.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx; Review Comment: ```suggestion package org.apache.nifi.adx.service; ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/AzureAdxSourceConnectionService.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.adx.model.ADXConnectionParams; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import com.microsoft.azure.kusto.data.Client; +import com.microsoft.azure.kusto.data.ClientFactory; +import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; + +import java.util.List; +import static org.apache.nifi.adx.NiFiVersion.NIFI_SOURCE; + +@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"}) +@CapabilityDescription("Sends batches of flow file content or stream flow file content to an Azure ADX cluster.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The strategy/method to authenticate against Azure Active Directory, either 'application' or 'managed_identity'."), + @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure application id for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure application key for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_TENANT", description = "Azure application tenant for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of ADX cluster. This is required only when streaming data to ADX cluster is enabled."), +}) +public class AzureAdxSourceConnectionService extends AbstractControllerService implements AdxSourceConnectionService { + + private static final String KUSTO_STRATEGY_APPLICATION = "application"; + + private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = "managed_identity"; + + public static final PropertyDescriptor KUSTO_AUTH_STRATEGY = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.name()) + .displayName(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getDescription()) + .required(false) + .defaultValue(KUSTO_STRATEGY_APPLICATION) + .allowableValues(KUSTO_STRATEGY_APPLICATION, KUSTO_STRATEGY_MANAGED_IDENTITY) + .build(); + + public static final PropertyDescriptor APP_ID = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_ID.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_ID.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_ID.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APP_KEY = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_KEY.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_KEY.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_KEY.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APP_TENANT = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_TENANT.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_TENANT.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_TENANT.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor CLUSTER_URL = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.CLUSTER_URL.name()) + .displayName(AzureAdxConnectionServiceParameter.CLUSTER_URL.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.CLUSTER_URL.getDescription()) + .required(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(KUSTO_AUTH_STRATEGY,APP_ID,APP_KEY,APP_TENANT,CLUSTER_URL); + + private Client executionClient; + + private ADXConnectionParams adxConnectionParams; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + /** + * @param context the configuration context + */ + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws ProcessException { + getLogger().info("Starting Azure ADX Source Connection Service..."); + adxConnectionParams = new ADXConnectionParams(); + adxConnectionParams.setKustoAuthStrategy(context.getProperty(KUSTO_AUTH_STRATEGY).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppId(context.getProperty(APP_ID).evaluateAttributeExpressions().getValue()); + adxConnectionParams.setAppKey(context.getProperty(APP_KEY).evaluateAttributeExpressions().getValue()); Review Comment: Sensitive properties should not support attribute evaluation. ```suggestion adxConnectionParams.setAppKey(context.getProperty(APP_KEY).getValue()); ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx/src/main/java/org/apache/nifi/adx/AzureAdxSourceConnectionService.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.adx.model.ADXConnectionParams; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import com.microsoft.azure.kusto.data.Client; +import com.microsoft.azure.kusto.data.ClientFactory; +import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; + +import java.util.List; +import static org.apache.nifi.adx.NiFiVersion.NIFI_SOURCE; + +@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"}) +@CapabilityDescription("Sends batches of flow file content or stream flow file content to an Azure ADX cluster.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "AUTH_STRATEGY", description = "The strategy/method to authenticate against Azure Active Directory, either 'application' or 'managed_identity'."), + @ReadsAttribute(attribute = "APP_ID", description = "Specifies Azure application id for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_KEY", description = "Specifies Azure application key for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "APP_TENANT", description = "Azure application tenant for accessing the ADX-Cluster."), + @ReadsAttribute(attribute = "CLUSTER_URL", description = "Endpoint of ADX cluster. This is required only when streaming data to ADX cluster is enabled."), +}) +public class AzureAdxSourceConnectionService extends AbstractControllerService implements AdxSourceConnectionService { + + private static final String KUSTO_STRATEGY_APPLICATION = "application"; + + private static final String KUSTO_STRATEGY_MANAGED_IDENTITY = "managed_identity"; + + public static final PropertyDescriptor KUSTO_AUTH_STRATEGY = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.name()) + .displayName(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.AUTH_STRATEGY.getDescription()) + .required(false) + .defaultValue(KUSTO_STRATEGY_APPLICATION) + .allowableValues(KUSTO_STRATEGY_APPLICATION, KUSTO_STRATEGY_MANAGED_IDENTITY) + .build(); + + public static final PropertyDescriptor APP_ID = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_ID.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_ID.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_ID.getDescription()) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APP_KEY = new PropertyDescriptor + .Builder().name(AzureAdxConnectionServiceParameter.APP_KEY.name()) + .displayName(AzureAdxConnectionServiceParameter.APP_KEY.getParamDisplayName()) + .description(AzureAdxConnectionServiceParameter.APP_KEY.getDescription()) + .required(true) Review Comment: This property should be marked as sensitive. ```suggestion .required(true) .sensitive(true) ``` ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx-nar/pom.xml: ########## @@ -0,0 +1,63 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-adx-bundle</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-adx-nar</artifactId> + <packaging>nar</packaging> + <properties> + <maven.javadoc.skip>true</maven.javadoc.skip> + <source.skip>true</source.skip> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-adx-processors</artifactId> + <version>${nifi-release-version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <!-- test data --> + <exclude>src/test/resources/*</exclude> + </excludes> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemPropertyVariables> + <java.io.tmpdir>${project.build.directory}</java.io.tmpdir> + </systemPropertyVariables> + </configuration> + </plugin> + </plugins> Review Comment: Is there a specific reason for changing the temporary directory? This seems unnecessary. ########## nifi-nar-bundles/nifi-adx-bundle/nifi-adx-api/src/main/java/org/apache/nifi/adx/AdxSourceConnectionService.java: ########## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.adx; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.controller.ControllerService; + +import com.microsoft.azure.kusto.data.Client; Review Comment: This `Client` references places a hard dependency on a specific version of the Kusto library. It looks like the use of the Client in the Processor is limited to return results. Instead of having the interface return the `Client`, it looks like this service interface could be abstract with a `query()` method that returns Data in the form of `List<List<Object>>`. What do you think of that approach? That would allow removing the dependency on the Kusto library from the API module, and make the service more flexible should it be necessary to upgrade the Kusto library for the Controller Service implementation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
