This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 62ecae86 fix(parquet): return error instead of panicking on first-write failure (#824)
62ecae86 is described below

commit 62ecae86fea101501bf47ab9068443dc5a4319f8
Author: Matt Topol <[email protected]>
AuthorDate: Thu May 28 14:02:55 2026 -0400

    fix(parquet): return error instead of panicking on first-write failure (#824)
    
    ### Rationale for this change
    
    Closes #820.
    
    `(*file.Writer).startFile()` panics with the literal string `"failed to
    write magic number"` when the underlying sink's first `Write` call
    returns an error or short-writes. Because `file.NewParquetWriter` calls
    `startFile` synchronously inside the constructor and has no `error`
    return, callers had no idiomatic Go recovery path. The higher-level
    `pqarrow.NewFileWriter` already returns `(*FileWriter, error)`, but the
    panic propagated through it unchanged, so its `error` return was never
    reached for this failure mode.
    
    In production this manifests when the sink is a network-attached writer
    (`*storage.Writer` from GCS, an S3 multipart upload, an HDFS client,
    etc.). A transient network blip on the first 4-byte magic-header write
    reliably crashes the worker process, orphans any in-flight upload
    session, and forces a container restart.
    
    ### What changes are included in this PR?
    
    1. **`(*Writer).startFile()` now returns `error`** instead of panicking
    on sink-write failures. The configuration-validation panic for
    "encrypted column not found in file schema" is preserved — that path is
    a programmer error, not an I/O failure.
    2. **New public constructor `file.NewParquetWriterWithError(w, sc,
    opts...) (*Writer, error)`** is the preferred entry point for callers
    whose sink may transiently fail.
    3. **`file.NewParquetWriter` is now a thin wrapper** over
    `NewParquetWriterWithError` that panics with the *exact same string*
    (`"failed to write magic number"`) on init failure. Any consumer with a
    `recover()` block that string-matches the panic value continues to work
    unchanged.
    4. **`pqarrow.NewFileWriter` switches to `NewParquetWriterWithError`**
    internally. Its public signature is unchanged; its `error` return now
    meaningfully covers transient sink failures, with no API change for any
    of its callers.
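    
    The shape of items 1 through 3 can be sketched with the standard
    library alone. This is a minimal illustration, not the arrow-go code:
    `writer`, `newWriterWithError`, `newWriter`, and `failingSink` are
    hypothetical stand-ins for `file.Writer`, `NewParquetWriterWithError`,
    `NewParquetWriter`, and a failing network sink.
    
    ```go
    package main

    import (
    	"errors"
    	"fmt"
    	"io"
    )

    // magic stands in for the 4-byte Parquet magic header.
    var magic = []byte("PAR1")

    // writer is a hypothetical stand-in for file.Writer.
    type writer struct{ sink io.Writer }

    // newWriterWithError mirrors the shape of the new constructor: a failed
    // or short first write is returned as an error instead of a panic.
    func newWriterWithError(sink io.Writer) (*writer, error) {
    	n, err := sink.Write(magic)
    	if err != nil {
    		return nil, fmt.Errorf("parquet: failed to write magic number: %w", err)
    	}
    	if n != len(magic) {
    		return nil, fmt.Errorf("parquet: short write of magic number: wrote %d of %d bytes", n, len(magic))
    	}
    	return &writer{sink: sink}, nil
    }

    // newWriter mirrors the legacy constructor: a thin wrapper that keeps
    // the historical panic value verbatim for recover()-based callers.
    func newWriter(sink io.Writer) *writer {
    	w, err := newWriterWithError(sink)
    	if err != nil {
    		panic("failed to write magic number")
    	}
    	return w
    }

    // failingSink is a hypothetical sink whose every Write fails with err.
    type failingSink struct{ err error }

    func (f failingSink) Write([]byte) (int, error) { return 0, f.err }

    func main() {
    	// New path: the sink error is wrapped and can be inspected.
    	_, err := newWriterWithError(failingSink{err: io.ErrUnexpectedEOF})
    	fmt.Println(errors.Is(err, io.ErrUnexpectedEOF)) // true

    	// Legacy path: the panic value is still the same plain string.
    	defer func() { fmt.Println(recover()) }() // failed to write magic number
    	newWriter(failingSink{err: io.ErrUnexpectedEOF})
    }
    ```
    
    Wrapping with `%w` is what lets callers use `errors.Is` on the
    returned error, while the wrapper deliberately discards the wrapped
    error so the panic value stays a bare string.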
    
    ### Are these changes tested?
    
    Yes. Six new tests in `parquet/file/file_writer_test.go` exercise the
    new behavior using a `flakyMagicSink` test helper modeled on the
    existing `errCloseWriter` pattern in the same file:
    
    - `TestNewParquetWriterWithError_Success` — happy path on a
    `bytes.Buffer`.
    - `TestNewParquetWriterWithError_FirstWriteFails` — first `Write`
    returns `io.ErrUnexpectedEOF`; constructor returns a wrapped error and
    does not panic.
    - `TestNewParquetWriterWithError_FirstWriteShortWrites` — first `Write`
    returns `n=len(p)-1, nil`; constructor returns an error mentioning the
    short write.
    - `TestNewParquetWriter_PreservesPanicMessage` — the legacy constructor
    still panics, the panic value is still a `string`, and the message is
    still the literal `"failed to write magic number"` (back-compat guard).
    - `TestPqarrowNewFileWriter_PropagatesInitError` —
    `pqarrow.NewFileWriter` returns the wrapped sink error rather than
    panicking.
    - `TestPqarrowNewFileWriter_PropagatesShortWrite` — same for the
    short-write case.
    
    The existing `TestCloseError` test and the full `parquet/file/...` and
    `parquet/pqarrow/...` test suites all pass (with `PARQUET_TEST_DATA`
    pointed at `parquet-testing/data`).
    
    ### Are there any user-facing changes?
    
    **No breaking changes.**
    
    - `file.NewParquetWriter(w, sc, opts...) *Writer` — signature unchanged,
    still panics on first-write failure with the identical literal string
    `"failed to write magic number"`. Existing `recover()` workarounds keep
    working.
    - `file.NewParquetWriterWithError(w, sc, opts...) (*Writer, error)` —
    new exported symbol. Adding an exported function is non-breaking in Go.
    - `pqarrow.NewFileWriter(...) (*FileWriter, error)` — signature
    unchanged. Its `error` return now also covers the magic-header-write
    failure that previously escaped as a panic. Callers who already check
    `err` get strictly better behavior.
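    
    One thing an `error` return makes possible on the caller side is a
    plain retry loop. The sketch below is an assumption-laden illustration
    using only the standard library: `flakySink`, `openWriter`, and
    `openWithRetry` are hypothetical names, and `openWriter` merely stands
    in for an error-returning constructor such as `pqarrow.NewFileWriter`.
    Whether retrying against the same sink is safe depends on the sink; a
    real upload writer may need to be recreated between attempts.
    
    ```go
    package main

    import (
    	"bytes"
    	"errors"
    	"fmt"
    	"io"
    )

    // flakySink fails its first Write, then forwards to an in-memory buffer.
    type flakySink struct {
    	buf    bytes.Buffer
    	writes int
    }

    func (f *flakySink) Write(p []byte) (int, error) {
    	f.writes++
    	if f.writes == 1 {
    		return 0, io.ErrUnexpectedEOF
    	}
    	return f.buf.Write(p)
    }

    // openWriter is a hypothetical stand-in for an error-returning
    // constructor: it performs the first header write and wraps any failure.
    func openWriter(sink io.Writer) error {
    	if _, err := sink.Write([]byte("PAR1")); err != nil {
    		return fmt.Errorf("parquet: failed to write magic number: %w", err)
    	}
    	return nil
    }

    // openWithRetry retries openWriter up to attempts times, but only for
    // the error this sketch treats as transient (io.ErrUnexpectedEOF).
    func openWithRetry(sink io.Writer, attempts int) error {
    	err := errors.New("openWithRetry: attempts must be positive")
    	for i := 0; i < attempts; i++ {
    		if err = openWriter(sink); err == nil {
    			return nil
    		}
    		if !errors.Is(err, io.ErrUnexpectedEOF) {
    			return err // not transient: give up immediately
    		}
    	}
    	return err
    }

    func main() {
    	sink := &flakySink{}
    	fmt.Println(openWithRetry(sink, 3)) // <nil>
    	fmt.Println(sink.buf.String())      // PAR1
    }
    ```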
    
    The new constructor is the recommended path for users on
    cloud-storage upload sinks, and its docstring describes the failure
    modes it reports. The legacy constructor's docstring now
    documents the panic behavior the original issue called out as
    undocumented.
    
    ### Out of scope (separate follow-up)
    
    While investigating, I noticed `WriteBatchSpaced` in
    `parquet/file/column_writer_types.gen.go` lacks the `defer recover()`
    that `WriteBatch` has, so an I/O error during a spaced write panics out
    of the calling goroutine instead of returning an error. Same class of
    bug as #820 but a different code path; happy to file a separate issue.
---
 parquet/file/file_writer.go      | 45 +++++++++++++++---
 parquet/file/file_writer_test.go | 98 ++++++++++++++++++++++++++++++++++++++++
 parquet/pqarrow/file_writer.go   |  5 +-
 3 files changed, 141 insertions(+), 7 deletions(-)

diff --git a/parquet/file/file_writer.go b/parquet/file/file_writer.go
index 5456e7f9..291f2335 100644
--- a/parquet/file/file_writer.go
+++ b/parquet/file/file_writer.go
@@ -65,11 +65,38 @@ func WithWriteMetadata(meta metadata.KeyValueMetadata) WriteOption {
        }
 }
 
-// NewParquetWriter returns a Writer that writes to the provided WriteSeeker with the given schema.
+// NewParquetWriter returns a Writer that writes to the provided io.Writer with the given schema.
 //
 // If props is nil, then the default Writer Properties will be used. If the key value metadata is not nil,
 // it will be added to the file.
+//
+// This constructor panics with the literal string "failed to write magic
+// number" if the initial write of the parquet magic header to the
+// underlying sink fails. The behavior is preserved for backward
+// compatibility with callers that string-match the panic value in a
+// recover() block; new code should prefer [NewParquetWriterWithError],
+// which returns the failure as an error instead.
 func NewParquetWriter(w io.Writer, sc *schema.GroupNode, opts ...WriteOption) *Writer {
+       fw, err := NewParquetWriterWithError(w, sc, opts...)
+       if err != nil {
+               // Preserve the historical panic value verbatim so any consumer
+               // performing a string-match in a recover() block continues to work.
+               panic("failed to write magic number")
+       }
+       return fw
+}
+
+// NewParquetWriterWithError returns a Writer that writes to the provided
+// io.Writer with the given schema.
+//
+// If props is nil, then the default Writer Properties will be used. If the
+// key value metadata is not nil, it will be added to the file.
+//
+// An error is returned if the initial write of the parquet magic header to
+// the underlying sink fails or short-writes, which can happen when the
+// sink is a flaky or network-attached writer (for example, a cloud-storage
+// upload writer).
+func NewParquetWriterWithError(w io.Writer, sc *schema.GroupNode, opts ...WriteOption) (*Writer, error) {
        config := &writerConfig{}
        for _, o := range opts {
                o(config)
@@ -87,8 +114,10 @@ func NewParquetWriter(w io.Writer, sc *schema.GroupNode, opts ...WriteOption) *W
        }
 
        fw.metadata = *metadata.NewFileMetadataBuilder(fw.Schema, fw.props, config.keyValueMetadata)
-       fw.startFile()
-       return fw
+       if err := fw.startFile(); err != nil {
+               return nil, err
+       }
+       return fw, nil
 }
 
 // NumColumns returns the number of columns to write as defined by the schema.
@@ -167,7 +196,7 @@ func (fw *Writer) appendRowGroup(buffered bool) *rowGroupWriter {
        return fw.rowGroupWriter
 }
 
-func (fw *Writer) startFile() {
+func (fw *Writer) startFile() error {
        encryptionProps := fw.props.FileEncryptionProperties()
        magic := magicBytes
        if encryptionProps != nil {
@@ -199,8 +228,11 @@ func (fw *Writer) startFile() {
        }
 
        n, err := fw.sink.Write(magic)
-       if n != 4 || err != nil {
-               panic("failed to write magic number")
+       if err != nil {
+               return fmt.Errorf("parquet: failed to write magic number: %w", err)
+       }
+       if n != len(magic) {
+               return fmt.Errorf("parquet: short write of magic number: wrote %d of %d bytes", n, len(magic))
        }
 
        if fw.props.PageIndexEnabled() {
@@ -209,6 +241,7 @@ func (fw *Writer) startFile() {
                        Encryptor: fw.fileEncryptor,
                }
        }
+       return nil
 }
 
 func (fw *Writer) writePageIndex() {
diff --git a/parquet/file/file_writer_test.go b/parquet/file/file_writer_test.go
index 7a6c6ae1..88d82d73 100644
--- a/parquet/file/file_writer_test.go
+++ b/parquet/file/file_writer_test.go
@@ -18,7 +18,9 @@ package file_test
 
 import (
        "bytes"
+       "errors"
        "fmt"
+       "io"
        "math"
        "reflect"
        "slices"
@@ -1324,3 +1326,99 @@ func TestBufferedStreamDictionaryCompressed(t *testing.T) {
                assert.Equal(t, int32(i), readValues[i])
        }
 }
+
+// flakyMagicSink models a transient I/O failure on the first Write call,
+// the failure mode reported in apache/arrow-go#820 for cloud-storage upload
+// writers. Set firstErr to fail with that error on the first Write, or set
+// short to true to return n=len(p)-1, nil on the first Write. All subsequent
+// writes succeed and forward to the embedded buffer.
+type flakyMagicSink struct {
+       buf      bytes.Buffer
+       writes   int
+       firstErr error
+       short    bool
+}
+
+func (f *flakyMagicSink) Write(p []byte) (int, error) {
+       f.writes++
+       if f.writes == 1 {
+               if f.firstErr != nil {
+                       return 0, f.firstErr
+               }
+               if f.short {
+                       return len(p) - 1, nil
+               }
+       }
+       return f.buf.Write(p)
+}
+
+func newSingleColumnSchema(t *testing.T) *schema.GroupNode {
+       t.Helper()
+       fields := schema.FieldList{schema.NewInt32Node("col", parquet.Repetitions.Required, 1)}
+       sc, err := schema.NewGroupNode("schema", parquet.Repetitions.Required, fields, 0)
+       require.NoError(t, err)
+       return sc
+}
+
+func TestNewParquetWriterWithError_Success(t *testing.T) {
+       var buf bytes.Buffer
+       writer, err := file.NewParquetWriterWithError(&buf, newSingleColumnSchema(t))
+       require.NoError(t, err)
+       require.NotNil(t, writer)
+       require.NoError(t, writer.Close())
+}
+
+func TestNewParquetWriterWithError_FirstWriteFails(t *testing.T) {
+       sink := &flakyMagicSink{firstErr: io.ErrUnexpectedEOF}
+       writer, err := file.NewParquetWriterWithError(sink, newSingleColumnSchema(t))
+       require.Error(t, err)
+       require.Nil(t, writer)
+       require.True(t, errors.Is(err, io.ErrUnexpectedEOF),
+               "expected returned error to wrap io.ErrUnexpectedEOF, got %v", err)
+}
+
+func TestNewParquetWriterWithError_FirstWriteShortWrites(t *testing.T) {
+       sink := &flakyMagicSink{short: true}
+       writer, err := file.NewParquetWriterWithError(sink, newSingleColumnSchema(t))
+       require.Error(t, err)
+       require.Nil(t, writer)
+       require.Contains(t, err.Error(), "short write of magic number")
+}
+
+func TestNewParquetWriter_PreservesPanicMessage(t *testing.T) {
+       sink := &flakyMagicSink{firstErr: io.ErrUnexpectedEOF}
+
+       defer func() {
+               r := recover()
+               require.NotNil(t, r, "NewParquetWriter should panic on first-write failure")
+               msg, ok := r.(string)
+               require.True(t, ok, "panic value should remain a string for back-compat (got %T)", r)
+               require.Equal(t, "failed to write magic number", msg)
+       }()
+
+       _ = file.NewParquetWriter(sink, newSingleColumnSchema(t))
+       t.Fatalf("expected NewParquetWriter to panic, but it returned normally")
+}
+
+func TestPqarrowNewFileWriter_PropagatesInitError(t *testing.T) {
+       arrowSchema := arrow.NewSchema([]arrow.Field{{Name: "f", Type: arrow.PrimitiveTypes.Int32}}, nil)
+       sink := &flakyMagicSink{firstErr: io.ErrUnexpectedEOF}
+
+       writer, err := pqarrow.NewFileWriter(arrowSchema, sink,
+               parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
+       require.Error(t, err)
+       require.Nil(t, writer)
+       require.True(t, errors.Is(err, io.ErrUnexpectedEOF),
+               "expected returned error to wrap io.ErrUnexpectedEOF, got %v", err)
+}
+
+func TestPqarrowNewFileWriter_PropagatesShortWrite(t *testing.T) {
+       arrowSchema := arrow.NewSchema([]arrow.Field{{Name: "f", Type: arrow.PrimitiveTypes.Int32}}, nil)
+       sink := &flakyMagicSink{short: true}
+
+       writer, err := pqarrow.NewFileWriter(arrowSchema, sink,
+               parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
+       require.Error(t, err)
+       require.Nil(t, writer)
+       require.Contains(t, err.Error(), "short write of magic number")
+}
diff --git a/parquet/pqarrow/file_writer.go b/parquet/pqarrow/file_writer.go
index eb576e74..442ec8b4 100644
--- a/parquet/pqarrow/file_writer.go
+++ b/parquet/pqarrow/file_writer.go
@@ -163,7 +163,10 @@ func NewFileWriter(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterPr
        }
 
        schemaNode := pqschema.Root()
-       baseWriter := file.NewParquetWriter(w, schemaNode, file.WithWriterProps(props), file.WithWriteMetadata(meta))
+       baseWriter, err := file.NewParquetWriterWithError(w, schemaNode, file.WithWriterProps(props), file.WithWriteMetadata(meta))
+       if err != nil {
+               return nil, err
+       }
 
        manifest, err := NewSchemaManifest(pqschema, nil, &ArrowReadProperties{})
        if err != nil {
