This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c42ebef410 NIFI-14825 : Fix isStreaming, add tests and changes for
v7.0.2 that break
c42ebef410 is described below
commit c42ebef410d3dfefaf1a0439843ddca7df08961a
Author: ag-ramachandran <[email protected]>
AuthorDate: Fri Aug 8 20:17:48 2025 +0530
NIFI-14825 : Fix isStreaming, add tests and changes for v7.0.2 that break
Signed-off-by: Pierre Villard <[email protected]>
This closes #10182.
---
.../data/explorer/KustoAuthenticationStrategy.java | 5 +-
.../data/explorer/KustoIngestQueryResponse.java | 2 +
.../data/explorer/StandardKustoIngestService.java | 37 +++--
.../data/explorer/StandardKustoQueryService.java | 1 +
.../explorer/StandardKustoIngestServiceTest.java | 153 +++++++++++++++++++++
5 files changed, 178 insertions(+), 20 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoAuthenticationStrategy.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoAuthenticationStrategy.java
index 5182bbef1a..99bd6acdb8 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoAuthenticationStrategy.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoAuthenticationStrategy.java
@@ -20,7 +20,10 @@ import org.apache.nifi.components.DescribedValue;
public enum KustoAuthenticationStrategy implements DescribedValue {
APPLICATION_CREDENTIALS("Application Credentials", "Azure Application
Registration with Application Key"),
- MANAGED_IDENTITY("Managed Identity", "Azure Managed Identity");
+ MANAGED_IDENTITY("Managed Identity", "Azure Managed Identity"),
+ // This is required for tests going forward. This is not a breaking
change, but permits developers to test locally without having to use a
+ // machine with Service Principal credentials or Managed Identity.
+ AZ_CLI_DEV_ONLY("Azure CLI (Dev Only)", "Azure CLI authentication,
suitable for development purposes only");
private final String displayName;
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoIngestQueryResponse.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoIngestQueryResponse.java
index 948f182f48..a42871ce9b 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoIngestQueryResponse.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoIngestQueryResponse.java
@@ -45,5 +45,7 @@ public class KustoIngestQueryResponse {
public KustoIngestQueryResponse(final boolean error) {
this.error = error;
+ // Initialize queryResult to an empty map to avoid null checks later
+ this.queryResult = Collections.emptyMap();
}
}
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoIngestService.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoIngestService.java
index 4958cae5ad..cfe6ff9944 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoIngestService.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoIngestService.java
@@ -50,6 +50,7 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -113,9 +114,10 @@ public class StandardKustoIngestService extends
AbstractControllerService implem
CLUSTER_URI
);
- private static final String STREAMING_POLICY_SHOW_COMMAND = ".show
database %s policy streamingingestion";
+ // The query projects IsEnabled because the server can return no rows (no
policy) or a policy that is explicitly enabled or disabled
+ private static final String STREAMING_POLICY_SHOW_COMMAND = ".show
database %s policy streamingingestion | project IsEnabled =
todynamic(Policy)['IsEnabled']";
- private static final String COUNT_TABLE_COMMAND = "%s | count";
+ private static final String COUNT_TABLE_COMMAND = "%s | count"; // This
will need to be optimized. Large tables may fail on this.
private static final Map<String, String> NIFI_SINK = Map.of("processor",
StandardKustoIngestService.class.getSimpleName());
@@ -123,7 +125,7 @@ public class StandardKustoIngestService extends
AbstractControllerService implem
private volatile ManagedStreamingIngestClient managedStreamingIngestClient;
- private volatile Client executionClient;
+ protected volatile Client executionClient;
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -256,18 +258,9 @@ public class StandardKustoIngestService extends
AbstractControllerService implem
public boolean isStreamingPolicyEnabled(final String databaseName) {
final String query = String.format(STREAMING_POLICY_SHOW_COMMAND,
databaseName);
final KustoIngestQueryResponse kustoIngestQueryResponse =
executeQuery(databaseName, query);
-
- boolean streamingPolicyEnabled = false;
- if (!kustoIngestQueryResponse.getQueryResult().isEmpty()) {
- final List<String> row =
kustoIngestQueryResponse.getQueryResult().get(0);
- if (!row.isEmpty()) {
- final String streamingPolicy = row.get(2);
- if (!streamingPolicy.isEmpty()) {
- streamingPolicyEnabled = true;
- }
- }
- }
- return streamingPolicyEnabled;
+ return kustoIngestQueryResponse.getQueryResult().values().stream()
+ .flatMap(Collection::stream)
+ .anyMatch(val -> "true".equalsIgnoreCase(val));
}
@Override
@@ -299,10 +292,14 @@ public class StandardKustoIngestService extends
AbstractControllerService implem
final ConnectionStringBuilder builder;
if (KustoAuthenticationStrategy.APPLICATION_CREDENTIALS ==
kustoAuthStrategy) {
builder =
ConnectionStringBuilder.createWithAadApplicationCredentials(clusterUrl, appId,
appKey, appTenant);
- } else {
+ } else if (KustoAuthenticationStrategy.MANAGED_IDENTITY ==
kustoAuthStrategy) {
builder =
ConnectionStringBuilder.createWithAadManagedIdentity(clusterUrl, appId);
+ } else if (KustoAuthenticationStrategy.AZ_CLI_DEV_ONLY ==
kustoAuthStrategy) {
+ // The new SDK version supports Azure CLI authentication
+ builder = ConnectionStringBuilder.createWithAzureCli(clusterUrl);
+ } else {
+ throw new IllegalArgumentException("Unsupported Kusto
Authentication Strategy: " + kustoAuthStrategy);
}
-
builder.setConnectorDetails("Kusto.Nifi.Sink",
StandardKustoIngestService.class.getPackage().getImplementationVersion(), null,
null, false, null, NIFI_SINK);
return builder;
}
@@ -313,7 +310,9 @@ public class StandardKustoIngestService extends
AbstractControllerService implem
KustoIngestQueryResponse kustoIngestQueryResponse;
try {
- final KustoOperationResult kustoOperationResult =
this.executionClient.executeQuery(databaseName, query);
+ // Commands starting with '.' are management commands and must use executeMgmt;
sending them through executeQuery fails with v7.0.0 and up of the SDK
+ boolean isMgmtCommand = query != null && query.startsWith(".");
+ final KustoOperationResult kustoOperationResult = isMgmtCommand ?
this.executionClient.executeMgmt(databaseName, query) :
this.executionClient.executeQuery(databaseName, query);
final KustoResultSetTable kustoResultSetTable =
kustoOperationResult.getPrimaryResults();
final Map<Integer, List<String>> response = new HashMap<>();
@@ -331,7 +330,7 @@ public class StandardKustoIngestService extends
AbstractControllerService implem
}
kustoIngestQueryResponse = new KustoIngestQueryResponse(response);
} catch (final DataServiceException | DataClientException e) {
- getLogger().error("Azure Data Explorer Ingest execution failed",
e);
+ getLogger().error("Azure Data Explorer Ingest execution failed
executing query {} on database {}", query, databaseName, e);
kustoIngestQueryResponse = new KustoIngestQueryResponse(true);
}
return kustoIngestQueryResponse;
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoQueryService.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoQueryService.java
index b4a9b85812..9788906bbf 100644
---
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoQueryService.java
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoQueryService.java
@@ -150,6 +150,7 @@ public class StandardKustoQueryService extends
AbstractControllerService impleme
yield
ConnectionStringBuilder.createWithAadApplicationCredentials(clusterUrl,
clientId, applicationKey, tenantId);
}
case MANAGED_IDENTITY ->
ConnectionStringBuilder.createWithAadManagedIdentity(clusterUrl, clientId);
+ case AZ_CLI_DEV_ONLY ->
ConnectionStringBuilder.createWithAzureCli(clusterUrl);
};
builder.setConnectorDetails("Kusto.Nifi.Source",
StandardKustoQueryService.class.getPackage().getImplementationVersion(), null,
null, false, null, NIFI_SOURCE);
diff --git
a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/data/explorer/StandardKustoIngestServiceTest.java
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/data/explorer/StandardKustoIngestServiceTest.java
new file mode 100644
index 0000000000..457612ce22
--- /dev/null
+++
b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/data/explorer/StandardKustoIngestServiceTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.data.explorer;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.jupiter.api.BeforeEach;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doNothing;
+
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.KustoOperationResult;
+import com.microsoft.azure.kusto.data.KustoResultColumn;
+import com.microsoft.azure.kusto.data.KustoResultSetTable;
+import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Map;
+
+@ExtendWith(MockitoExtension.class)
+public class StandardKustoIngestServiceTest {
+
+ private Client mockExecutionClient;
+ private KustoOperationResult mockKustoOperationResult;
+ private KustoResultSetTable mockKustoResultSetTable;
+ private ComponentLog mockLogger;
+ private StandardKustoIngestService service;
+
+ private static class TestableKustoIngestService extends
StandardKustoIngestService {
+ private final ComponentLog logger;
+ TestableKustoIngestService(ComponentLog logger, Client mockClient) {
+ this.logger = logger;
+ this.executionClient = mockClient;
+ }
+ @Override
+ protected ComponentLog getLogger() {
+ return logger;
+ }
+ }
+
+ @BeforeEach
+ public void setUp() {
+ mockLogger = mock(ComponentLog.class);
+ mockExecutionClient = mock(Client.class);
+ mockKustoOperationResult = mock(KustoOperationResult.class);
+ mockKustoResultSetTable = mock(KustoResultSetTable.class);
+ service = new TestableKustoIngestService(mockLogger,
mockExecutionClient);
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "true",
+ "false",
+ "null" //needs to be a constant
+ })
+ void testIsStreamingPolicyEnabled(String isStreamingEnabled) throws
Exception {
+ KustoResultColumn[] krcArray = Map.of(0, "PolicyName", 1,
"EntityName", 2, "Policy", 3, "ChildEntities", 4, "EntityType")
+ .entrySet().stream().map(entry -> new
KustoResultColumn(entry.getValue(), "string", entry.getKey()))
+ .toArray(KustoResultColumn[]::new);
+
+ // Arrange: mock the KustoOperationResult and KustoResultSetTable
+ // Simulate a single-row table holding the parameterized IsEnabled value (true, false or null)
+
when(mockKustoResultSetTable.hasNext()).thenReturn(true).thenReturn(false);
+
when(mockKustoResultSetTable.next()).thenReturn(true).thenReturn(false);
+
when(mockKustoResultSetTable.getString(anyInt())).thenReturn("null".equals(isStreamingEnabled)
? null : isStreamingEnabled);
+ when(mockKustoResultSetTable.getColumns()).thenReturn(krcArray);
+
+
when(mockKustoOperationResult.getPrimaryResults()).thenReturn(mockKustoResultSetTable);
+
+ // Arrange: mock executeMgmt to return a KustoOperationResult backed by the
mocked KustoResultSetTable
+ when(mockExecutionClient.executeMgmt(eq("db"), eq(".show database db
policy streamingingestion | project IsEnabled =
todynamic(Policy)['IsEnabled']")))
+ .thenReturn(mockKustoOperationResult);
+
+ // Act
+ boolean actual = service.isStreamingPolicyEnabled("db");
+
+ // Assert
+ boolean expected = "true".equals(isStreamingEnabled);
+ assertEquals(expected, actual, "Streaming policy enabled check failed
for value: " + isStreamingEnabled);
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "100",
+ "-1"
+ })
+ void testIsTableReadable(String rowCount) {
+ String databaseName = "db";
+ String tableName = "table";
+
+ KustoResultColumn[] krcArray = Map.of(0, "Count")
+ .entrySet().stream().map(entry -> new
KustoResultColumn(entry.getValue(), "int64", entry.getKey()))
+ .toArray(KustoResultColumn[]::new);
+
+ // Arrange: mock the KustoOperationResult and KustoResultSetTable
+ // Simulate a single-row table returned by the count query
+
when(mockKustoResultSetTable.hasNext()).thenReturn(true).thenReturn(false);
+
when(mockKustoResultSetTable.next()).thenReturn(true).thenReturn(false);
+
+ if ("-1".equals(rowCount)) {
+ // Simulate a table that is not readable: reading the row throws a DataServiceException
+ when(mockKustoResultSetTable.getString(anyInt())).thenThrow(new
DataServiceException("Engine", "Table is not readable", true));
+ doNothing().when(mockLogger).error(anyString(), anyString(),
anyString(), Mockito.any(Throwable.class));
+ } else {
+
when(mockKustoResultSetTable.getString(anyInt())).thenReturn(rowCount);
+ }
+
+ when(mockKustoResultSetTable.getColumns()).thenReturn(krcArray);
+
+
when(mockKustoOperationResult.getPrimaryResults()).thenReturn(mockKustoResultSetTable);
+
+ // Arrange: mock executeQuery to return a KustoOperationResult backed by the
mocked KustoResultSetTable
+ when(mockExecutionClient.executeQuery(eq(databaseName),
eq(String.format("%s | count", tableName))))
+ .thenReturn(mockKustoOperationResult);
+
+ // Act
+ boolean actual = service.isTableReadable(databaseName, tableName);
+
+ // Assert
+ if ("-1".equals(rowCount)) {
+ // The simulated DataServiceException must be reported as not readable
+ assertFalse(actual, "Table should not be readable for row count: "
+ rowCount + " where an exception is thrown");
+ } else {
+ assertTrue(actual, "Table should be readable for row count: " +
rowCount);
+ }
+ }
+}
\ No newline at end of file