This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new c42ebef410 NIFI-14825 : Fix isStreaming, add tests and changes for 
v7.0.2 that break
c42ebef410 is described below

commit c42ebef410d3dfefaf1a0439843ddca7df08961a
Author: ag-ramachandran <[email protected]>
AuthorDate: Fri Aug 8 20:17:48 2025 +0530

    NIFI-14825 : Fix isStreaming, add tests and changes for v7.0.2 that break
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #10182.
---
 .../data/explorer/KustoAuthenticationStrategy.java |   5 +-
 .../data/explorer/KustoIngestQueryResponse.java    |   2 +
 .../data/explorer/StandardKustoIngestService.java  |  37 +++--
 .../data/explorer/StandardKustoQueryService.java   |   1 +
 .../explorer/StandardKustoIngestServiceTest.java   | 153 +++++++++++++++++++++
 5 files changed, 178 insertions(+), 20 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoAuthenticationStrategy.java
 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoAuthenticationStrategy.java
index 5182bbef1a..99bd6acdb8 100644
--- 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoAuthenticationStrategy.java
+++ 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoAuthenticationStrategy.java
@@ -20,7 +20,10 @@ import org.apache.nifi.components.DescribedValue;
 
 public enum KustoAuthenticationStrategy implements DescribedValue {
     APPLICATION_CREDENTIALS("Application Credentials", "Azure Application 
Registration with Application Key"),
-    MANAGED_IDENTITY("Managed Identity", "Azure Managed Identity");
+    MANAGED_IDENTITY("Managed Identity", "Azure Managed Identity"),
+    // This is required for tests going forward. This is not a breaking 
change, but permits developers to test locally without having to use a
+    // machine with Service Principal credentials or Managed Identity.
+    AZ_CLI_DEV_ONLY("Azure CLI (Dev Only)", "Azure CLI authentication, 
suitable for development purposes only");
 
     private final String displayName;
 
diff --git 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoIngestQueryResponse.java
 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoIngestQueryResponse.java
index 948f182f48..a42871ce9b 100644
--- 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoIngestQueryResponse.java
+++ 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoIngestQueryResponse.java
@@ -45,5 +45,7 @@ public class KustoIngestQueryResponse {
 
     public KustoIngestQueryResponse(final boolean error) {
         this.error = error;
+        // Initialize queryResult to an empty map to avoid null checks later
+        this.queryResult = Collections.emptyMap();
     }
 }
diff --git 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoIngestService.java
 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoIngestService.java
index 4958cae5ad..cfe6ff9944 100644
--- 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoIngestService.java
+++ 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoIngestService.java
@@ -50,6 +50,7 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -113,9 +114,10 @@ public class StandardKustoIngestService extends 
AbstractControllerService implem
             CLUSTER_URI
     );
 
-    private static final String STREAMING_POLICY_SHOW_COMMAND = ".show 
database %s policy streamingingestion";
+    // Query changed as the server can return no result (no policy) or 
explicitly enabled / disabled scenarios
+    private static final String STREAMING_POLICY_SHOW_COMMAND = ".show 
database %s policy streamingingestion |  project IsEnabled = 
todynamic(Policy)['IsEnabled']";
 
-    private static final String COUNT_TABLE_COMMAND = "%s | count";
+    private static final String COUNT_TABLE_COMMAND = "%s | count"; // This 
will need to be optimized. Large tables may fail on this.
 
     private static final Map<String, String> NIFI_SINK = Map.of("processor", 
StandardKustoIngestService.class.getSimpleName());
 
@@ -123,7 +125,7 @@ public class StandardKustoIngestService extends 
AbstractControllerService implem
 
     private volatile ManagedStreamingIngestClient managedStreamingIngestClient;
 
-    private volatile Client executionClient;
+    protected volatile Client executionClient;
 
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -256,18 +258,9 @@ public class StandardKustoIngestService extends 
AbstractControllerService implem
     public boolean isStreamingPolicyEnabled(final String databaseName) {
         final String query = String.format(STREAMING_POLICY_SHOW_COMMAND, 
databaseName);
         final KustoIngestQueryResponse kustoIngestQueryResponse = 
executeQuery(databaseName, query);
-
-        boolean streamingPolicyEnabled = false;
-        if (!kustoIngestQueryResponse.getQueryResult().isEmpty()) {
-            final List<String> row = 
kustoIngestQueryResponse.getQueryResult().get(0);
-            if (!row.isEmpty()) {
-                final String streamingPolicy = row.get(2);
-                if (!streamingPolicy.isEmpty()) {
-                    streamingPolicyEnabled = true;
-                }
-            }
-        }
-        return streamingPolicyEnabled;
+        return kustoIngestQueryResponse.getQueryResult().values().stream()
+                .flatMap(Collection::stream)
+                .anyMatch(val -> "true".equalsIgnoreCase(val));
     }
 
     @Override
@@ -299,10 +292,14 @@ public class StandardKustoIngestService extends 
AbstractControllerService implem
         final ConnectionStringBuilder builder;
         if (KustoAuthenticationStrategy.APPLICATION_CREDENTIALS == 
kustoAuthStrategy) {
             builder = 
ConnectionStringBuilder.createWithAadApplicationCredentials(clusterUrl, appId, 
appKey, appTenant);
-        } else {
+        } else if (KustoAuthenticationStrategy.MANAGED_IDENTITY == 
kustoAuthStrategy) {
             builder = 
ConnectionStringBuilder.createWithAadManagedIdentity(clusterUrl, appId);
+        } else if (KustoAuthenticationStrategy.AZ_CLI_DEV_ONLY == 
kustoAuthStrategy) {
+            // The new SDK version supports Azure CLI authentication
+            builder = ConnectionStringBuilder.createWithAzureCli(clusterUrl);
+        } else {
+            throw new IllegalArgumentException("Unsupported Kusto 
Authentication Strategy: " + kustoAuthStrategy);
         }
-
         builder.setConnectorDetails("Kusto.Nifi.Sink", 
StandardKustoIngestService.class.getPackage().getImplementationVersion(), null, 
null, false, null, NIFI_SINK);
         return builder;
     }
@@ -313,7 +310,9 @@ public class StandardKustoIngestService extends 
AbstractControllerService implem
 
         KustoIngestQueryResponse kustoIngestQueryResponse;
         try {
-            final KustoOperationResult kustoOperationResult = 
this.executionClient.executeQuery(databaseName, query);
+            // Requires a change in the new SDK version. This will fail with 
executeQuery v7.0.0 and up of the SDK
+            boolean isMgmtCommand = query != null && query.startsWith(".");
+            final KustoOperationResult kustoOperationResult = isMgmtCommand ? 
this.executionClient.executeMgmt(databaseName, query) : 
this.executionClient.executeQuery(databaseName, query);
             final KustoResultSetTable kustoResultSetTable = 
kustoOperationResult.getPrimaryResults();
 
             final Map<Integer, List<String>> response = new HashMap<>();
@@ -331,7 +330,7 @@ public class StandardKustoIngestService extends 
AbstractControllerService implem
             }
             kustoIngestQueryResponse = new KustoIngestQueryResponse(response);
         } catch (final DataServiceException | DataClientException e) {
-            getLogger().error("Azure Data Explorer Ingest execution failed", 
e);
+            getLogger().error("Azure Data Explorer Ingest execution failed 
executing query {} on database {}", query, databaseName, e);
             kustoIngestQueryResponse = new KustoIngestQueryResponse(true);
         }
         return kustoIngestQueryResponse;
diff --git 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoQueryService.java
 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoQueryService.java
index b4a9b85812..9788906bbf 100644
--- 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoQueryService.java
+++ 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoQueryService.java
@@ -150,6 +150,7 @@ public class StandardKustoQueryService extends 
AbstractControllerService impleme
                 yield 
ConnectionStringBuilder.createWithAadApplicationCredentials(clusterUrl, 
clientId, applicationKey, tenantId);
             }
             case MANAGED_IDENTITY -> 
ConnectionStringBuilder.createWithAadManagedIdentity(clusterUrl, clientId);
+            case AZ_CLI_DEV_ONLY -> 
ConnectionStringBuilder.createWithAzureCli(clusterUrl);
         };
 
         builder.setConnectorDetails("Kusto.Nifi.Source", 
StandardKustoQueryService.class.getPackage().getImplementationVersion(), null, 
null, false, null, NIFI_SOURCE);
diff --git 
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/data/explorer/StandardKustoIngestServiceTest.java
 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/data/explorer/StandardKustoIngestServiceTest.java
