This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e10eceb6c5 NIFI-14614 Fixed QueryRecord column/value misalignment by enforcing projection pushdown (#10269)
e10eceb6c5 is described below

commit e10eceb6c5f34f5decf03ac1421210842ab7b9fd
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Oct 17 22:39:01 2025 +0200

    NIFI-14614 Fixed QueryRecord column/value misalignment by enforcing projection pushdown (#10269)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../sql/internal/NiFiProjectTableScanRule.java     |  16 +--
 .../apache/nifi/sql/internal/NiFiTableScan.java    |   7 ++
 .../org/apache/nifi/sql/TestCalciteDatabase.java   |  30 ++++++
 .../serialization/record/ResultSetRecordSet.java   |  19 ++--
 .../datasources/ProcessGroupStatusDataSource.java  |   2 +-
 .../apache/nifi/queryrecord/RecordDataSource.java  |   3 +-
 .../nifi/processors/standard/TestQueryRecord.java  | 112 ++++++++++++++++++++-
 7 files changed, 173 insertions(+), 16 deletions(-)

diff --git 
a/nifi-commons/nifi-calcite-utils/src/main/java/org/apache/nifi/sql/internal/NiFiProjectTableScanRule.java
 
b/nifi-commons/nifi-calcite-utils/src/main/java/org/apache/nifi/sql/internal/NiFiProjectTableScanRule.java
index 0bc0d74526..390c58ceaf 100644
--- 
a/nifi-commons/nifi-calcite-utils/src/main/java/org/apache/nifi/sql/internal/NiFiProjectTableScanRule.java
+++ 
b/nifi-commons/nifi-calcite-utils/src/main/java/org/apache/nifi/sql/internal/NiFiProjectTableScanRule.java
@@ -19,15 +19,15 @@ package org.apache.nifi.sql.internal;
 
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.RelFactories;
-import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.tools.RelBuilderFactory;
 
 import java.util.List;
 
-class NiFiProjectTableScanRule extends 
RelRule<NiFiProjectTableScanRule.Config> {
+public class NiFiProjectTableScanRule extends 
RelRule<NiFiProjectTableScanRule.Config> {
 
     NiFiProjectTableScanRule(final Config config) {
         super(config);
@@ -35,8 +35,12 @@ class NiFiProjectTableScanRule extends 
RelRule<NiFiProjectTableScanRule.Config>
 
     @Override
     public void onMatch(final RelOptRuleCall call) {
-        final LogicalProject project = call.rel(0);
-        final NiFiTableScan scan = call.rel(1);
+        final Project project = call.rel(0);
+
+        // Attempt to locate NiFiTableScan as immediate input
+        if (!(project.getInput() instanceof NiFiTableScan scan)) {
+            return;
+        }
 
         final int[] fields = getProjectionFields(project.getProjects());
         if (fields == null) {
@@ -66,10 +70,8 @@ class NiFiProjectTableScanRule extends 
RelRule<NiFiProjectTableScanRule.Config>
     }
 
     public interface Config extends RelRule.Config {
-        // This impl comes directly from the Calcite documentation.
         Config DEFAULT = new StandardConfig()
-            .withOperandSupplier(b0 -> 
b0.operand(LogicalProject.class).oneInput(b1 ->
-                b1.operand(NiFiTableScan.class).noInputs()));
+            .withOperandSupplier(b0 -> b0.operand(Project.class).anyInputs());
 
 
         @Override
diff --git 
a/nifi-commons/nifi-calcite-utils/src/main/java/org/apache/nifi/sql/internal/NiFiTableScan.java
 
b/nifi-commons/nifi-calcite-utils/src/main/java/org/apache/nifi/sql/internal/NiFiTableScan.java
index f68ba35bfa..cab8db7e6d 100644
--- 
a/nifi-commons/nifi-calcite-utils/src/main/java/org/apache/nifi/sql/internal/NiFiTableScan.java
+++ 
b/nifi-commons/nifi-calcite-utils/src/main/java/org/apache/nifi/sql/internal/NiFiTableScan.java
@@ -51,6 +51,13 @@ class NiFiTableScan extends TableScan implements 
EnumerableRel {
         super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), 
Collections.emptyList(), table);
         this.fields = fields;
         fieldExpression = Expressions.constant(fields);
+        // Ensure projection pushdown rule is registered with the planner, as some Calcite versions
+        // may not invoke the RelNode#register() method.
+        try {
+            cluster.getPlanner().addRule(new 
NiFiProjectTableScanRule(NiFiProjectTableScanRule.Config.DEFAULT));
+        } catch (Exception ignored) {
+            // Rule may already be registered; intentionally ignored to avoid duplicate registration errors.
+        }
     }
 
     @Override
diff --git 
a/nifi-commons/nifi-calcite-utils/src/test/java/org/apache/nifi/sql/TestCalciteDatabase.java
 
b/nifi-commons/nifi-calcite-utils/src/test/java/org/apache/nifi/sql/TestCalciteDatabase.java
index b8604bc28a..2ddb4d573e 100644
--- 
a/nifi-commons/nifi-calcite-utils/src/test/java/org/apache/nifi/sql/TestCalciteDatabase.java
+++ 
b/nifi-commons/nifi-calcite-utils/src/test/java/org/apache/nifi/sql/TestCalciteDatabase.java
@@ -106,6 +106,36 @@ public class TestCalciteDatabase {
         }
     }
 
+    @Test
+    public void testSelectReorderedColumns() throws Exception {
+        final List<Object[]> rows = new ArrayList<>();
+        rows.add(new Object[] {"12345", "10101", "Credit Card", "Porduct 
Credit", "RO"});
+        final NiFiTableSchema tableSchema = new NiFiTableSchema(List.of(
+                new ColumnSchema("ArticleCode", String.class, false),
+                new ColumnSchema("ProductCode", String.class, false),
+                new ColumnSchema("ArticleName", String.class, false),
+                new ColumnSchema("ProductName", String.class, false),
+                new ColumnSchema("Country", String.class, false)
+        ));
+
+        final CalciteDatabase database = new CalciteDatabase();
+        final ListDataSource dataSource = new ListDataSource(tableSchema, 
rows);
+        final NiFiTable table = new NiFiTable("CANNED_DATA", dataSource, 
mock(ComponentLog.class));
+        database.addTable(table);
+
+        final String query = "SELECT ArticleCode, ArticleName, ProductCode, 
ProductName, Country FROM CANNED_DATA";
+        try (final PreparedStatement stmt = 
database.getConnection().prepareStatement(query);
+             final ResultSet resultSet = stmt.executeQuery()) {
+            assertTrue(resultSet.next());
+            final List<String> actualRow = new ArrayList<>();
+            for (int i = 1; i <= 5; i++) {
+                actualRow.add(resultSet.getString(i));
+            }
+            final List<String> expectedRow = List.of("12345", "Credit Card", 
"10101", "Porduct Credit", "RO");
+            assertEquals(expectedRow, actualRow);
+        }
+    }
+
     public static class ToUpperCase {
         public String invoke(final String value) {
             return value.toUpperCase();
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index 79dfe63d88..1ddf5700db 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -136,16 +136,23 @@ public class ResultSetRecordSet implements RecordSet, 
Closeable {
     protected Record createRecord(final ResultSet rs) throws SQLException {
         final Map<String, Object> values = new 
HashMap<>(schema.getFieldCount());
 
+        // Prefer label-based retrieval when possible to support tests/drivers that stub by label,
+        // with index-based fallback (projection pushdown ensures index order matches SELECT order).
+        int columnIndex = 1;
         for (final RecordField field : schema.getFields()) {
             final String fieldName = field.getFieldName();
-            RecordFieldType fieldType = field.getDataType().getFieldType();
-            final Object value;
+            final RecordFieldType fieldType = 
field.getDataType().getFieldType();
 
-            value = rsColumnNames.contains(fieldName)
-                    ? normalizeValue((fieldType == TIMESTAMP) ? 
rs.getTimestamp(fieldName) : rs.getObject(fieldName))
-                    : null;
+            Object raw = null;
+            if (rsColumnNames.contains(fieldName)) {
+                raw = (fieldType == TIMESTAMP) ? rs.getTimestamp(fieldName) : 
rs.getObject(fieldName);
+            }
+            if (raw == null) {
+                raw = (fieldType == TIMESTAMP) ? rs.getTimestamp(columnIndex) 
: rs.getObject(columnIndex);
+            }
 
-            values.put(fieldName, value);
+            values.put(fieldName, normalizeValue(raw));
+            columnIndex++;
         }
 
         return new MapRecord(schema, values);
diff --git 
a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessGroupStatusDataSource.java
 
b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessGroupStatusDataSource.java
index 9db3963d99..1ff0e8b0dc 100644
--- 
a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessGroupStatusDataSource.java
+++ 
b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessGroupStatusDataSource.java
@@ -123,10 +123,10 @@ public class ProcessGroupStatusDataSource implements 
ResettableDataSource {
             status.getInputCount(),
             status.getOutputContentSize(),
             status.getOutputCount(),
+            status.getQueuedCount(),
             status.getQueuedContentSize(),
             status.getActiveThreadCount(),
             status.getTerminatedThreadCount(),
-            status.getQueuedCount(),
             status.getVersionedFlowState() == null ? null : 
status.getVersionedFlowState().name(),
             status.getProcessingNanos(),
             status.getProcessingPerformanceStatus() == null ? -1 : 
status.getProcessingPerformanceStatus().getCpuDuration(),
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/RecordDataSource.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/RecordDataSource.java
index f82ce414ef..b899c6044c 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/RecordDataSource.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/RecordDataSource.java
@@ -119,7 +119,8 @@ public class RecordDataSource implements 
ResettableDataSource {
             case TIME -> ScalarType.TIME;
             case TIMESTAMP -> ScalarType.TIMESTAMP;
             case LONG -> ScalarType.LONG;
-            case STRING, ENUM -> ScalarType.STRING;
+            case STRING -> ScalarType.STRING;
+            case ENUM -> ScalarType.OBJECT;
             case ARRAY -> new ArrayType(getColumnType(((ArrayDataType) 
fieldType).getElementType()));
             case RECORD -> new ScalarType(Record.class);
             case MAP -> {
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index f63d802a4b..7bad79e5f3 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -83,7 +83,6 @@ public class TestQueryRecord {
         return runner;
     }
 
-
     @Test
     public void testRecordPathFunctions() throws InitializationException {
         final Record record = createHierarchicalRecord();
@@ -547,6 +546,117 @@ public class TestQueryRecord {
         assertEquals("Software Engineer", output.getValue("title"));
     }
 
+    @Test
+    public void testSelectOrderRespected() throws InitializationException {
+        final MockRecordParser recordReader = new MockRecordParser();
+        recordReader.addSchemaField("ArticleCode", RecordFieldType.STRING);
+        recordReader.addSchemaField("ProductCode", RecordFieldType.STRING);
+        recordReader.addSchemaField("ArticleName", RecordFieldType.STRING);
+        recordReader.addSchemaField("ProductName", RecordFieldType.STRING);
+        recordReader.addSchemaField("Country", RecordFieldType.STRING);
+        recordReader.addRecord("12345", "10101", "Credit Card", "Porduct 
Credit", "RO");
+
+        final ArrayListRecordWriter writer = new ArrayListRecordWriter(null);
+
+        final TestRunner runner = getRunner();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(REL_NAME,
+            "SELECT ArticleCode, ArticleName, ProductCode, ProductName, 
Country FROM FLOWFILE");
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(1, written.size());
+
+        final Record out = written.get(0);
+        assertEquals("12345", out.getAsString("ArticleCode"));
+        assertEquals("Credit Card", out.getAsString("ArticleName"));
+        assertEquals("10101", out.getAsString("ProductCode"));
+        assertEquals("Porduct Credit", out.getAsString("ProductName"));
+        assertEquals("RO", out.getAsString("Country"));
+    }
+
+    @Test
+    public void testUnionAllWithProjection() throws InitializationException {
+        final MockRecordParser recordReader = new MockRecordParser();
+        recordReader.addSchemaField("employeeId", RecordFieldType.STRING);
+        recordReader.addSchemaField("email", RecordFieldType.STRING);
+        recordReader.addSchemaField("englishTrainingTime", 
RecordFieldType.INT);
+        recordReader.addSchemaField("englishLastOverallProficiencyLevel", 
RecordFieldType.STRING);
+        recordReader.addSchemaField("frenchTrainingTime", RecordFieldType.INT);
+        recordReader.addSchemaField("frenchLastOverallProficiencyLevel", 
RecordFieldType.STRING);
+
+        recordReader.addRecord("1234", "[email protected]", 114828, "B2.1", 
406, null);
+
+        final ArrayListRecordWriter writer = new ArrayListRecordWriter(null);
+
+        final TestRunner runner = getRunner();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+
+        final String sql = """
+                (
+                  SELECT employeeId,
+                         email,
+                         'en' AS lovCode,
+                         CAST(englishTrainingTime AS INTEGER) AS timeSpent,
+                         englishLastOverallProficiencyLevel AS proficiencyLevel
+                  FROM FLOWFILE
+                  WHERE CAST(englishTrainingTime AS INTEGER) <> 0
+                ) UNION ALL (
+                  SELECT employeeId,
+                         email,
+                         'fr' AS lovCode,
+                         CAST(frenchTrainingTime AS INTEGER) AS timeSpent,
+                         frenchLastOverallProficiencyLevel AS proficiencyLevel
+                  FROM FLOWFILE
+                  WHERE CAST(frenchTrainingTime AS INTEGER) <> 0
+                )
+                """;
+
+        runner.setProperty(REL_NAME, sql);
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(2, written.size());
+
+        final Record first = written.get(0);
+        final Record second = written.get(1);
+
+        // Order of UNION ALL output rows may depend on planner, so verify both combinations
+        final boolean enFirst = "en".equals(first.getAsString("lovCode"));
+        final Record en = enFirst ? first : second;
+        final Record fr = enFirst ? second : first;
+
+        assertEquals("en", en.getAsString("lovCode"));
+        assertEquals("1234", en.getAsString("employeeId"));
+        assertEquals("[email protected]", en.getAsString("email"));
+        assertEquals(114828, en.getAsInt("timeSpent"));
+        assertEquals("B2.1", en.getAsString("proficiencyLevel"));
+
+        assertEquals("fr", fr.getAsString("lovCode"));
+        assertEquals("1234", fr.getAsString("employeeId"));
+        assertEquals("[email protected]", fr.getAsString("email"));
+        assertEquals(406, fr.getAsInt("timeSpent"));
+        assertEquals(null, fr.getValue("proficiencyLevel"));
+    }
+
 
     @Test
     public void testRecordPathWithArrayAndOnlyOneElementMatchingRPath() throws 
InitializationException {

Reply via email to