exceptionfactory commented on code in PR #7624: URL: https://github.com/apache/nifi/pull/7624#discussion_r1394354709
########## nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/data/explorer/PutAzureDataExplorer.java: ########## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.data.explorer; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.services.azure.data.explorer.KustoIngestDataFormat; +import 
org.apache.nifi.services.azure.data.explorer.KustoIngestQueryResponse; +import org.apache.nifi.services.azure.data.explorer.KustoIngestService; +import org.apache.nifi.services.azure.data.explorer.KustoIngestionRequest; +import org.apache.nifi.services.azure.data.explorer.KustoIngestionResult; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@Tags({"Azure", "Kusto", "ADX", "Explorer", "Data"}) +@CapabilityDescription("The PutAzureDataExplorer acts as a ADX sink connector which sends flowFiles using the ADX-Service to the provided Azure Data" + + "Explorer Ingest Endpoint. The data can be sent through queued ingestion or streaming ingestion to the Azure Data Explorer cluster.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +public class PutAzureDataExplorer extends AbstractProcessor { + + public static final String FETCH_TABLE_COMMAND = "%s | count"; + public static final String STREAMING_POLICY_SHOW_COMMAND = ".show %s %s policy streamingingestion"; + public static final String DATABASE = "database"; + public static final AllowableValue IGNORE_FIRST_RECORD_YES = new AllowableValue( + "YES", "YES", + "Ignore first record during ingestion"); + + public static final AllowableValue IGNORE_FIRST_RECORD_NO = new AllowableValue( + "NO", "NO", + "Do not ignore first record during ingestion"); + + public static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder() + .name("Database Name") + .displayName("Database Name") + .description("Azure Data Explorer Database Name for querying") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + public static final PropertyDescriptor 
TABLE_NAME = new PropertyDescriptor.Builder() + .name("Table Name") + .displayName("Table Name") + .description("Azure Data Explorer Table Name for ingesting data") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + public static final PropertyDescriptor MAPPING_NAME = new PropertyDescriptor + .Builder().name("Ingest Mapping name") + .displayName("Ingest Mapping name") + .description("The name of the mapping responsible for storing the data in the appropriate columns.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor IS_STREAMING_ENABLED = new PropertyDescriptor + .Builder().name("Streaming Enabled") + .displayName("Streaming Enabled") + .description("This property determines whether we want to stream data to ADX.") + .required(false) + .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString()) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue(Boolean.FALSE.toString()) + .build(); + public static final PropertyDescriptor POLL_ON_INGESTION_STATUS = new PropertyDescriptor + .Builder().name("Poll On Ingestion Status") + .displayName("Whether To Poll On Ingestion Status") + .description("This property determines whether we want to poll on ingestion status after an ingestion to ADX is completed") + .required(false) + .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString()) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue(Boolean.FALSE.toString()) + .build(); + public static final PropertyDescriptor ADX_SERVICE = new PropertyDescriptor + .Builder().name("Kusto Ingest Service") + .displayName("Kusto Ingest Service") + .description("Azure Data Explorer Kusto Ingest Service") + .required(true) + .identifiesControllerService(KustoIngestService.class) + .build(); + public static final PropertyDescriptor DATA_FORMAT = new 
PropertyDescriptor.Builder() + .name("Data Format") + .displayName("Data Format") + .description("The format of the data that is sent to Azure Data Explorer.") + .required(true) + .allowableValues(KustoIngestDataFormat.values()) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor IGNORE_FIRST_RECORD = new PropertyDescriptor.Builder() + .name("Ingestion Ignore First Record") + .displayName("Ingestion Ignore First Record") + .description("Defines whether ignore first record while ingestion.") + .required(false) + .allowableValues(IGNORE_FIRST_RECORD_YES, IGNORE_FIRST_RECORD_NO) + .defaultValue(IGNORE_FIRST_RECORD_NO.getValue()) + .build(); + public static final PropertyDescriptor INGESTION_STATUS_POLLING_TIMEOUT = new PropertyDescriptor.Builder() + .name("Ingest Status Polling Timeout") + .displayName("Ingest Status Polling Timeout") + .description("Defines the value of timeout for polling on ingestion status") + .required(false) + .dependsOn(POLL_ON_INGESTION_STATUS, Boolean.TRUE.toString()) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("10 m") + .build(); + public static final PropertyDescriptor INGESTION_STATUS_POLLING_INTERVAL = new PropertyDescriptor.Builder() + .name("Ingest Status Polling Interval") + .displayName("Ingest Status Polling Interval") + .description("Defines the value of timeout for polling on ingestion status in seconds.") + .required(false) + .dependsOn(POLL_ON_INGESTION_STATUS, Boolean.TRUE.toString()) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("5 s") + .build(); + public static final Relationship SUCCESS = new Relationship.Builder() + .name("Success") + .description("Relationship For Success") + .build(); + public static final Relationship FAILURE = new Relationship.Builder() + .name("Failure") Review Comment: ```suggestion .name("failure") ``` ########## 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoIngestService.java: ########## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure.data.explorer; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.controller.ControllerService; + +import java.net.URISyntaxException; + +@Tags({"azure", "adx", "explorer", "kusto", "ingest"}) +@CapabilityDescription("Connection-Service for Azure ADX (Kusto) ingestion cluster to ingest data.") +public interface KustoIngestService extends ControllerService { + KustoIngestionResult ingestData(KustoIngestionRequest kustoIngestionRequest) throws URISyntaxException; Review Comment: The `URISyntaxException` is too specific to the implementation and should be removed from the interface method signature. If needed, the implementation can catch it and wrap it in an unchecked exception such as `IllegalArgumentException`. 
```suggestion KustoIngestionResult ingestData(KustoIngestionRequest kustoIngestionRequest); ``` ########## nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoIngestService.java: ########## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure.data.explorer; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.controller.ControllerService; + +import java.net.URISyntaxException; + +@Tags({"azure", "adx", "explorer", "kusto", "ingest"}) +@CapabilityDescription("Connection-Service for Azure ADX (Kusto) ingestion cluster to ingest data.") +public interface KustoIngestService extends ControllerService { + KustoIngestionResult ingestData(KustoIngestionRequest kustoIngestionRequest) throws URISyntaxException; + + KustoIngestQueryResponse executeQuery(String databaseName, String query); + + KustoIngestQueryResponse checkIfStreamingIsEnabled(String databaseName, String query); Review Comment: The method name should be shortened. 
The `query` parameter should also be changed to `tableName`, and the query itself should be constructed in the service implementation instead of in the Processor. Note that the suggestion below applies only the method rename. ```suggestion KustoIngestQueryResponse isStreamingEnabled(String databaseName, String query); ``` ########## nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/data/explorer/PutAzureDataExplorer.java: ########## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.data.explorer; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.services.azure.data.explorer.KustoIngestDataFormat; +import org.apache.nifi.services.azure.data.explorer.KustoIngestQueryResponse; +import org.apache.nifi.services.azure.data.explorer.KustoIngestService; +import org.apache.nifi.services.azure.data.explorer.KustoIngestionRequest; +import org.apache.nifi.services.azure.data.explorer.KustoIngestionResult; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@Tags({"Azure", "Kusto", "ADX", "Explorer", "Data"}) +@CapabilityDescription("The PutAzureDataExplorer acts as a ADX sink connector which sends flowFiles using the ADX-Service to the provided Azure Data" + + "Explorer Ingest Endpoint. 
The data can be sent through queued ingestion or streaming ingestion to the Azure Data Explorer cluster.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +public class PutAzureDataExplorer extends AbstractProcessor { + + public static final String FETCH_TABLE_COMMAND = "%s | count"; + public static final String STREAMING_POLICY_SHOW_COMMAND = ".show %s %s policy streamingingestion"; + public static final String DATABASE = "database"; + public static final AllowableValue IGNORE_FIRST_RECORD_YES = new AllowableValue( + "YES", "YES", + "Ignore first record during ingestion"); + + public static final AllowableValue IGNORE_FIRST_RECORD_NO = new AllowableValue( + "NO", "NO", + "Do not ignore first record during ingestion"); + + public static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder() + .name("Database Name") + .displayName("Database Name") + .description("Azure Data Explorer Database Name for querying") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + .name("Table Name") + .displayName("Table Name") + .description("Azure Data Explorer Table Name for ingesting data") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + public static final PropertyDescriptor MAPPING_NAME = new PropertyDescriptor + .Builder().name("Ingest Mapping name") + .displayName("Ingest Mapping name") + .description("The name of the mapping responsible for storing the data in the appropriate columns.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor IS_STREAMING_ENABLED = new PropertyDescriptor + .Builder().name("Streaming Enabled") + .displayName("Streaming Enabled") 
+ .description("This property determines whether we want to stream data to ADX.") + .required(false) + .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString()) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue(Boolean.FALSE.toString()) + .build(); + public static final PropertyDescriptor POLL_ON_INGESTION_STATUS = new PropertyDescriptor + .Builder().name("Poll On Ingestion Status") + .displayName("Whether To Poll On Ingestion Status") + .description("This property determines whether we want to poll on ingestion status after an ingestion to ADX is completed") + .required(false) + .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString()) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue(Boolean.FALSE.toString()) + .build(); + public static final PropertyDescriptor ADX_SERVICE = new PropertyDescriptor + .Builder().name("Kusto Ingest Service") + .displayName("Kusto Ingest Service") + .description("Azure Data Explorer Kusto Ingest Service") + .required(true) + .identifiesControllerService(KustoIngestService.class) + .build(); + public static final PropertyDescriptor DATA_FORMAT = new PropertyDescriptor.Builder() + .name("Data Format") + .displayName("Data Format") + .description("The format of the data that is sent to Azure Data Explorer.") + .required(true) + .allowableValues(KustoIngestDataFormat.values()) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor IGNORE_FIRST_RECORD = new PropertyDescriptor.Builder() + .name("Ingestion Ignore First Record") + .displayName("Ingestion Ignore First Record") + .description("Defines whether ignore first record while ingestion.") + .required(false) + .allowableValues(IGNORE_FIRST_RECORD_YES, IGNORE_FIRST_RECORD_NO) + .defaultValue(IGNORE_FIRST_RECORD_NO.getValue()) + .build(); + public static final PropertyDescriptor INGESTION_STATUS_POLLING_TIMEOUT = new PropertyDescriptor.Builder() + .name("Ingest Status Polling 
Timeout") + .displayName("Ingest Status Polling Timeout") + .description("Defines the value of timeout for polling on ingestion status") + .required(false) + .dependsOn(POLL_ON_INGESTION_STATUS, Boolean.TRUE.toString()) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("10 m") + .build(); + public static final PropertyDescriptor INGESTION_STATUS_POLLING_INTERVAL = new PropertyDescriptor.Builder() + .name("Ingest Status Polling Interval") + .displayName("Ingest Status Polling Interval") + .description("Defines the value of timeout for polling on ingestion status in seconds.") + .required(false) + .dependsOn(POLL_ON_INGESTION_STATUS, Boolean.TRUE.toString()) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("5 s") + .build(); + public static final Relationship SUCCESS = new Relationship.Builder() + .name("Success") Review Comment: ```suggestion .name("success") ``` ########## nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoQueryResponse.java: ########## @@ -27,16 +28,27 @@ public class KustoQueryResponse { private final String errorMessage; + private final List<List<Object>> queryResult; Review Comment: Now that the interface has been redefined, can this list of results be removed? The values should be passed in the service implementation methods, which would avoid adding this member variable to the public response object. ########## nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoIngestService.java: ########## @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure.data.explorer; + +import com.microsoft.azure.kusto.data.Client; +import com.microsoft.azure.kusto.data.ClientFactory; +import com.microsoft.azure.kusto.data.KustoResultColumn; +import com.microsoft.azure.kusto.data.KustoResultSetTable; +import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; +import com.microsoft.azure.kusto.data.exceptions.DataClientException; +import com.microsoft.azure.kusto.data.exceptions.DataServiceException; +import com.microsoft.azure.kusto.ingest.IngestClientFactory; +import com.microsoft.azure.kusto.ingest.IngestionMapping; +import com.microsoft.azure.kusto.ingest.IngestionProperties; +import com.microsoft.azure.kusto.ingest.ManagedStreamingIngestClient; +import com.microsoft.azure.kusto.ingest.QueuedIngestClient; +import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException; +import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException; +import com.microsoft.azure.kusto.ingest.result.IngestionResult; +import com.microsoft.azure.kusto.ingest.result.IngestionStatus; +import com.microsoft.azure.kusto.ingest.result.OperationStatus; +import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import 
org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"}) +@CapabilityDescription("Sends batches of flowfile content or stream flowfile content to an Azure ADX cluster.") +public class StandardKustoIngestService extends AbstractControllerService implements KustoIngestService { + + public static final PropertyDescriptor AUTHENTICATION_STRATEGY = new PropertyDescriptor + .Builder().name("Authentication Strategy") + .displayName("Authentication Strategy") + .description("Authentication method for access to Azure Data Explorer") + .required(true) + .defaultValue(KustoAuthenticationStrategy.MANAGED_IDENTITY.getValue()) + .allowableValues(KustoAuthenticationStrategy.class) + .build(); + + public static final PropertyDescriptor APPLICATION_CLIENT_ID = new PropertyDescriptor + .Builder().name("Application Client ID") + .displayName("Application Client ID") + .description("Azure Data Explorer Application Client Identifier for Authentication") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APPLICATION_KEY = new PropertyDescriptor + .Builder().name("Application Key") + .displayName("Application Key") + .description("Azure Data Explorer Application Key for Authentication") + .required(true) + 
.sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dependsOn(AUTHENTICATION_STRATEGY, KustoAuthenticationStrategy.APPLICATION_CREDENTIALS.getValue()) + .build(); + + public static final PropertyDescriptor APPLICATION_TENANT_ID = new PropertyDescriptor.Builder() + .name("Application Tenant ID") + .displayName("Application Tenant ID") + .description("Azure Data Explorer Application Tenant Identifier for Authentication") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dependsOn(AUTHENTICATION_STRATEGY, KustoAuthenticationStrategy.APPLICATION_CREDENTIALS.getValue()) + .build(); + + public static final PropertyDescriptor CLUSTER_URI = new PropertyDescriptor + .Builder().name("Cluster URI") + .displayName("Cluster URI") + .description("Azure Data Explorer Cluster URI") + .required(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList( + AUTHENTICATION_STRATEGY, + APPLICATION_CLIENT_ID, + APPLICATION_KEY, + APPLICATION_TENANT_ID, + CLUSTER_URI)); + + public static final Pair<String, String> NIFI_SINK = Pair.of("processor", StandardKustoIngestService.class.getSimpleName()); + + private volatile QueuedIngestClient queuedIngestClient; + + private volatile ManagedStreamingIngestClient managedStreamingIngestClient; + + private volatile Client executionClient; + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + /** + * @param context the configuration context + */ + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws ProcessException, URISyntaxException { + + final String applicationClientId = context.getProperty(APPLICATION_CLIENT_ID).getValue(); + final String applicationKey = context.getProperty(APPLICATION_KEY).getValue(); + final String applicationTenantId = 
context.getProperty(APPLICATION_TENANT_ID).getValue(); + final String clusterUri = context.getProperty(CLUSTER_URI).getValue(); + final KustoAuthenticationStrategy kustoAuthenticationStrategy = KustoAuthenticationStrategy.valueOf(context.getProperty(AUTHENTICATION_STRATEGY).getValue()); + + if (this.queuedIngestClient == null) { + this.queuedIngestClient = createKustoQueuedIngestClient(clusterUri, applicationClientId, applicationKey, applicationTenantId, kustoAuthenticationStrategy); + } + + if (this.managedStreamingIngestClient == null) { + this.managedStreamingIngestClient = createKustoStreamingIngestClient(clusterUri, applicationClientId, applicationKey, applicationTenantId, kustoAuthenticationStrategy); + } + + if (this.executionClient == null) { + this.executionClient = createKustoExecutionClient(clusterUri, applicationClientId, applicationKey, applicationTenantId, kustoAuthenticationStrategy); + } + + } + + @OnStopped + public final void onStopped() { + if (this.queuedIngestClient != null) { + try { + this.queuedIngestClient.close(); + } catch (IOException e) { + getLogger().error("Closing Azure ADX Queued Ingest Client failed", e); + } finally { + this.queuedIngestClient = null; + } + } + if (this.managedStreamingIngestClient != null) { + try { + this.managedStreamingIngestClient.close(); + } catch (IOException e) { + getLogger().error("Closing Azure ADX Managed Streaming Ingest Client failed", e); + } finally { + this.managedStreamingIngestClient = null; + } + } + if (this.executionClient != null) { + try { + this.executionClient.close(); + } catch (IOException e) { + getLogger().error("Closing Azure ADX Execution Client failed", e); + } finally { + this.executionClient = null; + } + } + } + + + protected QueuedIngestClient createKustoQueuedIngestClient(final String clusterUrl, + final String appId, + final String appKey, + final String appTenant, + final KustoAuthenticationStrategy kustoAuthStrategy) throws URISyntaxException { + ConnectionStringBuilder 
ingestConnectionStringBuilder = createKustoEngineConnectionString(clusterUrl, appId, appKey, appTenant, kustoAuthStrategy); + return IngestClientFactory.createClient(ingestConnectionStringBuilder); + } + + protected ManagedStreamingIngestClient createKustoStreamingIngestClient(final String clusterUrl, + final String appId, + final String appKey, + final String appTenant, + final KustoAuthenticationStrategy kustoAuthStrategy) throws URISyntaxException { + ConnectionStringBuilder ingestConnectionStringBuilder = createKustoEngineConnectionString(clusterUrl, appId, appKey, appTenant, kustoAuthStrategy); + ConnectionStringBuilder streamingConnectionStringBuilder = createKustoEngineConnectionString(clusterUrl, appId, appKey, appTenant, kustoAuthStrategy); + return IngestClientFactory.createManagedStreamingIngestClient(ingestConnectionStringBuilder, streamingConnectionStringBuilder); + } + + public KustoIngestionResult ingestData(KustoIngestionRequest kustoIngestionRequest) throws URISyntaxException { + StreamSourceInfo info = new StreamSourceInfo(kustoIngestionRequest.getInputStream()); + //ingest data + IngestionResult ingestionResult; + IngestionProperties ingestionProperties = new IngestionProperties(kustoIngestionRequest.getDatabaseName(), + kustoIngestionRequest.getTableName()); + + IngestionMapping.IngestionMappingKind ingestionMappingKind = setDataFormatAndMapping(kustoIngestionRequest.getDataFormat(), ingestionProperties); + if (StringUtils.isNotEmpty(kustoIngestionRequest.getMappingName()) && ingestionMappingKind != null) { + ingestionProperties.setIngestionMapping(kustoIngestionRequest.getMappingName(), ingestionMappingKind); + } + + ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FAILURES_AND_SUCCESSES); + ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.TABLE); + ingestionProperties.setFlushImmediately(false); + 
ingestionProperties.setIgnoreFirstRecord(StringUtils.equalsIgnoreCase(kustoIngestionRequest.getIgnoreFirstRecord(), "YES")); + + try { + if (kustoIngestionRequest.isStreamingEnabled()) { + ingestionResult = managedStreamingIngestClient.ingestFromStream(info, ingestionProperties); + } else { + ingestionResult = queuedIngestClient.ingestFromStream(info, ingestionProperties); + } + if (kustoIngestionRequest.pollOnIngestionStatus()) { + List<IngestionStatus> statuses = initializeKustoIngestionStatusAsPending(); + long startTime = System.currentTimeMillis(); + long timeoutMillis = kustoIngestionRequest.getIngestionStatusPollingTimeout().toMillis(); + // Calculate the end time based on the timeout duration + long endTime = startTime + timeoutMillis; + while (System.currentTimeMillis() < endTime) { + // Get the status of the ingestion operation + List<IngestionStatus> statuses1 = ingestionResult.getIngestionStatusCollection(); + if (statuses1.get(0).status == OperationStatus.Succeeded + || statuses1.get(0).status == OperationStatus.Failed + || statuses1.get(0).status == OperationStatus.PartiallySucceeded) { + statuses = statuses1; + break; + } + // Sleep for before checking again + Thread.sleep(kustoIngestionRequest.getIngestionStatusPollingInterval().toMillis()); + } + // Check if the timeout has been exceeded + if (System.currentTimeMillis() - startTime >= timeoutMillis) { + throw new ProcessException(String.format("Timeout of %s exceeded while waiting for ingestion status", kustoIngestionRequest.getIngestionStatusPollingTimeout())); + } + return KustoIngestionResult.fromString(statuses.get(0).status.toString()); Review Comment: Recommend moving this block into a separate method named something like `pollIngestionStatus()`. 
########## nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/data/explorer/PutAzureDataExplorer.java: ########## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.data.explorer; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.services.azure.data.explorer.KustoIngestDataFormat; +import 
org.apache.nifi.services.azure.data.explorer.KustoIngestQueryResponse; +import org.apache.nifi.services.azure.data.explorer.KustoIngestService; +import org.apache.nifi.services.azure.data.explorer.KustoIngestionRequest; +import org.apache.nifi.services.azure.data.explorer.KustoIngestionResult; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@Tags({"Azure", "Kusto", "ADX", "Explorer", "Data"}) +@CapabilityDescription("The PutAzureDataExplorer acts as a ADX sink connector which sends flowFiles using the ADX-Service to the provided Azure Data" + + "Explorer Ingest Endpoint. The data can be sent through queued ingestion or streaming ingestion to the Azure Data Explorer cluster.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +public class PutAzureDataExplorer extends AbstractProcessor { + + public static final String FETCH_TABLE_COMMAND = "%s | count"; + public static final String STREAMING_POLICY_SHOW_COMMAND = ".show %s %s policy streamingingestion"; + public static final String DATABASE = "database"; + public static final AllowableValue IGNORE_FIRST_RECORD_YES = new AllowableValue( + "YES", "YES", + "Ignore first record during ingestion"); + + public static final AllowableValue IGNORE_FIRST_RECORD_NO = new AllowableValue( + "NO", "NO", + "Do not ignore first record during ingestion"); + + public static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder() + .name("Database Name") + .displayName("Database Name") + .description("Azure Data Explorer Database Name for querying") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + public static final PropertyDescriptor 
TABLE_NAME = new PropertyDescriptor.Builder() + .name("Table Name") + .displayName("Table Name") + .description("Azure Data Explorer Table Name for ingesting data") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + public static final PropertyDescriptor MAPPING_NAME = new PropertyDescriptor + .Builder().name("Ingest Mapping name") + .displayName("Ingest Mapping name") + .description("The name of the mapping responsible for storing the data in the appropriate columns.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor IS_STREAMING_ENABLED = new PropertyDescriptor + .Builder().name("Streaming Enabled") + .displayName("Streaming Enabled") + .description("This property determines whether we want to stream data to ADX.") + .required(false) + .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString()) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue(Boolean.FALSE.toString()) + .build(); + public static final PropertyDescriptor POLL_ON_INGESTION_STATUS = new PropertyDescriptor + .Builder().name("Poll On Ingestion Status") + .displayName("Whether To Poll On Ingestion Status") + .description("This property determines whether we want to poll on ingestion status after an ingestion to ADX is completed") + .required(false) + .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString()) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue(Boolean.FALSE.toString()) + .build(); + public static final PropertyDescriptor ADX_SERVICE = new PropertyDescriptor + .Builder().name("Kusto Ingest Service") + .displayName("Kusto Ingest Service") + .description("Azure Data Explorer Kusto Ingest Service") + .required(true) + .identifiesControllerService(KustoIngestService.class) + .build(); + public static final PropertyDescriptor DATA_FORMAT = new 
PropertyDescriptor.Builder() + .name("Data Format") + .displayName("Data Format") + .description("The format of the data that is sent to Azure Data Explorer.") + .required(true) + .allowableValues(KustoIngestDataFormat.values()) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor IGNORE_FIRST_RECORD = new PropertyDescriptor.Builder() + .name("Ingestion Ignore First Record") + .displayName("Ingestion Ignore First Record") + .description("Defines whether ignore first record while ingestion.") + .required(false) + .allowableValues(IGNORE_FIRST_RECORD_YES, IGNORE_FIRST_RECORD_NO) + .defaultValue(IGNORE_FIRST_RECORD_NO.getValue()) + .build(); + public static final PropertyDescriptor INGESTION_STATUS_POLLING_TIMEOUT = new PropertyDescriptor.Builder() + .name("Ingest Status Polling Timeout") + .displayName("Ingest Status Polling Timeout") + .description("Defines the value of timeout for polling on ingestion status") + .required(false) + .dependsOn(POLL_ON_INGESTION_STATUS, Boolean.TRUE.toString()) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("10 m") + .build(); + public static final PropertyDescriptor INGESTION_STATUS_POLLING_INTERVAL = new PropertyDescriptor.Builder() + .name("Ingest Status Polling Interval") + .displayName("Ingest Status Polling Interval") + .description("Defines the value of timeout for polling on ingestion status in seconds.") + .required(false) + .dependsOn(POLL_ON_INGESTION_STATUS, Boolean.TRUE.toString()) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("5 s") + .build(); + public static final Relationship SUCCESS = new Relationship.Builder() + .name("Success") + .description("Relationship For Success") + .build(); + public static final Relationship FAILURE = new Relationship.Builder() + .name("Failure") + .description("Relationship For Failure") + .build(); + + public static final PropertyDescriptor ROUTE_PARTIALLY_SUCCESSFUL_INGESTION = 
new PropertyDescriptor.Builder() + .name("Route Partially Successful Ingestion Records") + .displayName("Route Partially Successful Ingestion Records") + .description("Defines where to route partially successful ingestion records.") + .required(false) + .allowableValues(SUCCESS.getName(), FAILURE.getName()) + .defaultValue(FAILURE.getName()) + .build(); + private static final List<PropertyDescriptor> descriptors; + private static final Set<Relationship> relationships; + private transient KustoIngestService service; + private boolean streamingEnabled; + private boolean pollOnIngestionStatus; Review Comment: These values should not be member variables. ```suggestion ``` ########## nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoIngestService.java: ########## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.services.azure.data.explorer; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.controller.ControllerService; + +import java.net.URISyntaxException; + +@Tags({"azure", "adx", "explorer", "kusto", "ingest"}) +@CapabilityDescription("Connection-Service for Azure ADX (Kusto) ingestion cluster to ingest data.") +public interface KustoIngestService extends ControllerService { + KustoIngestionResult ingestData(KustoIngestionRequest kustoIngestionRequest) throws URISyntaxException; + + KustoIngestQueryResponse executeQuery(String databaseName, String query); + + KustoIngestQueryResponse checkIfStreamingIsEnabled(String databaseName, String query); + + KustoIngestQueryResponse checkIfIngestorPrivilegeIsEnabled(String databaseName, String query); Review Comment: ```suggestion KustoIngestQueryResponse isIngestorPrivilegeEnabled(String databaseName, String tableName); ``` ########## nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/data/explorer/PutAzureDataExplorer.java: ########## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.data.explorer; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.services.azure.data.explorer.KustoIngestDataFormat; +import org.apache.nifi.services.azure.data.explorer.KustoIngestQueryResponse; +import org.apache.nifi.services.azure.data.explorer.KustoIngestService; +import org.apache.nifi.services.azure.data.explorer.KustoIngestionRequest; +import org.apache.nifi.services.azure.data.explorer.KustoIngestionResult; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@Tags({"Azure", "Kusto", "ADX", "Explorer", "Data"}) +@CapabilityDescription("The PutAzureDataExplorer acts as a ADX sink connector which sends flowFiles using the ADX-Service to the provided Azure Data" + + "Explorer Ingest Endpoint. 
The data can be sent through queued ingestion or streaming ingestion to the Azure Data Explorer cluster.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +public class PutAzureDataExplorer extends AbstractProcessor { + + public static final String FETCH_TABLE_COMMAND = "%s | count"; + public static final String STREAMING_POLICY_SHOW_COMMAND = ".show %s %s policy streamingingestion"; + public static final String DATABASE = "database"; + public static final AllowableValue IGNORE_FIRST_RECORD_YES = new AllowableValue( + "YES", "YES", + "Ignore first record during ingestion"); + + public static final AllowableValue IGNORE_FIRST_RECORD_NO = new AllowableValue( + "NO", "NO", + "Do not ignore first record during ingestion"); + + public static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder() + .name("Database Name") + .displayName("Database Name") + .description("Azure Data Explorer Database Name for querying") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + .name("Table Name") + .displayName("Table Name") + .description("Azure Data Explorer Table Name for ingesting data") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + public static final PropertyDescriptor MAPPING_NAME = new PropertyDescriptor + .Builder().name("Ingest Mapping name") + .displayName("Ingest Mapping name") + .description("The name of the mapping responsible for storing the data in the appropriate columns.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor IS_STREAMING_ENABLED = new PropertyDescriptor + .Builder().name("Streaming Enabled") + .displayName("Streaming Enabled") 
+ .description("This property determines whether we want to stream data to ADX.") + .required(false) + .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString()) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue(Boolean.FALSE.toString()) + .build(); + public static final PropertyDescriptor POLL_ON_INGESTION_STATUS = new PropertyDescriptor + .Builder().name("Poll On Ingestion Status") + .displayName("Whether To Poll On Ingestion Status") Review Comment: ```suggestion .Builder().name("Poll for Ingestion Status") .displayName("Poll for Ingestion Status") ``` ########## nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/data/explorer/PutAzureDataExplorer.java: ########## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.data.explorer; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.services.azure.data.explorer.KustoIngestDataFormat; +import org.apache.nifi.services.azure.data.explorer.KustoIngestQueryResponse; +import org.apache.nifi.services.azure.data.explorer.KustoIngestService; +import org.apache.nifi.services.azure.data.explorer.KustoIngestionRequest; +import org.apache.nifi.services.azure.data.explorer.KustoIngestionResult; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@Tags({"Azure", "Kusto", "ADX", "Explorer", "Data"}) +@CapabilityDescription("The PutAzureDataExplorer acts as a ADX sink connector which sends flowFiles using the ADX-Service to the provided Azure Data" + + "Explorer Ingest Endpoint. 
The data can be sent through queued ingestion or streaming ingestion to the Azure Data Explorer cluster.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +public class PutAzureDataExplorer extends AbstractProcessor { + + public static final String FETCH_TABLE_COMMAND = "%s | count"; + public static final String STREAMING_POLICY_SHOW_COMMAND = ".show %s %s policy streamingingestion"; + public static final String DATABASE = "database"; + public static final AllowableValue IGNORE_FIRST_RECORD_YES = new AllowableValue( + "YES", "YES", + "Ignore first record during ingestion"); + + public static final AllowableValue IGNORE_FIRST_RECORD_NO = new AllowableValue( + "NO", "NO", + "Do not ignore first record during ingestion"); + + public static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder() + .name("Database Name") + .displayName("Database Name") + .description("Azure Data Explorer Database Name for querying") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + .name("Table Name") + .displayName("Table Name") + .description("Azure Data Explorer Table Name for ingesting data") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + public static final PropertyDescriptor MAPPING_NAME = new PropertyDescriptor + .Builder().name("Ingest Mapping name") + .displayName("Ingest Mapping name") + .description("The name of the mapping responsible for storing the data in the appropriate columns.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor IS_STREAMING_ENABLED = new PropertyDescriptor + .Builder().name("Streaming Enabled") + .displayName("Streaming Enabled") 
+ .description("This property determines whether we want to stream data to ADX.") + .required(false) + .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString()) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue(Boolean.FALSE.toString()) + .build(); + public static final PropertyDescriptor POLL_ON_INGESTION_STATUS = new PropertyDescriptor + .Builder().name("Poll On Ingestion Status") + .displayName("Whether To Poll On Ingestion Status") + .description("This property determines whether we want to poll on ingestion status after an ingestion to ADX is completed") Review Comment: ```suggestion .description("Determines whether to poll on ingestion status after an ingestion to Azure Data Explorer is completed") ``` ########## nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/data/explorer/PutAzureDataExplorer.java: ########## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.data.explorer; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.services.azure.data.explorer.KustoIngestDataFormat; +import org.apache.nifi.services.azure.data.explorer.KustoIngestQueryResponse; +import org.apache.nifi.services.azure.data.explorer.KustoIngestService; +import org.apache.nifi.services.azure.data.explorer.KustoIngestionRequest; +import org.apache.nifi.services.azure.data.explorer.KustoIngestionResult; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@Tags({"Azure", "Kusto", "ADX", "Explorer", "Data"}) +@CapabilityDescription("The PutAzureDataExplorer acts as a ADX sink connector which sends flowFiles using the ADX-Service to the provided Azure Data" + + "Explorer Ingest Endpoint. 
The data can be sent through queued ingestion or streaming ingestion to the Azure Data Explorer cluster.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +public class PutAzureDataExplorer extends AbstractProcessor { + + public static final String FETCH_TABLE_COMMAND = "%s | count"; + public static final String STREAMING_POLICY_SHOW_COMMAND = ".show %s %s policy streamingingestion"; + public static final String DATABASE = "database"; + public static final AllowableValue IGNORE_FIRST_RECORD_YES = new AllowableValue( + "YES", "YES", + "Ignore first record during ingestion"); + + public static final AllowableValue IGNORE_FIRST_RECORD_NO = new AllowableValue( + "NO", "NO", + "Do not ignore first record during ingestion"); + + public static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder() + .name("Database Name") + .displayName("Database Name") + .description("Azure Data Explorer Database Name for querying") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + .name("Table Name") + .displayName("Table Name") + .description("Azure Data Explorer Table Name for ingesting data") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + public static final PropertyDescriptor MAPPING_NAME = new PropertyDescriptor + .Builder().name("Ingest Mapping name") + .displayName("Ingest Mapping name") + .description("The name of the mapping responsible for storing the data in the appropriate columns.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor IS_STREAMING_ENABLED = new PropertyDescriptor + .Builder().name("Streaming Enabled") + .displayName("Streaming Enabled") 
+ .description("This property determines whether we want to stream data to ADX.") + .required(false) + .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString()) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue(Boolean.FALSE.toString()) + .build(); + public static final PropertyDescriptor POLL_ON_INGESTION_STATUS = new PropertyDescriptor + .Builder().name("Poll On Ingestion Status") + .displayName("Whether To Poll On Ingestion Status") + .description("This property determines whether we want to poll on ingestion status after an ingestion to ADX is completed") + .required(false) + .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString()) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue(Boolean.FALSE.toString()) + .build(); + public static final PropertyDescriptor ADX_SERVICE = new PropertyDescriptor + .Builder().name("Kusto Ingest Service") + .displayName("Kusto Ingest Service") + .description("Azure Data Explorer Kusto Ingest Service") + .required(true) + .identifiesControllerService(KustoIngestService.class) + .build(); + public static final PropertyDescriptor DATA_FORMAT = new PropertyDescriptor.Builder() + .name("Data Format") + .displayName("Data Format") + .description("The format of the data that is sent to Azure Data Explorer.") + .required(true) + .allowableValues(KustoIngestDataFormat.values()) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor IGNORE_FIRST_RECORD = new PropertyDescriptor.Builder() + .name("Ingestion Ignore First Record") + .displayName("Ingestion Ignore First Record") + .description("Defines whether ignore first record while ingestion.") + .required(false) + .allowableValues(IGNORE_FIRST_RECORD_YES, IGNORE_FIRST_RECORD_NO) + .defaultValue(IGNORE_FIRST_RECORD_NO.getValue()) + .build(); + public static final PropertyDescriptor INGESTION_STATUS_POLLING_TIMEOUT = new PropertyDescriptor.Builder() + .name("Ingest Status Polling 
Timeout") + .displayName("Ingest Status Polling Timeout") + .description("Defines the value of timeout for polling on ingestion status") + .required(false) + .dependsOn(POLL_ON_INGESTION_STATUS, Boolean.TRUE.toString()) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("10 m") + .build(); + public static final PropertyDescriptor INGESTION_STATUS_POLLING_INTERVAL = new PropertyDescriptor.Builder() + .name("Ingest Status Polling Interval") + .displayName("Ingest Status Polling Interval") + .description("Defines the value of timeout for polling on ingestion status in seconds.") + .required(false) + .dependsOn(POLL_ON_INGESTION_STATUS, Boolean.TRUE.toString()) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("5 s") + .build(); + public static final Relationship SUCCESS = new Relationship.Builder() + .name("Success") + .description("Relationship For Success") + .build(); + public static final Relationship FAILURE = new Relationship.Builder() + .name("Failure") + .description("Relationship For Failure") + .build(); + + public static final PropertyDescriptor ROUTE_PARTIALLY_SUCCESSFUL_INGESTION = new PropertyDescriptor.Builder() + .name("Route Partially Successful Ingestion Records") + .displayName("Route Partially Successful Ingestion Records") + .description("Defines where to route partially successful ingestion records.") + .required(false) + .allowableValues(SUCCESS.getName(), FAILURE.getName()) + .defaultValue(FAILURE.getName()) + .build(); + private static final List<PropertyDescriptor> descriptors; + private static final Set<Relationship> relationships; + private transient KustoIngestService service; + private boolean streamingEnabled; + private boolean pollOnIngestionStatus; + + static { + final List<PropertyDescriptor> descriptorList = new ArrayList<>(); + descriptorList.add(ADX_SERVICE); + descriptorList.add(DATABASE_NAME); + descriptorList.add(TABLE_NAME); + descriptorList.add(MAPPING_NAME); + 
descriptorList.add(DATA_FORMAT); + descriptorList.add(IGNORE_FIRST_RECORD); + descriptorList.add(IS_STREAMING_ENABLED); + descriptorList.add(POLL_ON_INGESTION_STATUS); + descriptorList.add(ROUTE_PARTIALLY_SUCCESSFUL_INGESTION); + descriptorList.add(INGESTION_STATUS_POLLING_TIMEOUT); + descriptorList.add(INGESTION_STATUS_POLLING_INTERVAL); + descriptors = Collections.unmodifiableList(descriptorList); Review Comment: This should be changed to use `List.of()` to declare the property descriptors directly, instead of populating a temporary `ArrayList` and wrapping it with `Collections.unmodifiableList()`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
