exceptionfactory commented on code in PR #7624:
URL: https://github.com/apache/nifi/pull/7624#discussion_r1394354709


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/data/explorer/PutAzureDataExplorer.java:
##########
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.data.explorer;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestDataFormat;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestQueryResponse;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestService;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestionRequest;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestionResult;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@Tags({"Azure", "Kusto", "ADX", "Explorer", "Data"})
+@CapabilityDescription("The PutAzureDataExplorer acts as a ADX sink connector 
which sends flowFiles using the ADX-Service to the provided Azure Data" +
+        "Explorer Ingest Endpoint. The data can be sent through queued 
ingestion or streaming ingestion to the Azure Data Explorer cluster.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class PutAzureDataExplorer extends AbstractProcessor {
+
+    public static final String FETCH_TABLE_COMMAND = "%s | count";
+    public static final String STREAMING_POLICY_SHOW_COMMAND = ".show %s %s 
policy streamingingestion";
+    public static final String DATABASE = "database";
+    public static final AllowableValue IGNORE_FIRST_RECORD_YES = new 
AllowableValue(
+            "YES", "YES",
+            "Ignore first record during ingestion");
+
+    public static final AllowableValue IGNORE_FIRST_RECORD_NO = new 
AllowableValue(
+            "NO", "NO",
+            "Do not ignore first record during ingestion");
+
+    public static final PropertyDescriptor DATABASE_NAME = new 
PropertyDescriptor.Builder()
+            .name("Database Name")
+            .displayName("Database Name")
+            .description("Azure Data Explorer Database Name for querying")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+            .name("Table Name")
+            .displayName("Table Name")
+            .description("Azure Data Explorer Table Name for ingesting data")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor MAPPING_NAME = new 
PropertyDescriptor
+            .Builder().name("Ingest Mapping name")
+            .displayName("Ingest Mapping name")
+            .description("The name of the mapping responsible for storing the 
data in the appropriate columns.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor IS_STREAMING_ENABLED = new 
PropertyDescriptor
+            .Builder().name("Streaming Enabled")
+            .displayName("Streaming Enabled")
+            .description("This property determines whether we want to stream 
data to ADX.")
+            .required(false)
+            .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString())
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue(Boolean.FALSE.toString())
+            .build();
+    public static final PropertyDescriptor POLL_ON_INGESTION_STATUS = new 
PropertyDescriptor
+            .Builder().name("Poll On Ingestion Status")
+            .displayName("Whether To Poll On Ingestion Status")
+            .description("This property determines whether we want to poll on 
ingestion status after an ingestion to ADX is completed")
+            .required(false)
+            .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString())
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue(Boolean.FALSE.toString())
+            .build();
+    public static final PropertyDescriptor ADX_SERVICE = new PropertyDescriptor
+            .Builder().name("Kusto Ingest Service")
+            .displayName("Kusto Ingest Service")
+            .description("Azure Data Explorer Kusto Ingest Service")
+            .required(true)
+            .identifiesControllerService(KustoIngestService.class)
+            .build();
+    public static final PropertyDescriptor DATA_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("Data Format")
+            .displayName("Data Format")
+            .description("The format of the data that is sent to Azure Data 
Explorer.")
+            .required(true)
+            .allowableValues(KustoIngestDataFormat.values())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor IGNORE_FIRST_RECORD = new 
PropertyDescriptor.Builder()
+            .name("Ingestion Ignore First Record")
+            .displayName("Ingestion Ignore First Record")
+            .description("Defines whether ignore first record while 
ingestion.")
+            .required(false)
+            .allowableValues(IGNORE_FIRST_RECORD_YES, IGNORE_FIRST_RECORD_NO)
+            .defaultValue(IGNORE_FIRST_RECORD_NO.getValue())
+            .build();
+    public static final PropertyDescriptor INGESTION_STATUS_POLLING_TIMEOUT = 
new PropertyDescriptor.Builder()
+            .name("Ingest Status Polling Timeout")
+            .displayName("Ingest Status Polling Timeout")
+            .description("Defines the value of timeout for polling on 
ingestion status")
+            .required(false)
+            .dependsOn(POLL_ON_INGESTION_STATUS, Boolean.TRUE.toString())
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("10 m")
+            .build();
+    public static final PropertyDescriptor INGESTION_STATUS_POLLING_INTERVAL = 
new PropertyDescriptor.Builder()
+            .name("Ingest Status Polling Interval")
+            .displayName("Ingest Status Polling Interval")
+            .description("Defines the value of timeout for polling on 
ingestion status in seconds.")
+            .required(false)
+            .dependsOn(POLL_ON_INGESTION_STATUS, Boolean.TRUE.toString())
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("5 s")
+            .build();
+    public static final Relationship SUCCESS = new Relationship.Builder()
+            .name("Success")
+            .description("Relationship For Success")
+            .build();
+    public static final Relationship FAILURE = new Relationship.Builder()
+            .name("Failure")

Review Comment:
   ```suggestion
               .name("failure")
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoIngestService.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.data.explorer;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.ControllerService;
+
+import java.net.URISyntaxException;
+
+@Tags({"azure", "adx", "explorer", "kusto", "ingest"})
+@CapabilityDescription("Connection-Service for Azure ADX (Kusto) ingestion 
cluster to ingest data.")
+public interface KustoIngestService extends ControllerService {
+    KustoIngestionResult ingestData(KustoIngestionRequest 
kustoIngestionRequest) throws URISyntaxException;

Review Comment:
   The `URISyntaxException` is too specific to the implementation and should be 
removed; it could be wrapped in an unchecked exception such as 
`IllegalArgumentException` in the implementation if needed.
   ```suggestion
       KustoIngestionResult ingestData(KustoIngestionRequest 
kustoIngestionRequest);
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoIngestService.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.data.explorer;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.ControllerService;
+
+import java.net.URISyntaxException;
+
+@Tags({"azure", "adx", "explorer", "kusto", "ingest"})
+@CapabilityDescription("Connection-Service for Azure ADX (Kusto) ingestion 
cluster to ingest data.")
+public interface KustoIngestService extends ControllerService {
+    KustoIngestionResult ingestData(KustoIngestionRequest 
kustoIngestionRequest) throws URISyntaxException;
+
+    KustoIngestQueryResponse executeQuery(String databaseName, String query);
+
+    KustoIngestQueryResponse checkIfStreamingIsEnabled(String databaseName, 
String query);

Review Comment:
   The method name should be shortened. The `query` parameter should be changed 
to `tableName`, and the query itself should be constructed in the service 
implementation rather than in the Processor.
   ```suggestion
       KustoIngestQueryResponse isStreamingEnabled(String databaseName, String 
query);
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/data/explorer/PutAzureDataExplorer.java:
##########
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.data.explorer;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestDataFormat;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestQueryResponse;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestService;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestionRequest;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestionResult;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@Tags({"Azure", "Kusto", "ADX", "Explorer", "Data"})
+@CapabilityDescription("The PutAzureDataExplorer acts as a ADX sink connector 
which sends flowFiles using the ADX-Service to the provided Azure Data" +
+        "Explorer Ingest Endpoint. The data can be sent through queued 
ingestion or streaming ingestion to the Azure Data Explorer cluster.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class PutAzureDataExplorer extends AbstractProcessor {
+
+    public static final String FETCH_TABLE_COMMAND = "%s | count";
+    public static final String STREAMING_POLICY_SHOW_COMMAND = ".show %s %s 
policy streamingingestion";
+    public static final String DATABASE = "database";
+    public static final AllowableValue IGNORE_FIRST_RECORD_YES = new 
AllowableValue(
+            "YES", "YES",
+            "Ignore first record during ingestion");
+
+    public static final AllowableValue IGNORE_FIRST_RECORD_NO = new 
AllowableValue(
+            "NO", "NO",
+            "Do not ignore first record during ingestion");
+
+    public static final PropertyDescriptor DATABASE_NAME = new 
PropertyDescriptor.Builder()
+            .name("Database Name")
+            .displayName("Database Name")
+            .description("Azure Data Explorer Database Name for querying")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+            .name("Table Name")
+            .displayName("Table Name")
+            .description("Azure Data Explorer Table Name for ingesting data")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor MAPPING_NAME = new 
PropertyDescriptor
+            .Builder().name("Ingest Mapping name")
+            .displayName("Ingest Mapping name")
+            .description("The name of the mapping responsible for storing the 
data in the appropriate columns.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor IS_STREAMING_ENABLED = new 
PropertyDescriptor
+            .Builder().name("Streaming Enabled")
+            .displayName("Streaming Enabled")
+            .description("This property determines whether we want to stream 
data to ADX.")
+            .required(false)
+            .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString())
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue(Boolean.FALSE.toString())
+            .build();
+    public static final PropertyDescriptor POLL_ON_INGESTION_STATUS = new 
PropertyDescriptor
+            .Builder().name("Poll On Ingestion Status")
+            .displayName("Whether To Poll On Ingestion Status")
+            .description("This property determines whether we want to poll on 
ingestion status after an ingestion to ADX is completed")
+            .required(false)
+            .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString())
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue(Boolean.FALSE.toString())
+            .build();
+    public static final PropertyDescriptor ADX_SERVICE = new PropertyDescriptor
+            .Builder().name("Kusto Ingest Service")
+            .displayName("Kusto Ingest Service")
+            .description("Azure Data Explorer Kusto Ingest Service")
+            .required(true)
+            .identifiesControllerService(KustoIngestService.class)
+            .build();
+    public static final PropertyDescriptor DATA_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("Data Format")
+            .displayName("Data Format")
+            .description("The format of the data that is sent to Azure Data 
Explorer.")
+            .required(true)
+            .allowableValues(KustoIngestDataFormat.values())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor IGNORE_FIRST_RECORD = new 
PropertyDescriptor.Builder()
+            .name("Ingestion Ignore First Record")
+            .displayName("Ingestion Ignore First Record")
+            .description("Defines whether ignore first record while 
ingestion.")
+            .required(false)
+            .allowableValues(IGNORE_FIRST_RECORD_YES, IGNORE_FIRST_RECORD_NO)
+            .defaultValue(IGNORE_FIRST_RECORD_NO.getValue())
+            .build();
+    public static final PropertyDescriptor INGESTION_STATUS_POLLING_TIMEOUT = 
new PropertyDescriptor.Builder()
+            .name("Ingest Status Polling Timeout")
+            .displayName("Ingest Status Polling Timeout")
+            .description("Defines the value of timeout for polling on 
ingestion status")
+            .required(false)
+            .dependsOn(POLL_ON_INGESTION_STATUS, Boolean.TRUE.toString())
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("10 m")
+            .build();
+    public static final PropertyDescriptor INGESTION_STATUS_POLLING_INTERVAL = 
new PropertyDescriptor.Builder()
+            .name("Ingest Status Polling Interval")
+            .displayName("Ingest Status Polling Interval")
+            .description("Defines the value of timeout for polling on 
ingestion status in seconds.")
+            .required(false)
+            .dependsOn(POLL_ON_INGESTION_STATUS, Boolean.TRUE.toString())
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("5 s")
+            .build();
+    public static final Relationship SUCCESS = new Relationship.Builder()
+            .name("Success")

Review Comment:
   ```suggestion
               .name("success")
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoQueryResponse.java:
##########
@@ -27,16 +28,27 @@ public class KustoQueryResponse {
 
     private final String errorMessage;
 
+    private final List<List<Object>> queryResult;

Review Comment:
   Now that the interface has been redefined, can this list of results be 
removed? The values should be passed in the service implementation methods, 
which would avoid adding this member variable to the public response object.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoIngestService.java:
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.data.explorer;
+
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.ClientFactory;
+import com.microsoft.azure.kusto.data.KustoResultColumn;
+import com.microsoft.azure.kusto.data.KustoResultSetTable;
+import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
+import com.microsoft.azure.kusto.data.exceptions.DataClientException;
+import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
+import com.microsoft.azure.kusto.ingest.IngestClientFactory;
+import com.microsoft.azure.kusto.ingest.IngestionMapping;
+import com.microsoft.azure.kusto.ingest.IngestionProperties;
+import com.microsoft.azure.kusto.ingest.ManagedStreamingIngestClient;
+import com.microsoft.azure.kusto.ingest.QueuedIngestClient;
+import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
+import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
+import com.microsoft.azure.kusto.ingest.result.IngestionResult;
+import com.microsoft.azure.kusto.ingest.result.IngestionStatus;
+import com.microsoft.azure.kusto.ingest.result.OperationStatus;
+import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+@Tags({"Azure", "ADX", "Kusto", "ingest", "azure"})
+@CapabilityDescription("Sends batches of flowfile content or stream flowfile 
content to an Azure ADX cluster.")
+public class StandardKustoIngestService extends AbstractControllerService 
implements KustoIngestService {
+
+    public static final PropertyDescriptor AUTHENTICATION_STRATEGY = new 
PropertyDescriptor
+            .Builder().name("Authentication Strategy")
+            .displayName("Authentication Strategy")
+            .description("Authentication method for access to Azure Data 
Explorer")
+            .required(true)
+            
.defaultValue(KustoAuthenticationStrategy.MANAGED_IDENTITY.getValue())
+            .allowableValues(KustoAuthenticationStrategy.class)
+            .build();
+
+    public static final PropertyDescriptor APPLICATION_CLIENT_ID = new 
PropertyDescriptor
+            .Builder().name("Application Client ID")
+            .displayName("Application Client ID")
+            .description("Azure Data Explorer Application Client Identifier 
for Authentication")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor APPLICATION_KEY = new 
PropertyDescriptor
+            .Builder().name("Application Key")
+            .displayName("Application Key")
+            .description("Azure Data Explorer Application Key for 
Authentication")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(AUTHENTICATION_STRATEGY, 
KustoAuthenticationStrategy.APPLICATION_CREDENTIALS.getValue())
+            .build();
+
+    public static final PropertyDescriptor APPLICATION_TENANT_ID = new 
PropertyDescriptor.Builder()
+            .name("Application Tenant ID")
+            .displayName("Application Tenant ID")
+            .description("Azure Data Explorer Application Tenant Identifier 
for Authentication")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dependsOn(AUTHENTICATION_STRATEGY, 
KustoAuthenticationStrategy.APPLICATION_CREDENTIALS.getValue())
+            .build();
+
+    public static final PropertyDescriptor CLUSTER_URI = new PropertyDescriptor
+            .Builder().name("Cluster URI")
+            .displayName("Cluster URI")
+            .description("Azure Data Explorer Cluster URI")
+            .required(true)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
+            AUTHENTICATION_STRATEGY,
+            APPLICATION_CLIENT_ID,
+            APPLICATION_KEY,
+            APPLICATION_TENANT_ID,
+            CLUSTER_URI));
+
+    public static final Pair<String, String> NIFI_SINK = Pair.of("processor", 
StandardKustoIngestService.class.getSimpleName());
+
+    private volatile QueuedIngestClient queuedIngestClient;
+
+    private volatile ManagedStreamingIngestClient managedStreamingIngestClient;
+
+    private volatile Client executionClient;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    /**
+     * @param context the configuration context
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws 
ProcessException, URISyntaxException {
+
+        final String applicationClientId = 
context.getProperty(APPLICATION_CLIENT_ID).getValue();
+        final String applicationKey = 
context.getProperty(APPLICATION_KEY).getValue();
+        final String applicationTenantId = 
context.getProperty(APPLICATION_TENANT_ID).getValue();
+        final String clusterUri = context.getProperty(CLUSTER_URI).getValue();
+        final KustoAuthenticationStrategy kustoAuthenticationStrategy = 
KustoAuthenticationStrategy.valueOf(context.getProperty(AUTHENTICATION_STRATEGY).getValue());
+
+        if (this.queuedIngestClient == null) {
+            this.queuedIngestClient = 
createKustoQueuedIngestClient(clusterUri, applicationClientId, applicationKey, 
applicationTenantId, kustoAuthenticationStrategy);
+        }
+
+        if (this.managedStreamingIngestClient == null) {
+            this.managedStreamingIngestClient = 
createKustoStreamingIngestClient(clusterUri, applicationClientId, 
applicationKey, applicationTenantId, kustoAuthenticationStrategy);
+        }
+
+        if (this.executionClient == null) {
+            this.executionClient = createKustoExecutionClient(clusterUri, 
applicationClientId, applicationKey, applicationTenantId, 
kustoAuthenticationStrategy);
+        }
+
+    }
+
+    @OnStopped
+    public final void onStopped() {
+        if (this.queuedIngestClient != null) {
+            try {
+                this.queuedIngestClient.close();
+            } catch (IOException e) {
+                getLogger().error("Closing Azure ADX Queued Ingest Client 
failed", e);
+            } finally {
+                this.queuedIngestClient = null;
+            }
+        }
+        if (this.managedStreamingIngestClient != null) {
+            try {
+                this.managedStreamingIngestClient.close();
+            } catch (IOException e) {
+                getLogger().error("Closing Azure ADX Managed Streaming Ingest 
Client failed", e);
+            } finally {
+                this.managedStreamingIngestClient = null;
+            }
+        }
+        if (this.executionClient != null) {
+            try {
+                this.executionClient.close();
+            } catch (IOException e) {
+                getLogger().error("Closing Azure ADX Execution Client failed", 
e);
+            } finally {
+                this.executionClient = null;
+            }
+        }
+    }
+
+
+    protected QueuedIngestClient createKustoQueuedIngestClient(final String 
clusterUrl,
+                                                               final String 
appId,
+                                                               final String 
appKey,
+                                                               final String 
appTenant,
+                                                               final 
KustoAuthenticationStrategy kustoAuthStrategy) throws URISyntaxException {
+        ConnectionStringBuilder ingestConnectionStringBuilder = 
createKustoEngineConnectionString(clusterUrl, appId, appKey, appTenant, 
kustoAuthStrategy);
+        return IngestClientFactory.createClient(ingestConnectionStringBuilder);
+    }
+
+    protected ManagedStreamingIngestClient 
createKustoStreamingIngestClient(final String clusterUrl,
+                                                                            
final String appId,
+                                                                            
final String appKey,
+                                                                            
final String appTenant,
+                                                                            
final KustoAuthenticationStrategy kustoAuthStrategy) throws URISyntaxException {
+        ConnectionStringBuilder ingestConnectionStringBuilder = 
createKustoEngineConnectionString(clusterUrl, appId, appKey, appTenant, 
kustoAuthStrategy);
+        ConnectionStringBuilder streamingConnectionStringBuilder = 
createKustoEngineConnectionString(clusterUrl, appId, appKey, appTenant, 
kustoAuthStrategy);
+        return 
IngestClientFactory.createManagedStreamingIngestClient(ingestConnectionStringBuilder,
 streamingConnectionStringBuilder);
+    }
+
+    public KustoIngestionResult ingestData(KustoIngestionRequest 
kustoIngestionRequest) throws URISyntaxException {
+        StreamSourceInfo info = new 
StreamSourceInfo(kustoIngestionRequest.getInputStream());
+        //ingest data
+        IngestionResult ingestionResult;
+        IngestionProperties ingestionProperties = new 
IngestionProperties(kustoIngestionRequest.getDatabaseName(),
+                kustoIngestionRequest.getTableName());
+
+        IngestionMapping.IngestionMappingKind ingestionMappingKind = 
setDataFormatAndMapping(kustoIngestionRequest.getDataFormat(), 
ingestionProperties);
+        if (StringUtils.isNotEmpty(kustoIngestionRequest.getMappingName()) && 
ingestionMappingKind != null) {
+            
ingestionProperties.setIngestionMapping(kustoIngestionRequest.getMappingName(), 
ingestionMappingKind);
+        }
+
+        
ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FAILURES_AND_SUCCESSES);
+        
ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.TABLE);
+        ingestionProperties.setFlushImmediately(false);
+        
ingestionProperties.setIgnoreFirstRecord(StringUtils.equalsIgnoreCase(kustoIngestionRequest.getIgnoreFirstRecord(),
 "YES"));
+
+        try {
+            if (kustoIngestionRequest.isStreamingEnabled()) {
+                ingestionResult = 
managedStreamingIngestClient.ingestFromStream(info, ingestionProperties);
+            } else {
+                ingestionResult = queuedIngestClient.ingestFromStream(info, 
ingestionProperties);
+            }
+            if (kustoIngestionRequest.pollOnIngestionStatus()) {
+                List<IngestionStatus> statuses = 
initializeKustoIngestionStatusAsPending();
+                long startTime = System.currentTimeMillis();
+                long timeoutMillis = 
kustoIngestionRequest.getIngestionStatusPollingTimeout().toMillis();
+                // Calculate the end time based on the timeout duration
+                long endTime = startTime + timeoutMillis;
+                while (System.currentTimeMillis() < endTime) {
+                    // Get the status of the ingestion operation
+                    List<IngestionStatus> statuses1 = 
ingestionResult.getIngestionStatusCollection();
+                    if (statuses1.get(0).status == OperationStatus.Succeeded
+                            || statuses1.get(0).status == 
OperationStatus.Failed
+                            || statuses1.get(0).status == 
OperationStatus.PartiallySucceeded) {
+                        statuses = statuses1;
+                        break;
+                    }
+                    // Sleep for before checking again
+                    
Thread.sleep(kustoIngestionRequest.getIngestionStatusPollingInterval().toMillis());
+                }
+                // Check if the timeout has been exceeded
+                if (System.currentTimeMillis() - startTime >= timeoutMillis) {
+                    throw new ProcessException(String.format("Timeout of %s 
exceeded while waiting for ingestion status", 
kustoIngestionRequest.getIngestionStatusPollingTimeout()));
+                }
+                return 
KustoIngestionResult.fromString(statuses.get(0).status.toString());

Review Comment:
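   Recommend moving this ingestion status polling block (the body of the
   `if (kustoIngestionRequest.pollOnIngestionStatus())` branch) into a separate
   method named something like `pollIngestionStatus()`.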



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/data/explorer/PutAzureDataExplorer.java:
##########
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.data.explorer;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestDataFormat;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestQueryResponse;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestService;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestionRequest;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestionResult;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@Tags({"Azure", "Kusto", "ADX", "Explorer", "Data"})
+@CapabilityDescription("The PutAzureDataExplorer acts as a ADX sink connector 
which sends flowFiles using the ADX-Service to the provided Azure Data" +
+        "Explorer Ingest Endpoint. The data can be sent through queued 
ingestion or streaming ingestion to the Azure Data Explorer cluster.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class PutAzureDataExplorer extends AbstractProcessor {
+
+    public static final String FETCH_TABLE_COMMAND = "%s | count";
+    public static final String STREAMING_POLICY_SHOW_COMMAND = ".show %s %s 
policy streamingingestion";
+    public static final String DATABASE = "database";
+    public static final AllowableValue IGNORE_FIRST_RECORD_YES = new 
AllowableValue(
+            "YES", "YES",
+            "Ignore first record during ingestion");
+
+    public static final AllowableValue IGNORE_FIRST_RECORD_NO = new 
AllowableValue(
+            "NO", "NO",
+            "Do not ignore first record during ingestion");
+
+    public static final PropertyDescriptor DATABASE_NAME = new 
PropertyDescriptor.Builder()
+            .name("Database Name")
+            .displayName("Database Name")
+            .description("Azure Data Explorer Database Name for querying")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+            .name("Table Name")
+            .displayName("Table Name")
+            .description("Azure Data Explorer Table Name for ingesting data")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor MAPPING_NAME = new 
PropertyDescriptor
+            .Builder().name("Ingest Mapping name")
+            .displayName("Ingest Mapping name")
+            .description("The name of the mapping responsible for storing the 
data in the appropriate columns.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor IS_STREAMING_ENABLED = new 
PropertyDescriptor
+            .Builder().name("Streaming Enabled")
+            .displayName("Streaming Enabled")
+            .description("This property determines whether we want to stream 
data to ADX.")
+            .required(false)
+            .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString())
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue(Boolean.FALSE.toString())
+            .build();
+    public static final PropertyDescriptor POLL_ON_INGESTION_STATUS = new 
PropertyDescriptor
+            .Builder().name("Poll On Ingestion Status")
+            .displayName("Whether To Poll On Ingestion Status")
+            .description("This property determines whether we want to poll on 
ingestion status after an ingestion to ADX is completed")
+            .required(false)
+            .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString())
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue(Boolean.FALSE.toString())
+            .build();
+    public static final PropertyDescriptor ADX_SERVICE = new PropertyDescriptor
+            .Builder().name("Kusto Ingest Service")
+            .displayName("Kusto Ingest Service")
+            .description("Azure Data Explorer Kusto Ingest Service")
+            .required(true)
+            .identifiesControllerService(KustoIngestService.class)
+            .build();
+    public static final PropertyDescriptor DATA_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("Data Format")
+            .displayName("Data Format")
+            .description("The format of the data that is sent to Azure Data 
Explorer.")
+            .required(true)
+            .allowableValues(KustoIngestDataFormat.values())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor IGNORE_FIRST_RECORD = new 
PropertyDescriptor.Builder()
+            .name("Ingestion Ignore First Record")
+            .displayName("Ingestion Ignore First Record")
+            .description("Defines whether ignore first record while 
ingestion.")
+            .required(false)
+            .allowableValues(IGNORE_FIRST_RECORD_YES, IGNORE_FIRST_RECORD_NO)
+            .defaultValue(IGNORE_FIRST_RECORD_NO.getValue())
+            .build();
+    public static final PropertyDescriptor INGESTION_STATUS_POLLING_TIMEOUT = 
new PropertyDescriptor.Builder()
+            .name("Ingest Status Polling Timeout")
+            .displayName("Ingest Status Polling Timeout")
+            .description("Defines the value of timeout for polling on 
ingestion status")
+            .required(false)
+            .dependsOn(POLL_ON_INGESTION_STATUS, Boolean.TRUE.toString())
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("10 m")
+            .build();
+    public static final PropertyDescriptor INGESTION_STATUS_POLLING_INTERVAL = 
new PropertyDescriptor.Builder()
+            .name("Ingest Status Polling Interval")
+            .displayName("Ingest Status Polling Interval")
+            .description("Defines the value of timeout for polling on 
ingestion status in seconds.")
+            .required(false)
+            .dependsOn(POLL_ON_INGESTION_STATUS, Boolean.TRUE.toString())
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("5 s")
+            .build();
+    public static final Relationship SUCCESS = new Relationship.Builder()
+            .name("Success")
+            .description("Relationship For Success")
+            .build();
+    public static final Relationship FAILURE = new Relationship.Builder()
+            .name("Failure")
+            .description("Relationship For Failure")
+            .build();
+
+    public static final PropertyDescriptor 
ROUTE_PARTIALLY_SUCCESSFUL_INGESTION = new PropertyDescriptor.Builder()
+            .name("Route Partially Successful Ingestion Records")
+            .displayName("Route Partially Successful Ingestion Records")
+            .description("Defines where to route partially successful 
ingestion records.")
+            .required(false)
+            .allowableValues(SUCCESS.getName(), FAILURE.getName())
+            .defaultValue(FAILURE.getName())
+            .build();
+    private static final List<PropertyDescriptor> descriptors;
+    private static final Set<Relationship> relationships;
+    private transient KustoIngestService service;
+    private boolean streamingEnabled;
+    private boolean pollOnIngestionStatus;

Review Comment:
   These values should not be member variables.
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoIngestService.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.data.explorer;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.ControllerService;
+
+import java.net.URISyntaxException;
+
+@Tags({"azure", "adx", "explorer", "kusto", "ingest"})
+@CapabilityDescription("Connection-Service for Azure ADX (Kusto) ingestion 
cluster to ingest data.")
+public interface KustoIngestService extends ControllerService {
+    KustoIngestionResult ingestData(KustoIngestionRequest 
kustoIngestionRequest) throws URISyntaxException;
+
+    KustoIngestQueryResponse executeQuery(String databaseName, String query);
+
+    KustoIngestQueryResponse checkIfStreamingIsEnabled(String databaseName, 
String query);
+
+    KustoIngestQueryResponse checkIfIngestorPrivilegeIsEnabled(String 
databaseName, String query);

Review Comment:
   ```suggestion
       KustoIngestQueryResponse isIngestorPrivilegeEnabled(String databaseName, 
String tableName);
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/data/explorer/PutAzureDataExplorer.java:
##########
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.data.explorer;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestDataFormat;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestQueryResponse;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestService;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestionRequest;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestionResult;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@Tags({"Azure", "Kusto", "ADX", "Explorer", "Data"})
+@CapabilityDescription("The PutAzureDataExplorer acts as a ADX sink connector 
which sends flowFiles using the ADX-Service to the provided Azure Data" +
+        "Explorer Ingest Endpoint. The data can be sent through queued 
ingestion or streaming ingestion to the Azure Data Explorer cluster.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class PutAzureDataExplorer extends AbstractProcessor {
+
+    public static final String FETCH_TABLE_COMMAND = "%s | count";
+    public static final String STREAMING_POLICY_SHOW_COMMAND = ".show %s %s 
policy streamingingestion";
+    public static final String DATABASE = "database";
+    public static final AllowableValue IGNORE_FIRST_RECORD_YES = new 
AllowableValue(
+            "YES", "YES",
+            "Ignore first record during ingestion");
+
+    public static final AllowableValue IGNORE_FIRST_RECORD_NO = new 
AllowableValue(
+            "NO", "NO",
+            "Do not ignore first record during ingestion");
+
+    public static final PropertyDescriptor DATABASE_NAME = new 
PropertyDescriptor.Builder()
+            .name("Database Name")
+            .displayName("Database Name")
+            .description("Azure Data Explorer Database Name for querying")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+            .name("Table Name")
+            .displayName("Table Name")
+            .description("Azure Data Explorer Table Name for ingesting data")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor MAPPING_NAME = new 
PropertyDescriptor
+            .Builder().name("Ingest Mapping name")
+            .displayName("Ingest Mapping name")
+            .description("The name of the mapping responsible for storing the 
data in the appropriate columns.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor IS_STREAMING_ENABLED = new 
PropertyDescriptor
+            .Builder().name("Streaming Enabled")
+            .displayName("Streaming Enabled")
+            .description("This property determines whether we want to stream 
data to ADX.")
+            .required(false)
+            .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString())
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue(Boolean.FALSE.toString())
+            .build();
+    public static final PropertyDescriptor POLL_ON_INGESTION_STATUS = new 
PropertyDescriptor
+            .Builder().name("Poll On Ingestion Status")
+            .displayName("Whether To Poll On Ingestion Status")

Review Comment:
   ```suggestion
               .Builder().name("Poll for Ingestion Status")
               .displayName("Poll for Ingestion Status")
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/data/explorer/PutAzureDataExplorer.java:
##########
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.data.explorer;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestDataFormat;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestQueryResponse;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestService;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestionRequest;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestionResult;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@Tags({"Azure", "Kusto", "ADX", "Explorer", "Data"})
+@CapabilityDescription("The PutAzureDataExplorer acts as a ADX sink connector 
which sends flowFiles using the ADX-Service to the provided Azure Data" +
+        "Explorer Ingest Endpoint. The data can be sent through queued 
ingestion or streaming ingestion to the Azure Data Explorer cluster.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class PutAzureDataExplorer extends AbstractProcessor {
+
+    public static final String FETCH_TABLE_COMMAND = "%s | count";
+    public static final String STREAMING_POLICY_SHOW_COMMAND = ".show %s %s 
policy streamingingestion";
+    public static final String DATABASE = "database";
+    public static final AllowableValue IGNORE_FIRST_RECORD_YES = new 
AllowableValue(
+            "YES", "YES",
+            "Ignore first record during ingestion");
+
+    public static final AllowableValue IGNORE_FIRST_RECORD_NO = new 
AllowableValue(
+            "NO", "NO",
+            "Do not ignore first record during ingestion");
+
+    public static final PropertyDescriptor DATABASE_NAME = new 
PropertyDescriptor.Builder()
+            .name("Database Name")
+            .displayName("Database Name")
+            .description("Azure Data Explorer Database Name for querying")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+            .name("Table Name")
+            .displayName("Table Name")
+            .description("Azure Data Explorer Table Name for ingesting data")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor MAPPING_NAME = new 
PropertyDescriptor
+            .Builder().name("Ingest Mapping name")
+            .displayName("Ingest Mapping name")
+            .description("The name of the mapping responsible for storing the 
data in the appropriate columns.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor IS_STREAMING_ENABLED = new 
PropertyDescriptor
+            .Builder().name("Streaming Enabled")
+            .displayName("Streaming Enabled")
+            .description("This property determines whether we want to stream 
data to ADX.")
+            .required(false)
+            .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString())
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue(Boolean.FALSE.toString())
+            .build();
+    public static final PropertyDescriptor POLL_ON_INGESTION_STATUS = new 
PropertyDescriptor
+            .Builder().name("Poll On Ingestion Status")
+            .displayName("Whether To Poll On Ingestion Status")
+            .description("This property determines whether we want to poll on 
ingestion status after an ingestion to ADX is completed")

Review Comment:
   ```suggestion
               .description("Determines whether to poll on ingestion status 
after an ingestion to Azure Data Explorer is completed")
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/data/explorer/PutAzureDataExplorer.java:
##########
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.data.explorer;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestDataFormat;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestQueryResponse;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestService;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestionRequest;
+import org.apache.nifi.services.azure.data.explorer.KustoIngestionResult;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@Tags({"Azure", "Kusto", "ADX", "Explorer", "Data"})
+@CapabilityDescription("The PutAzureDataExplorer acts as a ADX sink connector 
which sends flowFiles using the ADX-Service to the provided Azure Data" +
+        "Explorer Ingest Endpoint. The data can be sent through queued 
ingestion or streaming ingestion to the Azure Data Explorer cluster.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class PutAzureDataExplorer extends AbstractProcessor {
+
+    public static final String FETCH_TABLE_COMMAND = "%s | count";
+    public static final String STREAMING_POLICY_SHOW_COMMAND = ".show %s %s 
policy streamingingestion";
+    public static final String DATABASE = "database";
+    public static final AllowableValue IGNORE_FIRST_RECORD_YES = new 
AllowableValue(
+            "YES", "YES",
+            "Ignore first record during ingestion");
+
+    public static final AllowableValue IGNORE_FIRST_RECORD_NO = new 
AllowableValue(
+            "NO", "NO",
+            "Do not ignore first record during ingestion");
+
+    public static final PropertyDescriptor DATABASE_NAME = new 
PropertyDescriptor.Builder()
+            .name("Database Name")
+            .displayName("Database Name")
+            .description("Azure Data Explorer Database Name for querying")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+            .name("Table Name")
+            .displayName("Table Name")
+            .description("Azure Data Explorer Table Name for ingesting data")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor MAPPING_NAME = new 
PropertyDescriptor
+            .Builder().name("Ingest Mapping name")
+            .displayName("Ingest Mapping name")
+            .description("The name of the mapping responsible for storing the 
data in the appropriate columns.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor IS_STREAMING_ENABLED = new 
PropertyDescriptor
+            .Builder().name("Streaming Enabled")
+            .displayName("Streaming Enabled")
+            .description("This property determines whether we want to stream 
data to ADX.")
+            .required(false)
+            .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString())
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue(Boolean.FALSE.toString())
+            .build();
+    public static final PropertyDescriptor POLL_ON_INGESTION_STATUS = new 
PropertyDescriptor
+            .Builder().name("Poll On Ingestion Status")
+            .displayName("Whether To Poll On Ingestion Status")
+            .description("This property determines whether we want to poll on 
ingestion status after an ingestion to ADX is completed")
+            .required(false)
+            .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString())
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue(Boolean.FALSE.toString())
+            .build();
+    public static final PropertyDescriptor ADX_SERVICE = new PropertyDescriptor
+            .Builder().name("Kusto Ingest Service")
+            .displayName("Kusto Ingest Service")
+            .description("Azure Data Explorer Kusto Ingest Service")
+            .required(true)
+            .identifiesControllerService(KustoIngestService.class)
+            .build();
+    public static final PropertyDescriptor DATA_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("Data Format")
+            .displayName("Data Format")
+            .description("The format of the data that is sent to Azure Data 
Explorer.")
+            .required(true)
+            .allowableValues(KustoIngestDataFormat.values())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor IGNORE_FIRST_RECORD = new 
PropertyDescriptor.Builder()
+            .name("Ingestion Ignore First Record")
+            .displayName("Ingestion Ignore First Record")
+            .description("Defines whether ignore first record while 
ingestion.")
+            .required(false)
+            .allowableValues(IGNORE_FIRST_RECORD_YES, IGNORE_FIRST_RECORD_NO)
+            .defaultValue(IGNORE_FIRST_RECORD_NO.getValue())
+            .build();
+    public static final PropertyDescriptor INGESTION_STATUS_POLLING_TIMEOUT = 
new PropertyDescriptor.Builder()
+            .name("Ingest Status Polling Timeout")
+            .displayName("Ingest Status Polling Timeout")
+            .description("Defines the value of timeout for polling on 
ingestion status")
+            .required(false)
+            .dependsOn(POLL_ON_INGESTION_STATUS, Boolean.TRUE.toString())
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("10 m")
+            .build();
+    public static final PropertyDescriptor INGESTION_STATUS_POLLING_INTERVAL = 
new PropertyDescriptor.Builder()
+            .name("Ingest Status Polling Interval")
+            .displayName("Ingest Status Polling Interval")
+            .description("Defines the value of timeout for polling on 
ingestion status in seconds.")
+            .required(false)
+            .dependsOn(POLL_ON_INGESTION_STATUS, Boolean.TRUE.toString())
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("5 s")
+            .build();
+    public static final Relationship SUCCESS = new Relationship.Builder()
+            .name("Success")
+            .description("Relationship For Success")
+            .build();
+    public static final Relationship FAILURE = new Relationship.Builder()
+            .name("Failure")
+            .description("Relationship For Failure")
+            .build();
+
+    public static final PropertyDescriptor 
ROUTE_PARTIALLY_SUCCESSFUL_INGESTION = new PropertyDescriptor.Builder()
+            .name("Route Partially Successful Ingestion Records")
+            .displayName("Route Partially Successful Ingestion Records")
+            .description("Defines where to route partially successful 
ingestion records.")
+            .required(false)
+            .allowableValues(SUCCESS.getName(), FAILURE.getName())
+            .defaultValue(FAILURE.getName())
+            .build();
+    private static final List<PropertyDescriptor> descriptors;
+    private static final Set<Relationship> relationships;
+    private transient KustoIngestService service;
+    private boolean streamingEnabled;
+    private boolean pollOnIngestionStatus;
+
+    static  {
+        final List<PropertyDescriptor> descriptorList = new ArrayList<>();
+        descriptorList.add(ADX_SERVICE);
+        descriptorList.add(DATABASE_NAME);
+        descriptorList.add(TABLE_NAME);
+        descriptorList.add(MAPPING_NAME);
+        descriptorList.add(DATA_FORMAT);
+        descriptorList.add(IGNORE_FIRST_RECORD);
+        descriptorList.add(IS_STREAMING_ENABLED);
+        descriptorList.add(POLL_ON_INGESTION_STATUS);
+        descriptorList.add(ROUTE_PARTIALLY_SUCCESSFUL_INGESTION);
+        descriptorList.add(INGESTION_STATUS_POLLING_TIMEOUT);
+        descriptorList.add(INGESTION_STATUS_POLLING_INTERVAL);
+        descriptors = Collections.unmodifiableList(descriptorList);

Review Comment:
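   This should be changed to use `List.of()` to declare the property descriptors directly, instead of populating an `ArrayList` and wrapping it with `Collections.unmodifiableList()`.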



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to