new file mode 100644
index 0000000000..457612ce22
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/data/explorer/StandardKustoIngestServiceTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.data.explorer;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.jupiter.api.BeforeEach;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doNothing;
+
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.KustoOperationResult;
+import com.microsoft.azure.kusto.data.KustoResultColumn;
+import com.microsoft.azure.kusto.data.KustoResultSetTable;
+import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Map;
+
+@ExtendWith(MockitoExtension.class)
+public class StandardKustoIngestServiceTest {
+
+    private Client mockExecutionClient;
+    private KustoOperationResult mockKustoOperationResult;
+    private KustoResultSetTable mockKustoResultSetTable;
+    private ComponentLog mockLogger;
+    private StandardKustoIngestService service;
+
+    private static class TestableKustoIngestService extends 
StandardKustoIngestService {
+        private final ComponentLog logger;
+        TestableKustoIngestService(ComponentLog logger, Client mockClient) {
+            this.logger = logger;
+            this.executionClient = mockClient;
+        }
+        @Override
+        protected ComponentLog getLogger() {
+            return logger;
+        }
+    }
+
+    @BeforeEach
+    public void setUp() {
+        mockLogger = mock(ComponentLog.class);
+        mockExecutionClient = mock(Client.class);
+        mockKustoOperationResult = mock(KustoOperationResult.class);
+        mockKustoResultSetTable = mock(KustoResultSetTable.class);
+        service = new TestableKustoIngestService(mockLogger, 
mockExecutionClient);
+    }
+
+    @ParameterizedTest
+    @CsvSource({
+        "true",
+        "false",
+        "null" //needs to be a constant
+    })
+    void testIsStreamingPolicyEnabled(String isStreamingEnabled) throws 
Exception {
+        KustoResultColumn[] krcArray = Map.of(0, "PolicyName", 1, 
"EntityName", 2, "Policy", 3, "ChildEntities", 4, "EntityType")
+            .entrySet().stream().map(entry -> new 
KustoResultColumn(entry.getValue(), "string", entry.getKey()))
+            .toArray(KustoResultColumn[]::new);
+
+        // Arrange: mock the KustoOperationResult and KustoResultSetTable
+        // Simulate a table with a row indicating streaming policy is enabled
+        
when(mockKustoResultSetTable.hasNext()).thenReturn(true).thenReturn(false);
+        
when(mockKustoResultSetTable.next()).thenReturn(true).thenReturn(false);
+        
when(mockKustoResultSetTable.getString(anyInt())).thenReturn("null".equals(isStreamingEnabled)
 ? null : isStreamingEnabled);
+        when(mockKustoResultSetTable.getColumns()).thenReturn(krcArray);
+
+        
when(mockKustoOperationResult.getPrimaryResults()).thenReturn(mockKustoResultSetTable);
+
+        // Arrange: mock executeMgmt to return a Results with a 
KustoResultSetTable
+        when(mockExecutionClient.executeMgmt(eq("db"), eq(".show database db 
policy streamingingestion |  project IsEnabled = 
todynamic(Policy)['IsEnabled']")))
+            .thenReturn(mockKustoOperationResult);
+
+        // Act
+        boolean actual = service.isStreamingPolicyEnabled("db");
+
+        // Assert
+        boolean expected = "true".equals(isStreamingEnabled);
+        assertEquals(expected, actual, "Streaming policy enabled check failed 
for value: " + isStreamingEnabled);
+    }
+
+    @ParameterizedTest
+    @CsvSource({
+        "100",
+        "-1"
+    })
+    void testIsTableReadable(String rowCount) {
+        String databaseName = "db";
+        String tableName = "table";
+
+        KustoResultColumn[] krcArray = Map.of(0, "Count")
+            .entrySet().stream().map(entry -> new 
KustoResultColumn(entry.getValue(), "int64", entry.getKey()))
+            .toArray(KustoResultColumn[]::new);
+
+        // Arrange: mock the KustoOperationResult and KustoResultSetTable
+        // Simulate a table with a row indicating streaming policy is enabled
+        
when(mockKustoResultSetTable.hasNext()).thenReturn(true).thenReturn(false);
+        
when(mockKustoResultSetTable.next()).thenReturn(true).thenReturn(false);
+
+        if ("-1".equals(rowCount)) {
+            // Simulate a table that is not readable, e.g., no rows or columns
+            when(mockKustoResultSetTable.getString(anyInt())).thenThrow(new 
DataServiceException("Engine", "Table is not readable", true));
+            doNothing().when(mockLogger).error(anyString(), anyString(), 
anyString(), Mockito.any(Throwable.class));
+        } else {
+            
when(mockKustoResultSetTable.getString(anyInt())).thenReturn(rowCount);
+        }
+
+        when(mockKustoResultSetTable.getColumns()).thenReturn(krcArray);
+
+        
when(mockKustoOperationResult.getPrimaryResults()).thenReturn(mockKustoResultSetTable);
+
+        // Arrange: mock executeMgmt to return a Results with a 
KustoResultSetTable
+        when(mockExecutionClient.executeQuery(eq(databaseName), 
eq(String.format("%s | count", tableName))))
+            .thenReturn(mockKustoOperationResult);
+
+        // Act
+        boolean actual = service.isTableReadable(databaseName, tableName);
+
+        // Assert
+        if ("-1".equals(rowCount)) {
+            // Simulate a table that is not readable, e.g., no rows or columns
+            assertFalse(actual, "Table should not be readable for row count: " 
+ rowCount + " where an exception is thrown");
+        } else {
+            assertTrue(actual, "Table should be readable for row count: " + 
rowCount);
+        }
+    }
+}
\ No newline at end of file

Reply via email to