This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new e10eceb6c5 NIFI-14614 Fixed QueryRecord column/value misalignment by
enforcing projection pushdown (#10269)
e10eceb6c5 is described below
commit e10eceb6c5f34f5decf03ac1421210842ab7b9fd
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Oct 17 22:39:01 2025 +0200
NIFI-14614 Fixed QueryRecord column/value misalignment by enforcing
projection pushdown (#10269)
Signed-off-by: David Handermann <[email protected]>
---
.../sql/internal/NiFiProjectTableScanRule.java | 16 +--
.../apache/nifi/sql/internal/NiFiTableScan.java | 7 ++
.../org/apache/nifi/sql/TestCalciteDatabase.java | 30 ++++++
.../serialization/record/ResultSetRecordSet.java | 19 ++--
.../datasources/ProcessGroupStatusDataSource.java | 2 +-
.../apache/nifi/queryrecord/RecordDataSource.java | 3 +-
.../nifi/processors/standard/TestQueryRecord.java | 112 ++++++++++++++++++++-
7 files changed, 173 insertions(+), 16 deletions(-)
diff --git
a/nifi-commons/nifi-calcite-utils/src/main/java/org/apache/nifi/sql/internal/NiFiProjectTableScanRule.java
b/nifi-commons/nifi-calcite-utils/src/main/java/org/apache/nifi/sql/internal/NiFiProjectTableScanRule.java
index 0bc0d74526..390c58ceaf 100644
---
a/nifi-commons/nifi-calcite-utils/src/main/java/org/apache/nifi/sql/internal/NiFiProjectTableScanRule.java
+++
b/nifi-commons/nifi-calcite-utils/src/main/java/org/apache/nifi/sql/internal/NiFiProjectTableScanRule.java
@@ -19,15 +19,15 @@ package org.apache.nifi.sql.internal;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.RelFactories;
-import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilderFactory;
import java.util.List;
-class NiFiProjectTableScanRule extends
RelRule<NiFiProjectTableScanRule.Config> {
+public class NiFiProjectTableScanRule extends
RelRule<NiFiProjectTableScanRule.Config> {
NiFiProjectTableScanRule(final Config config) {
super(config);
@@ -35,8 +35,12 @@ class NiFiProjectTableScanRule extends
RelRule<NiFiProjectTableScanRule.Config>
@Override
public void onMatch(final RelOptRuleCall call) {
- final LogicalProject project = call.rel(0);
- final NiFiTableScan scan = call.rel(1);
+ final Project project = call.rel(0);
+
+ // Attempt to locate NiFiTableScan as immediate input
+ if (!(project.getInput() instanceof NiFiTableScan scan)) {
+ return;
+ }
final int[] fields = getProjectionFields(project.getProjects());
if (fields == null) {
@@ -66,10 +70,8 @@ class NiFiProjectTableScanRule extends
RelRule<NiFiProjectTableScanRule.Config>
}
public interface Config extends RelRule.Config {
- // This impl comes directly from the Calcite documentation.
Config DEFAULT = new StandardConfig()
- .withOperandSupplier(b0 ->
b0.operand(LogicalProject.class).oneInput(b1 ->
- b1.operand(NiFiTableScan.class).noInputs()));
+ .withOperandSupplier(b0 -> b0.operand(Project.class).anyInputs());
@Override
diff --git
a/nifi-commons/nifi-calcite-utils/src/main/java/org/apache/nifi/sql/internal/NiFiTableScan.java
b/nifi-commons/nifi-calcite-utils/src/main/java/org/apache/nifi/sql/internal/NiFiTableScan.java
index f68ba35bfa..cab8db7e6d 100644
---
a/nifi-commons/nifi-calcite-utils/src/main/java/org/apache/nifi/sql/internal/NiFiTableScan.java
+++
b/nifi-commons/nifi-calcite-utils/src/main/java/org/apache/nifi/sql/internal/NiFiTableScan.java
@@ -51,6 +51,13 @@ class NiFiTableScan extends TableScan implements
EnumerableRel {
super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE),
Collections.emptyList(), table);
this.fields = fields;
fieldExpression = Expressions.constant(fields);
+ // Ensure projection pushdown rule is registered with the planner, as some Calcite versions
+ // may not invoke the RelNode#register() method.
+ try {
+ cluster.getPlanner().addRule(new
NiFiProjectTableScanRule(NiFiProjectTableScanRule.Config.DEFAULT));
+ } catch (Exception ignored) {
+ // Rule may already be registered; intentionally ignored to avoid duplicate registration errors.
+ }
}
@Override
diff --git
a/nifi-commons/nifi-calcite-utils/src/test/java/org/apache/nifi/sql/TestCalciteDatabase.java
b/nifi-commons/nifi-calcite-utils/src/test/java/org/apache/nifi/sql/TestCalciteDatabase.java
index b8604bc28a..2ddb4d573e 100644
---
a/nifi-commons/nifi-calcite-utils/src/test/java/org/apache/nifi/sql/TestCalciteDatabase.java
+++
b/nifi-commons/nifi-calcite-utils/src/test/java/org/apache/nifi/sql/TestCalciteDatabase.java
@@ -106,6 +106,36 @@ public class TestCalciteDatabase {
}
}
+ @Test
+ public void testSelectReorderedColumns() throws Exception {
+ final List<Object[]> rows = new ArrayList<>();
+ rows.add(new Object[] {"12345", "10101", "Credit Card", "Porduct
Credit", "RO"});
+ final NiFiTableSchema tableSchema = new NiFiTableSchema(List.of(
+ new ColumnSchema("ArticleCode", String.class, false),
+ new ColumnSchema("ProductCode", String.class, false),
+ new ColumnSchema("ArticleName", String.class, false),
+ new ColumnSchema("ProductName", String.class, false),
+ new ColumnSchema("Country", String.class, false)
+ ));
+
+ final CalciteDatabase database = new CalciteDatabase();
+ final ListDataSource dataSource = new ListDataSource(tableSchema,
rows);
+ final NiFiTable table = new NiFiTable("CANNED_DATA", dataSource,
mock(ComponentLog.class));
+ database.addTable(table);
+
+ final String query = "SELECT ArticleCode, ArticleName, ProductCode,
ProductName, Country FROM CANNED_DATA";
+ try (final PreparedStatement stmt =
database.getConnection().prepareStatement(query);
+ final ResultSet resultSet = stmt.executeQuery()) {
+ assertTrue(resultSet.next());
+ final List<String> actualRow = new ArrayList<>();
+ for (int i = 1; i <= 5; i++) {
+ actualRow.add(resultSet.getString(i));
+ }
+ final List<String> expectedRow = List.of("12345", "Credit Card",
"10101", "Porduct Credit", "RO");
+ assertEquals(expectedRow, actualRow);
+ }
+ }
+
public static class ToUpperCase {
public String invoke(final String value) {
return value.toUpperCase();
diff --git
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index 79dfe63d88..1ddf5700db 100644
---
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -136,16 +136,23 @@ public class ResultSetRecordSet implements RecordSet,
Closeable {
protected Record createRecord(final ResultSet rs) throws SQLException {
final Map<String, Object> values = new
HashMap<>(schema.getFieldCount());
+ // Prefer label-based retrieval when possible to support tests/drivers that stub by label,
+ // with index-based fallback (projection pushdown ensures index order matches SELECT order).
+ int columnIndex = 1;
for (final RecordField field : schema.getFields()) {
final String fieldName = field.getFieldName();
- RecordFieldType fieldType = field.getDataType().getFieldType();
- final Object value;
+ final RecordFieldType fieldType =
field.getDataType().getFieldType();
- value = rsColumnNames.contains(fieldName)
- ? normalizeValue((fieldType == TIMESTAMP) ?
rs.getTimestamp(fieldName) : rs.getObject(fieldName))
- : null;
+ Object raw = null;
+ if (rsColumnNames.contains(fieldName)) {
+ raw = (fieldType == TIMESTAMP) ? rs.getTimestamp(fieldName) :
rs.getObject(fieldName);
+ }
+ if (raw == null) {
+ raw = (fieldType == TIMESTAMP) ? rs.getTimestamp(columnIndex)
: rs.getObject(columnIndex);
+ }
- values.put(fieldName, value);
+ values.put(fieldName, normalizeValue(raw));
+ columnIndex++;
}
return new MapRecord(schema, values);
diff --git
a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessGroupStatusDataSource.java
b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessGroupStatusDataSource.java
index 9db3963d99..1ff0e8b0dc 100644
---
a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessGroupStatusDataSource.java
+++
b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProcessGroupStatusDataSource.java
@@ -123,10 +123,10 @@ public class ProcessGroupStatusDataSource implements
ResettableDataSource {
status.getInputCount(),
status.getOutputContentSize(),
status.getOutputCount(),
+ status.getQueuedCount(),
status.getQueuedContentSize(),
status.getActiveThreadCount(),
status.getTerminatedThreadCount(),
- status.getQueuedCount(),
status.getVersionedFlowState() == null ? null :
status.getVersionedFlowState().name(),
status.getProcessingNanos(),
status.getProcessingPerformanceStatus() == null ? -1 :
status.getProcessingPerformanceStatus().getCpuDuration(),
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/RecordDataSource.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/RecordDataSource.java
index f82ce414ef..b899c6044c 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/RecordDataSource.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/RecordDataSource.java
@@ -119,7 +119,8 @@ public class RecordDataSource implements
ResettableDataSource {
case TIME -> ScalarType.TIME;
case TIMESTAMP -> ScalarType.TIMESTAMP;
case LONG -> ScalarType.LONG;
- case STRING, ENUM -> ScalarType.STRING;
+ case STRING -> ScalarType.STRING;
+ case ENUM -> ScalarType.OBJECT;
case ARRAY -> new ArrayType(getColumnType(((ArrayDataType)
fieldType).getElementType()));
case RECORD -> new ScalarType(Record.class);
case MAP -> {
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index f63d802a4b..7bad79e5f3 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -83,7 +83,6 @@ public class TestQueryRecord {
return runner;
}
-
@Test
public void testRecordPathFunctions() throws InitializationException {
final Record record = createHierarchicalRecord();
@@ -547,6 +546,117 @@ public class TestQueryRecord {
assertEquals("Software Engineer", output.getValue("title"));
}
+ @Test
+ public void testSelectOrderRespected() throws InitializationException {
+ final MockRecordParser recordReader = new MockRecordParser();
+ recordReader.addSchemaField("ArticleCode", RecordFieldType.STRING);
+ recordReader.addSchemaField("ProductCode", RecordFieldType.STRING);
+ recordReader.addSchemaField("ArticleName", RecordFieldType.STRING);
+ recordReader.addSchemaField("ProductName", RecordFieldType.STRING);
+ recordReader.addSchemaField("Country", RecordFieldType.STRING);
+ recordReader.addRecord("12345", "10101", "Credit Card", "Porduct
Credit", "RO");
+
+ final ArrayListRecordWriter writer = new ArrayListRecordWriter(null);
+
+ final TestRunner runner = getRunner();
+ runner.addControllerService("reader", recordReader);
+ runner.enableControllerService(recordReader);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+ runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.setProperty(REL_NAME,
+ "SELECT ArticleCode, ArticleName, ProductCode, ProductName,
Country FROM FLOWFILE");
+
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ runner.assertTransferCount(REL_NAME, 1);
+ final List<Record> written = writer.getRecordsWritten();
+ assertEquals(1, written.size());
+
+ final Record out = written.get(0);
+ assertEquals("12345", out.getAsString("ArticleCode"));
+ assertEquals("Credit Card", out.getAsString("ArticleName"));
+ assertEquals("10101", out.getAsString("ProductCode"));
+ assertEquals("Porduct Credit", out.getAsString("ProductName"));
+ assertEquals("RO", out.getAsString("Country"));
+ }
+
+ @Test
+ public void testUnionAllWithProjection() throws InitializationException {
+ final MockRecordParser recordReader = new MockRecordParser();
+ recordReader.addSchemaField("employeeId", RecordFieldType.STRING);
+ recordReader.addSchemaField("email", RecordFieldType.STRING);
+ recordReader.addSchemaField("englishTrainingTime",
RecordFieldType.INT);
+ recordReader.addSchemaField("englishLastOverallProficiencyLevel",
RecordFieldType.STRING);
+ recordReader.addSchemaField("frenchTrainingTime", RecordFieldType.INT);
+ recordReader.addSchemaField("frenchLastOverallProficiencyLevel",
RecordFieldType.STRING);
+
+ recordReader.addRecord("1234", "[email protected]", 114828, "B2.1",
406, null);
+
+ final ArrayListRecordWriter writer = new ArrayListRecordWriter(null);
+
+ final TestRunner runner = getRunner();
+ runner.addControllerService("reader", recordReader);
+ runner.enableControllerService(recordReader);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+ runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+
+ final String sql = """
+ (
+ SELECT employeeId,
+ email,
+ 'en' AS lovCode,
+ CAST(englishTrainingTime AS INTEGER) AS timeSpent,
+ englishLastOverallProficiencyLevel AS proficiencyLevel
+ FROM FLOWFILE
+ WHERE CAST(englishTrainingTime AS INTEGER) <> 0
+ ) UNION ALL (
+ SELECT employeeId,
+ email,
+ 'fr' AS lovCode,
+ CAST(frenchTrainingTime AS INTEGER) AS timeSpent,
+ frenchLastOverallProficiencyLevel AS proficiencyLevel
+ FROM FLOWFILE
+ WHERE CAST(frenchTrainingTime AS INTEGER) <> 0
+ )
+ """;
+
+ runner.setProperty(REL_NAME, sql);
+
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ runner.assertTransferCount(REL_NAME, 1);
+ final List<Record> written = writer.getRecordsWritten();
+ assertEquals(2, written.size());
+
+ final Record first = written.get(0);
+ final Record second = written.get(1);
+
+ // Order of UNION ALL output rows may depend on planner, so verify both combinations
+ final boolean enFirst = "en".equals(first.getAsString("lovCode"));
+ final Record en = enFirst ? first : second;
+ final Record fr = enFirst ? second : first;
+
+ assertEquals("en", en.getAsString("lovCode"));
+ assertEquals("1234", en.getAsString("employeeId"));
+ assertEquals("[email protected]", en.getAsString("email"));
+ assertEquals(114828, en.getAsInt("timeSpent"));
+ assertEquals("B2.1", en.getAsString("proficiencyLevel"));
+
+ assertEquals("fr", fr.getAsString("lovCode"));
+ assertEquals("1234", fr.getAsString("employeeId"));
+ assertEquals("[email protected]", fr.getAsString("email"));
+ assertEquals(406, fr.getAsInt("timeSpent"));
+ assertEquals(null, fr.getValue("proficiencyLevel"));
+ }
+
@Test
public void testRecordPathWithArrayAndOnlyOneElementMatchingRPath() throws
InitializationException {