This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 59f776c853 fix(workflow-operator): auto-rename empty CSV column
headers (#5282)
59f776c853 is described below
commit 59f776c853d3b18a6e385ed9b27b7d4aafc835de
Author: Meng Wang <[email protected]>
AuthorDate: Thu May 28 17:10:36 2026 -0700
fix(workflow-operator): auto-rename empty CSV column headers (#5282)
### What changes were proposed in this PR?
A CSV with an empty column header (e.g. adjacent commas as in `id,name,,age`, or a trailing comma as in `id,name,age,`)
produced an `Attribute` with an empty name. After #3295 every output
port writes via `IcebergTableWriter → Parquet`, where Avro rejects empty
names with `IllegalArgumentException: Empty name` at flush time — losing
the operator port's entire result.
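Rename empty header positions to `column-<N>`, where `<N>` is the 1-based
column position. These are the same names the operators already generate
for every column when `hasHeader` is false. Other tools use similar
positional placeholders, e.g. pandas `Unnamed: <i>` and Spark `_c<i>`,
both 0-based. Only empty headers are renamed; a header consisting of
whitespace is not empty and is kept as-is. The rename is applied in all
three CSV scan operators:
`CSVScanSourceOpDesc`, `ParallelCSVScanSourceOpDesc`, and
`CSVOldScanSourceOpDesc`. The issue named the first two; the legacy
`csvOld` variant is still registered in `LogicalOp` and had the same
latent bug.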
Note: `ParallelCSVScanSourceOpDesc` is currently commented out of
`LogicalOp`'s operator registry (so it is not reachable from the UI),
but it is fixed here for consistency and so the bug does not resurface
if it is re-enabled for experiments.
### Any related issues, documentation, discussions?
Closes #5279.
### How was this PR tested?
Added three cases to `CSVScanSourceOpDescSpec`, one per operator. Each
feeds a CSV with an empty third header (`id,name,,age`) and asserts that
the inferred attribute names are `["id", "name", "column-3", "age"]`.
`WorkflowOperator` suite is green (13 passed); scalafmt clean.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7)
---
.../source/scan/csv/CSVScanSourceOpDesc.scala | 6 ++-
.../scan/csv/ParallelCSVScanSourceOpDesc.scala | 7 ++-
.../scan/csvOld/CSVOldScanSourceOpDesc.scala | 7 ++-
.../source/scan/csv/CSVScanSourceOpDescSpec.scala | 50 ++++++++++++++++++++++
4 files changed, 65 insertions(+), 5 deletions(-)
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala
index 57b173583e..d32bbb31c6 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDesc.scala
@@ -120,7 +120,11 @@ class CSVScanSourceOpDesc extends ScanSourceOpDesc {
else (1 to attributeTypeList.length).map(i => "column-" + i).toArray
header.indices.foldLeft(Schema()) { (schema, i) =>
- schema.add(header(i), attributeTypeList(i))
+ // Auto-rename blank header positions to `column-N` so empty CSV headers
+ // (e.g. a trailing comma) do not propagate empty attribute names to
+ // downstream Iceberg/Parquet writers, which reject them.
+ val name = Option(header(i)).filter(_.nonEmpty).getOrElse(s"column-${i +
1}")
+ schema.add(name, attributeTypeList(i))
}
}
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpDesc.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpDesc.scala
index 6ac2d20af4..9609b4e239 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpDesc.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/ParallelCSVScanSourceOpDesc.scala
@@ -104,10 +104,13 @@ class ParallelCSVScanSourceOpDesc extends
ScanSourceOpDesc {
reader.close()
- // build schema based on inferred AttributeTypes
+ // build schema based on inferred AttributeTypes.
+ // Auto-rename blank header positions to `column-N` so empty CSV headers
+ // (e.g. a trailing comma) do not propagate empty attribute names to
+ // downstream Iceberg/Parquet writers, which reject them.
Schema().add(firstRow.indices.map { i =>
new Attribute(
- if (hasHeader) firstRow(i) else s"column-${i + 1}",
+ if (hasHeader && firstRow(i).nonEmpty) firstRow(i) else s"column-${i +
1}",
attributeTypeList(i)
)
})
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpDesc.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpDesc.scala
index 08b5ea1ce1..cdf9e655cc 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpDesc.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csvOld/CSVOldScanSourceOpDesc.scala
@@ -102,10 +102,13 @@ class CSVOldScanSourceOpDesc extends ScanSourceOpDesc {
reader.close()
- // build schema based on inferred AttributeTypes
+ // build schema based on inferred AttributeTypes.
+ // Auto-rename blank header positions to `column-N` so empty CSV headers
+ // (e.g. a trailing comma) do not propagate empty attribute names to
+ // downstream Iceberg/Parquet writers, which reject them.
Schema().add(firstRow.indices.map { i =>
new Attribute(
- if (hasHeader) firstRow(i) else s"column-${i + 1}",
+ if (hasHeader && firstRow(i).nonEmpty) firstRow(i) else s"column-${i +
1}",
attributeTypeList(i)
)
})
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala
index eb174e0691..87b86fea6d 100644
---
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/csv/CSVScanSourceOpDescSpec.scala
@@ -26,9 +26,13 @@ import
org.apache.texera.amber.core.workflow.WorkflowContext.{
DEFAULT_WORKFLOW_ID
}
import org.apache.texera.amber.operator.TestOperators
+import
org.apache.texera.amber.operator.source.scan.csvOld.CSVOldScanSourceOpDesc
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+
class CSVScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter {
var csvScanSourceOpDesc: CSVScanSourceOpDesc = _
@@ -38,6 +42,18 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with
BeforeAndAfter {
parallelCsvScanSourceOpDesc = new ParallelCSVScanSourceOpDesc()
}
+ // Writes a CSV whose header row has an empty column (the third position),
+ // e.g. `id,name,,age`, and returns the absolute path.
+ private def writeCsvWithEmptyHeader(): String = {
+ val tmpFile = Files.createTempFile("empty-header-", ".csv")
+ tmpFile.toFile.deleteOnExit()
+ Files.write(
+ tmpFile,
+
"id,name,,age\n1,Alice,x,30\n2,Bob,y,25\n".getBytes(StandardCharsets.UTF_8)
+ )
+ tmpFile.toString
+ }
+
it should "infer schema from single-line-data csv" in {
parallelCsvScanSourceOpDesc.fileName =
Some(TestOperators.CountrySalesSmallCsvPath)
@@ -160,4 +176,38 @@ class CSVScanSourceOpDescSpec extends AnyFlatSpec with
BeforeAndAfter {
assert(csvScanSourceOpDesc.customDelimiter.contains(","))
}
+ it should "auto-rename empty CSV column headers to column-N" in {
+ val path = writeCsvWithEmptyHeader()
+ csvScanSourceOpDesc.fileName = Some(path)
+ csvScanSourceOpDesc.customDelimiter = Some(",")
+ csvScanSourceOpDesc.hasHeader = true
+ csvScanSourceOpDesc.setResolvedFileName(FileResolver.resolve(path))
+
+ val names =
csvScanSourceOpDesc.sourceSchema().getAttributes.map(_.getName).toList
+ assert(names == List("id", "name", "column-3", "age"))
+ }
+
+ it should "auto-rename empty CSV column headers to column-N for parallel
CSV" in {
+ val path = writeCsvWithEmptyHeader()
+ parallelCsvScanSourceOpDesc.fileName = Some(path)
+ parallelCsvScanSourceOpDesc.customDelimiter = Some(",")
+ parallelCsvScanSourceOpDesc.hasHeader = true
+ parallelCsvScanSourceOpDesc.setResolvedFileName(FileResolver.resolve(path))
+
+ val names =
parallelCsvScanSourceOpDesc.sourceSchema().getAttributes.map(_.getName).toList
+ assert(names == List("id", "name", "column-3", "age"))
+ }
+
+ it should "auto-rename empty CSV column headers to column-N for old CSV" in {
+ val path = writeCsvWithEmptyHeader()
+ val oldCsvScanSourceOpDesc = new CSVOldScanSourceOpDesc()
+ oldCsvScanSourceOpDesc.fileName = Some(path)
+ oldCsvScanSourceOpDesc.customDelimiter = Some(",")
+ oldCsvScanSourceOpDesc.hasHeader = true
+ oldCsvScanSourceOpDesc.setResolvedFileName(FileResolver.resolve(path))
+
+ val names =
oldCsvScanSourceOpDesc.sourceSchema().getAttributes.map(_.getName).toList
+ assert(names == List("id", "name", "column-3", "age"))
+ }
+
}