[
https://issues.apache.org/jira/browse/SPARK-57135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Akshat Shenoi updated SPARK-57135:
----------------------------------
Description:
Spark cannot currently read CSV files packaged inside tar archives (.tar,
.tar.gz, .tgz); users must unpack them externally first.
This adds opt-in support (spark.sql.files.archive.enabled, default false) for
reading such archives through the CSV data source by streaming each entry
through the CSV parser, without materializing entries to local disk:
* A streaming ArchiveReader opens the tar once and yields one bounded
InputStream per entry, advancing lazily so memory stays bounded regardless of
archive size. Directories and dot-prefixed entries are skipped. .tar.gz is
decompressed via Hadoop's codec factory; .tgz is not a registered Hadoop codec
extension, so it is gunzipped explicitly.
* CSVFileFormat treats archives as non-splittable (one split per archive) and
streams each entry through UnivocityParser, handling each entry as a standalone
CSV file (headers, multiLine, delimiters, column pruning).
* Schema inference streams entries through the same CSVInferSchema path as a
multi-file CSV read.
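
The streaming behaviour described in the list above can be illustrated outside
Spark. What follows is a minimal Python sketch, not the ArchiveReader or
CSVFileFormat code: the function name iter_csv_rows is hypothetical, and UTF-8
entries with a header row are assumed. It opens the tar once as a forward-only
stream, skips directories and dot-prefixed entries, and parses each remaining
entry as a standalone CSV file without writing anything to disk.

```python
import csv
import posixpath
import tarfile


def iter_csv_rows(archive_fileobj):
    """Yield (entry_name, row_dict) pairs from a tar stream, one entry at a time.

    Hypothetical helper for illustration; assumes UTF-8 CSV entries with headers.
    """
    # mode="r|*" reads the archive as a forward-only stream and detects
    # gzip compression transparently, so no entry is written to disk.
    with tarfile.open(fileobj=archive_fileobj, mode="r|*") as tar:
        for member in tar:
            base = posixpath.basename(member.name)
            # Skip directories (and other non-regular entries) and
            # dot-prefixed files such as .DS_Store.
            if not member.isfile() or base.startswith("."):
                continue
            entry = tar.extractfile(member)  # bounded view over this entry only
            # Decode line by line so only a small buffer is held in memory.
            lines = (raw.decode("utf-8") for raw in entry)
            # Each entry is parsed as its own CSV file with its own header row.
            for row in csv.DictReader(lines):
                yield member.name, row
```

Because the function is a generator, the next entry is not touched until the
caller has consumed the rows of the current one, which mirrors the lazy
advancing described above.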
Scope: CSV only. Streaming works only for formats that can be parsed
sequentially; formats that need random access (e.g. Parquet and ORC, which read
a footer at the end of the file) cannot be streamed from a tar and are out of
scope.
(Supersedes the original proposal, which materialized entries to temp files via
a format-agnostic ArchiveFormat.)
was:
h2. Problem
V1 {{FileFormat}} implementations (CSV, JSON, Parquet, ORC, etc.) are not
archive-aware: if a user points a datasource reader at a {{.tar}},
{{.tar.gz}}, or {{.tgz}} file, Spark treats it as a single opaque file and
either fails or returns garbage instead of reading the entries inside.
A common ingestion pattern stores many small files inside tar archives to
reduce namespace pressure. Today there is no way to read these without first
unpacking them externally.
h2. Proposed Solution
Add an {{ArchiveFormat}} utility object in
{{org.apache.spark.sql.execution.datasources}} and hook it into the V1 scan
pipeline:
* *{{ArchiveFormat.readArchive}}*: at scan time, materializes one tar
entry at a time to a local temp file and invokes the caller-supplied {{readFn}}
against a synthetic {{PartitionedFile}} pointing at that temp file. Only one
entry's bytes live on disk per task; the temp dir is cleaned up on iterator
close and on task completion.
* *{{ArchiveFormat.expandArchives}}*: at schema-inference time
(driver-side), does the same materialization and substitutes the resulting
{{FileStatus}}es into {{inferSchema}}.
* *{{ArchiveFormat.isArchivePath}}*: detects {{.tar}},
{{.tar.gz}}, and {{.tgz}} extensions.
* Entries whose basename starts with {{.}} are skipped (covers macOS
AppleDouble sidecars, {{.DS_Store}}, etc.).
* Gzip handling: Hadoop's {{CompressionCodecFactory}} auto-decompresses
{{.tar.gz}} via {{CodecStreams}}; {{.tgz}} is not a registered Hadoop codec
extension so the gzip layer is unwrapped explicitly with
{{GZIPInputStream}}.
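
The path and entry checks in the list above are simple enough to sketch. The
Python below is illustrative only and is not the {{ArchiveFormat}} code: the
function names are hypothetical, and case-sensitive suffix matching is an
assumption, since the description does not say how case is handled.

```python
import posixpath

# Suffixes named in the description; case-sensitive matching is an assumption.
ARCHIVE_SUFFIXES = (".tar", ".tar.gz", ".tgz")
GZIP_SUFFIXES = (".tar.gz", ".tgz")


def is_archive_path(path):
    """Return True if the path ends with a supported tar extension."""
    return path.endswith(ARCHIVE_SUFFIXES)


def is_gzipped_archive(path):
    """Return True if the tar stream must be gunzipped before reading."""
    return path.endswith(GZIP_SUFFIXES)


def is_hidden_entry(entry_name):
    """Return True if the entry's basename starts with a dot."""
    return posixpath.basename(entry_name.rstrip("/")).startswith(".")
```

Only the basename is tested, so an entry such as {{.cache/a.csv}} is kept while
{{data/._a.csv}} and {{data/.DS_Store}} are skipped.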
Materializing to disk (rather than streaming) means formats that need random
access (Parquet/ORC footers) work without modification.
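
To make the trade-off concrete, here is a rough Python sketch of the
materialize-one-entry-at-a-time idea. It is not the {{ArchiveFormat.readArchive}}
implementation; the function name, the temp file name, and the cleanup strategy
are all assumptions made for illustration. Each entry is copied to a local temp
file so the caller can seek freely in it, and at most one entry is on disk at
any time.

```python
import os
import shutil
import tarfile
import tempfile


def materialize_entries(archive_fileobj):
    """Yield (entry_name, local_path), keeping at most one entry on disk.

    Hypothetical helper for illustration only.
    """
    tmp_dir = tempfile.mkdtemp(prefix="archive-")
    try:
        # Forward-only read of the tar, with transparent gzip detection.
        with tarfile.open(fileobj=archive_fileobj, mode="r|*") as tar:
            for member in tar:
                if not member.isfile():
                    continue
                local_path = os.path.join(tmp_dir, "entry")
                with open(local_path, "wb") as out:
                    shutil.copyfileobj(tar.extractfile(member), out)
                # The caller gets a real local file and may seek in it.
                yield member.name, local_path
                # Remove the entry before the next one is materialized.
                os.remove(local_path)
    finally:
        # Runs when the generator is exhausted or closed early.
        shutil.rmtree(tmp_dir, ignore_errors=True)
```

A reader that needs a footer can open {{local_path}} and seek from the end of
the file, which is not possible on the forward-only tar stream itself.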
The feature is gated behind {{spark.sql.files.archive.enabled}} (default
{{false}}).
h2. Integration Points
# {{PartitionedFileUtil.splitFiles}}: archive paths forced to a single
split.
# {{FileScanRDD.readCurrentFile}}: archive paths routed through
{{ArchiveFormat.readArchive}}.
# {{DataSource.resolve}}: both {{inferSchema}} call sites expand archives
before delegating to the format.
Summary: [SQL] Support reading CSV files inside tar archives (was:
[SQL] Add ArchiveFormat for reading .tar / .tar.gz / .tgz archives as files)
> [SQL] Support reading CSV files inside tar archives
> ---------------------------------------------------
>
> Key: SPARK-57135
> URL: https://issues.apache.org/jira/browse/SPARK-57135
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 4.3.0
> Reporter: Akshat Shenoi
> Priority: Major
> Labels: pull-request-available