This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 59f776c853 fix(workflow-operator): auto-rename empty CSV column 
headers (#5282)
59f776c853 is described below

commit 59f776c853d3b18a6e385ed9b27b7d4aafc835de
Author: Meng Wang <[email protected]>
AuthorDate: Thu May 28 17:10:36 2026 -0700

    fix(workflow-operator): auto-rename empty CSV column headers (#5282)
    
    ### What changes were proposed in this PR?
    
    A CSV with an empty column header (e.g. a trailing comma `id,name,,age`)
    produced an `Attribute` with an empty name. After #3295 every output
    port writes via `IcebergTableWriter → Parquet`, where Avro rejects empty
    names with `IllegalArgumentException: Empty name` at flush time — losing
    the operator port's entire result.
    
    Rename blank header positions to `column-<index>` (the convention used
    by pandas, Spark, R, and DuckDB) in all three CSV scan operators:
    `CSVScanSourceOpDesc`, `ParallelCSVScanSourceOpDesc`, and
    `CSVOldScanSourceOpDesc`. The issue named the first two; the legacy
    `csvOld` variant is still registered in `LogicalOp` and had the same
    latent bug.
    
    Note: `ParallelCSVScanSourceOpDesc` is currently commented out of
    `LogicalOp`'s operator registry (so it is not reachable from the UI),
    but it is fixed here for consistency and so the bug does not resurface
    if it is re-enabled for experiments.
    
    ### Any related issues, documentation, discussions?
    
    Closes #5279.
    
    ### How was this PR tested?
    
    Added three cases to `CSVScanSourceOpDescSpec` — one per operator —
    feeding a CSV with an empty third header (`id,name,,age`) and asserting
    the inferred schema is `["id", "name", "column-3", "age"]`.
    `WorkflowOperator` suite is green (13 passed); scalafmt clean.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (claude-opus-4-7)
---
 .../source/scan/csv/CSVScanSourceOpDesc.scala      |  6 ++-
 .../scan/csv/ParallelCSVScanSourceOpDesc.scala     |  7 ++-
 .../scan/csvOld/CSVOldScanSourceOpDesc.scala       |  7 ++-
 .../source/scan/csv/CSVScanSourceOpDescSpec.scala  | 50 ++++++++++++++++++++++
 4 files changed, 65 insertions(+), 5 deletions(-)

diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala
index 57b173583e..d32bbb31c6 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala
@@ -120,7 +120,11 @@ class CSVScanSourceOpDesc extends ScanSourceOpDesc {
       else (1 to attributeTypeList.length).map(i => "column-" + i).toArray
 
     header.indices.foldLeft(Schema()) { (schema, i) =>
-      schema.add(header(i), attributeTypeList(i))
+      // Auto-rename blank header positions to `column-N` so empty CSV headers
+      // (e.g. a trailing comma) do not propagate empty attribute names to
+      // downstream Iceberg/Parquet writers, which reject them.
+      val name = Option(header(i)).filter(_.nonEmpty).getOrElse(s"column-${i + 
1}")
+      schema.add(name, attributeTypeList(i))
     }
 
   }
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpDesc.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpDesc.scala
index 6ac2d20af4..9609b4e239 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpDesc.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpDesc.scala
@@ -104,10 +104,13 @@ class ParallelCSVScanSourceOpDesc extends 
ScanSourceOpDesc {
 
     reader.close()
 
-    // build schema based on inferred AttributeTypes
+    // build schema based on inferred AttributeTypes.
+    // Auto-rename blank header positions to `column-N` so empty CSV headers
+    // (e.g. a trailing comma) do not propagate empty attribute names to
+    // downstream Iceberg/Parquet writers, which reject them.
     Schema().add(firstRow.indices.map { i =>
       new Attribute(
-        if (hasHeader) firstRow(i) else s"column-${i + 1}",
+        if (hasHeader && firstRow(i).nonEmpty) firstRow(i) else s"column-${i + 
1}",
         attributeTypeList(i)
       )
     })
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpDesc.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpDesc.scala
index 08b5ea1ce1..cdf9e655cc 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpDesc.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpDesc.scala
@@ -102,10 +102,13 @@ class CSVOldScanSourceOpDesc extends ScanSourceOpDesc {
 
     reader.close()
 
-    // build schema based on inferred AttributeTypes
+    // build schema based on inferred AttributeTypes.
+    // Auto-rename blank header positions to `column-N` so empty CSV headers
+    // (e.g. a trailing comma) do not propagate empty attribute names to
+    // downstream Iceberg/Parquet writers, which reject them.
     Schema().add(firstRow.indices.map { i =>
       new Attribute(
-        if (hasHeader) firstRow(i) else s"column-${i + 1}",
+        if (hasHeader && firstRow(i).nonEmpty) firstRow(i) else s"column-${i + 
1}",
         attributeTypeList(i)
       )
     })
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala
index eb174e0691..87b86fea6d 100644
--- 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala
@@ -26,9 +26,13 @@ import 
org.apache.texera.amber.core.workflow.WorkflowContext.{
   DEFAULT_WORKFLOW_ID
 }
 import org.apache.texera.amber.operator.TestOperators
+import 
org.apache.texera.amber.operator.source.scan.csvOld.CSVOldScanSourceOpDesc
 import org.scalatest.BeforeAndAfter
 import org.scalatest.flatspec.AnyFlatSpec
 
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+
 class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter {
 
   var csvScanSourceOpDesc: CSVScanSourceOpDesc = _
@@ -38,6 +42,18 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with 
BeforeAndAfter {
     parallelCsvScanSourceOpDesc = new ParallelCSVScanSourceOpDesc()
   }
 
+  // Writes a CSV whose header row has an empty column (the third position),
+  // e.g. `id,name,,age`, and returns the absolute path.
+  private def writeCsvWithEmptyHeader(): String = {
+    val tmpFile = Files.createTempFile("empty-header-", ".csv")
+    tmpFile.toFile.deleteOnExit()
+    Files.write(
+      tmpFile,
+      
"id,name,,age\n1,Alice,x,30\n2,Bob,y,25\n".getBytes(StandardCharsets.UTF_8)
+    )
+    tmpFile.toString
+  }
+
   it should "infer schema from single-line-data csv" in {
 
     parallelCsvScanSourceOpDesc.fileName = 
Some(TestOperators.CountrySalesSmallCsvPath)
@@ -160,4 +176,38 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with 
BeforeAndAfter {
     assert(csvScanSourceOpDesc.customDelimiter.contains(","))
   }
 
+  it should "auto-rename empty CSV column headers to column-N" in {
+    val path = writeCsvWithEmptyHeader()
+    csvScanSourceOpDesc.fileName = Some(path)
+    csvScanSourceOpDesc.customDelimiter = Some(",")
+    csvScanSourceOpDesc.hasHeader = true
+    csvScanSourceOpDesc.setResolvedFileName(FileResolver.resolve(path))
+
+    val names = 
csvScanSourceOpDesc.sourceSchema().getAttributes.map(_.getName).toList
+    assert(names == List("id", "name", "column-3", "age"))
+  }
+
+  it should "auto-rename empty CSV column headers to column-N for parallel 
CSV" in {
+    val path = writeCsvWithEmptyHeader()
+    parallelCsvScanSourceOpDesc.fileName = Some(path)
+    parallelCsvScanSourceOpDesc.customDelimiter = Some(",")
+    parallelCsvScanSourceOpDesc.hasHeader = true
+    parallelCsvScanSourceOpDesc.setResolvedFileName(FileResolver.resolve(path))
+
+    val names = 
parallelCsvScanSourceOpDesc.sourceSchema().getAttributes.map(_.getName).toList
+    assert(names == List("id", "name", "column-3", "age"))
+  }
+
+  it should "auto-rename empty CSV column headers to column-N for old CSV" in {
+    val path = writeCsvWithEmptyHeader()
+    val oldCsvScanSourceOpDesc = new CSVOldScanSourceOpDesc()
+    oldCsvScanSourceOpDesc.fileName = Some(path)
+    oldCsvScanSourceOpDesc.customDelimiter = Some(",")
+    oldCsvScanSourceOpDesc.hasHeader = true
+    oldCsvScanSourceOpDesc.setResolvedFileName(FileResolver.resolve(path))
+
+    val names = 
oldCsvScanSourceOpDesc.sourceSchema().getAttributes.map(_.getName).toList
+    assert(names == List("id", "name", "column-3", "age"))
+  }
+
 }

Reply via email to