This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 5c55525  feat(arrow/csv): Extend arrow csv writer (#375)
5c55525 is described below

commit 5c55525bbe7fcffe9e29f37c482805f7ded6ca20
Author: Victor Perez <[email protected]>
AuthorDate: Wed May 21 17:03:57 2025 +0200

    feat(arrow/csv): Extend arrow csv writer (#375)
    
    ### Rationale for this change
    
    I've been looking for a way to convert parquet files to CSV. The arrow
    library seemed to be doing a good job, until I found out that:
    
    - Not every type was supported
    - It was impossible to change how types were formatted
    
    My goal is to generate CSV that can be used with the `COPY TO` statement in
    PostgreSQL. For example, I needed to make sure binary fields are hex
    encoded instead of base64.
    
    After thinking a bit about it, I considered adding a custom type
    converter option that will allow any user of the csv.Writer to change
    the behavior for specific types. This is just a function that will be
    called before the standard mapping to allow any custom logic to handle
    the type.
    
    Tests `TestCustomTypeConversion` in this PR show how it can be used.
    
    ### What changes are included in this PR?
    
    - Option for csv.Writer to set a `CustomTypeConversion`
    - Remove csv.Writer schema type validation. (It will fail on write if
    type is not handled)
    - Add tests for csv.Writer based on apache/parquet_testing
    `alltypes_plain.parquet` and `delta_byte_array.parquet`
    - Add test for `CustomTypeConversion`
    
    ### Are these changes tested?
    
    Yes
    
    ### Are there any user-facing changes?
    
    - Invalid schemas won't fail on csv.Writer creation but on first write.
    I believe this is easier as it sets a single source of truth for type
    validation.
    - If the CustomTypeConversion option is not set, there are no changes in
    the behavior.
---
 arrow/csv/common.go      |  46 ++++++++++-
 arrow/csv/reader.go      |   3 +-
 arrow/csv/transformer.go |   7 ++
 arrow/csv/writer.go      |  27 +++---
 arrow/csv/writer_test.go | 210 +++++++++++++++++++++++++++++++++++++++++++++++
 ci/scripts/test.sh       |   7 +-
 6 files changed, 278 insertions(+), 22 deletions(-)

diff --git a/arrow/csv/common.go b/arrow/csv/common.go
index 02982d4..78283cd 100644
--- a/arrow/csv/common.go
+++ b/arrow/csv/common.go
@@ -237,15 +237,53 @@ func WithStringsReplacer(replacer *strings.Replacer) 
Option {
        }
 }
 
-func validate(schema *arrow.Schema) {
+// WithCustomTypeConverter allows specifying a custom type converter for the 
CSV writer.
+//
+// The converter returns a slice of strings that must contain one entry per 
value in col (col.Len() entries).
+// the second return value is a boolean that indicates if the conversion was 
handled.
+// if it is set to false, the library will attempt to use default conversion.
+//
+// There are multiple ways to convert arrow types to strings, and depending on 
the goal, you may want to use a different one.
+// One clear example is encoding binary types. The default behaviour is to 
encode them as base64 strings.
+// If you want to customize this behaviour, you can use this option and use 
any other encoding, such as hex.
+//
+//     csv.WithCustomTypeConverter(func(typ arrow.DataType, col arrow.Array) 
(result []string, handled bool) {
+//             // use hex encoding for binary types
+//             if typ.ID() == arrow.BINARY {
+//                     result = make([]string, col.Len())
+//                     arr := col.(*array.Binary)
+//                     for i := 0; i < arr.Len(); i++ {
+//                             if !arr.IsValid(i) {
+//                                     result[i] = "NULL"
+//                                     continue
+//                             }
+//                             result[i] = fmt.Sprintf("\\x%x", arr.Value(i))
+//                     }
+//                     return result, true
+//             }
+//             // keep the default behavior for other types
+//             return nil, false
+//     })
+func WithCustomTypeConverter(converter func(typ arrow.DataType, col 
arrow.Array) (result []string, handled bool)) Option {
+       return func(cfg config) {
+               switch cfg := cfg.(type) {
+               case *Writer:
+                       cfg.customTypeConverter = converter
+               default:
+                       panic(fmt.Errorf("%w: WithCustomTypeConverter only 
allowed on csv Writer", arrow.ErrInvalid))
+               }
+       }
+}
+
+func validateRead(schema *arrow.Schema) {
        for i, f := range schema.Fields() {
-               if !typeSupported(f.Type) {
+               if !readTypeSupported(f.Type) {
                        panic(fmt.Errorf("arrow/csv: field %d (%s) has invalid 
data type %T", i, f.Name, f.Type))
                }
        }
 }
 
-func typeSupported(dt arrow.DataType) bool {
+func readTypeSupported(dt arrow.DataType) bool {
        switch dt := dt.(type) {
        case *arrow.BooleanType:
        case *arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type, 
*arrow.Int64Type:
@@ -258,7 +296,7 @@ func typeSupported(dt arrow.DataType) bool {
        case *arrow.MapType:
                return false
        case arrow.ListLikeType:
-               return typeSupported(dt.Elem())
+               return readTypeSupported(dt.Elem())
        case *arrow.BinaryType, *arrow.LargeBinaryType, 
*arrow.FixedSizeBinaryType:
        case arrow.ExtensionType:
        case *arrow.NullType:
diff --git a/arrow/csv/reader.go b/arrow/csv/reader.go
index db0f836..284964a 100644
--- a/arrow/csv/reader.go
+++ b/arrow/csv/reader.go
@@ -106,8 +106,7 @@ func NewInferringReader(r io.Reader, opts ...Option) 
*Reader {
 // NewReader panics if the given schema contains fields that have types that 
are not
 // primitive types.
 func NewReader(r io.Reader, schema *arrow.Schema, opts ...Option) *Reader {
-       validate(schema)
-
+       validateRead(schema)
        rr := &Reader{
                r:                csv.NewReader(r),
                schema:           schema,
diff --git a/arrow/csv/transformer.go b/arrow/csv/transformer.go
index 8ab41ab..0c663c7 100644
--- a/arrow/csv/transformer.go
+++ b/arrow/csv/transformer.go
@@ -30,6 +30,13 @@ import (
 )
 
 func (w *Writer) transformColToStringArr(typ arrow.DataType, col arrow.Array, 
stringsReplacer func(string) string) []string {
+       if w.customTypeConverter != nil {
+               result, handled := w.customTypeConverter(typ, col)
+               if handled {
+                       return result
+               }
+       }
+
        res := make([]string, col.Len())
        switch typ.(type) {
        case *arrow.BooleanType:
diff --git a/arrow/csv/writer.go b/arrow/csv/writer.go
index f0a277a..ddb0be6 100644
--- a/arrow/csv/writer.go
+++ b/arrow/csv/writer.go
@@ -27,13 +27,14 @@ import (
 
 // Writer wraps encoding/csv.Writer and writes arrow.Record based on a schema.
 type Writer struct {
-       boolFormatter  func(bool) string
-       header         bool
-       nullValue      string
-       stringReplacer func(string) string
-       once           sync.Once
-       schema         *arrow.Schema
-       w              *csv.Writer
+       boolFormatter       func(bool) string
+       header              bool
+       nullValue           string
+       stringReplacer      func(string) string
+       customTypeConverter func(typ arrow.DataType, col arrow.Array) (result 
[]string, handled bool)
+       once                sync.Once
+       schema              *arrow.Schema
+       w                   *csv.Writer
 }
 
 // NewWriter returns a writer that writes arrow.Records to the CSV file
@@ -43,14 +44,14 @@ type Writer struct {
 // primitive types.
 // For BinaryType the writer will use base64 encoding with padding as per 
base64.StdEncoding.
 func NewWriter(w io.Writer, schema *arrow.Schema, opts ...Option) *Writer {
-       validate(schema)
 
        ww := &Writer{
-               boolFormatter:  strconv.FormatBool,                 // override 
by passing WithBoolWriter() as an option
-               nullValue:      "NULL",                             // override 
by passing WithNullWriter() as an option
-               stringReplacer: func(x string) string { return x }, // override 
by passing WithStringsReplacer() as an option
-               schema:         schema,
-               w:              csv.NewWriter(w),
+               boolFormatter:       strconv.FormatBool,                 // 
override by passing WithBoolWriter() as an option
+               nullValue:           "NULL",                             // 
override by passing WithNullWriter() as an option
+               stringReplacer:      func(x string) string { return x }, // 
override by passing WithStringsReplacer() as an option
+               customTypeConverter: nil,                                // 
override by passing WithCustomTypeConverter() as an option
+               schema:              schema,
+               w:                   csv.NewWriter(w),
        }
        for _, opt := range opts {
                opt(ww)
diff --git a/arrow/csv/writer_test.go b/arrow/csv/writer_test.go
index b15d3b6..ef154c8 100644
--- a/arrow/csv/writer_test.go
+++ b/arrow/csv/writer_test.go
@@ -19,12 +19,16 @@ package csv_test
 import (
        "bufio"
        "bytes"
+       "context"
        ecsv "encoding/csv"
        "fmt"
        "io"
        "log"
+       "os"
+       "path"
        "strings"
        "testing"
+       "time"
 
        "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/arrow-go/v18/arrow/array"
@@ -34,7 +38,11 @@ import (
        "github.com/apache/arrow-go/v18/arrow/extensions"
        "github.com/apache/arrow-go/v18/arrow/float16"
        "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/parquet/file"
+       "github.com/apache/arrow-go/v18/parquet/pqarrow"
        "github.com/google/uuid"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
 )
 
 const (
@@ -428,3 +436,205 @@ func BenchmarkWrite(b *testing.B) {
                }
        }
 }
+
+// TestParquetTestingCSVWriter tests that the CSV writer successfully converts 
arrow/parquet-testing files to CSV
+func TestParquetTestingCSVWriter(t *testing.T) {
+       dir := os.Getenv("PARQUET_TEST_DATA")
+       if dir == "" {
+               dir = "../../parquet-testing/data"
+               t.Log("PARQUET_TEST_DATA not set, using 
../../parquet-testing/data")
+       }
+       assert.DirExists(t, dir)
+
+       t.Run("alltypes_plain.parquet", func(t *testing.T) {
+               testFile, err := os.Open(path.Join(dir, 
"alltypes_plain.parquet"))
+               require.NoError(t, err)
+               defer testFile.Close()
+
+               r, err := file.NewParquetReader(testFile)
+               require.NoError(t, err)
+               defer r.Close()
+
+               alloc := memory.NewGoAllocator()
+
+               arrowReader, err := pqarrow.NewFileReader(r, 
pqarrow.ArrowReadProperties{BatchSize: 1024}, alloc)
+               require.NoError(t, err)
+
+               schema, err := arrowReader.Schema()
+               require.NoError(t, err)
+
+               buf := &bytes.Buffer{}
+               csvWriter := csv.NewWriter(buf, schema, csv.WithHeader(true))
+
+               recordReader, err := 
arrowReader.GetRecordReader(context.Background(), nil, nil)
+               require.NoError(t, err)
+
+               for recordReader.Next() {
+                       rec := recordReader.Record()
+                       err := csvWriter.Write(rec)
+                       require.NoError(t, err)
+               }
+               require.NoError(t, csvWriter.Error())
+               require.NoError(t, csvWriter.Flush())
+
+               expected := 
`id,bool_col,tinyint_col,smallint_col,int_col,bigint_col,float_col,double_col,date_string_col,string_col,timestamp_col
+4,true,0,0,0,0,0,0,MDMvMDEvMDk=,MA==,2009-03-01 00:00:00
+5,false,1,1,1,10,1.1,10.1,MDMvMDEvMDk=,MQ==,2009-03-01 00:01:00
+6,true,0,0,0,0,0,0,MDQvMDEvMDk=,MA==,2009-04-01 00:00:00
+7,false,1,1,1,10,1.1,10.1,MDQvMDEvMDk=,MQ==,2009-04-01 00:01:00
+2,true,0,0,0,0,0,0,MDIvMDEvMDk=,MA==,2009-02-01 00:00:00
+3,false,1,1,1,10,1.1,10.1,MDIvMDEvMDk=,MQ==,2009-02-01 00:01:00
+0,true,0,0,0,0,0,0,MDEvMDEvMDk=,MA==,2009-01-01 00:00:00
+1,false,1,1,1,10,1.1,10.1,MDEvMDEvMDk=,MQ==,2009-01-01 00:01:00
+`
+
+               require.Equal(t, expected, buf.String())
+       })
+       t.Run("delta_byte_array.parquet", func(t *testing.T) {
+               testFile, err := os.Open(path.Join(dir, 
"delta_byte_array.parquet"))
+               require.NoError(t, err)
+               defer testFile.Close()
+
+               r, err := file.NewParquetReader(testFile)
+               require.NoError(t, err)
+               defer r.Close()
+
+               alloc := memory.NewGoAllocator()
+
+               arrowReader, err := pqarrow.NewFileReader(r, 
pqarrow.ArrowReadProperties{BatchSize: 1024}, alloc)
+               require.NoError(t, err)
+
+               schema, err := arrowReader.Schema()
+               require.NoError(t, err)
+
+               buf := &bytes.Buffer{}
+               csvWriter := csv.NewWriter(
+                       buf,
+                       schema,
+                       csv.WithHeader(true),
+                       csv.WithNullWriter(""),
+               )
+
+               recordReader, err := 
arrowReader.GetRecordReader(context.Background(), nil, nil)
+               require.NoError(t, err)
+
+               for recordReader.Next() {
+                       rec := recordReader.Record()
+                       err := csvWriter.Write(rec)
+                       require.NoError(t, err)
+               }
+               require.NoError(t, csvWriter.Error())
+               require.NoError(t, csvWriter.Flush())
+
+               expected, err := os.Open(path.Join(dir, 
"delta_byte_array_expect.csv"))
+               require.NoError(t, err)
+               defer expected.Close()
+
+               // parse expected as CSV
+               expectedScanner := ecsv.NewReader(expected)
+               expectedLines, err := expectedScanner.ReadAll()
+               require.NoError(t, err)
+
+               // parse buf as CSV
+               bufLines, err := ecsv.NewReader(buf).ReadAll()
+               require.NoError(t, err)
+
+               // compare line by line
+               require.Equal(t, len(bufLines), len(expectedLines))
+
+               for i, line := range bufLines {
+                       require.Equal(t, expectedLines[i], line)
+               }
+       })
+}
+
+// TestCustomTypeConversion tests that a custom type converter passed to the CSV 
writer overrides the default formatting of binary and timestamp columns
+func TestCustomTypeConversion(t *testing.T) {
+       dir := os.Getenv("PARQUET_TEST_DATA")
+       if dir == "" {
+               dir = "../../parquet-testing/data"
+               t.Log("PARQUET_TEST_DATA not set, using 
../../parquet-testing/data")
+       }
+       assert.DirExists(t, dir)
+
+       testFile, err := os.Open(path.Join(dir, "alltypes_plain.parquet"))
+       require.NoError(t, err)
+       defer testFile.Close()
+
+       r, err := file.NewParquetReader(testFile)
+       require.NoError(t, err)
+       defer r.Close()
+
+       alloc := memory.NewGoAllocator()
+
+       arrowReader, err := pqarrow.NewFileReader(r, 
pqarrow.ArrowReadProperties{BatchSize: 1024}, alloc)
+       require.NoError(t, err)
+
+       schema, err := arrowReader.Schema()
+       require.NoError(t, err)
+
+       buf := &bytes.Buffer{}
+       csvWriter := csv.NewWriter(
+               buf,
+               schema,
+               csv.WithHeader(true),
+               csv.WithCustomTypeConverter(func(typ arrow.DataType, col 
arrow.Array) (result []string, handled bool) {
+                       if typ.ID() == arrow.BINARY {
+                               result = make([]string, col.Len())
+                               arr := col.(*array.Binary)
+                               for i := 0; i < arr.Len(); i++ {
+                                       if !arr.IsValid(i) {
+                                               result[i] = "NULL"
+                                               continue
+                                       }
+                                       result[i] = fmt.Sprintf("\\x%x", 
arr.Value(i))
+                               }
+                               return result, true
+                       }
+                       if typ.ID() == arrow.TIMESTAMP {
+                               result = make([]string, col.Len())
+                               arr := col.(*array.Timestamp)
+                               for i := 0; i < arr.Len(); i++ {
+                                       if !arr.IsValid(i) {
+                                               result[i] = "NULL"
+                                               continue
+                                       }
+                                       fn, err := 
typ.(*arrow.TimestampType).GetToTimeFunc()
+                                       if err != nil {
+                                               result[i] = "NULL"
+                                               continue
+                                       }
+
+                                       result[i] = 
fn(arr.Value(i)).Format(time.RFC3339)
+                               }
+                               return result, true
+                       }
+                       return nil, false
+               }),
+       )
+
+       recordReader, err := arrowReader.GetRecordReader(context.Background(), 
nil, nil)
+       require.NoError(t, err)
+
+       for recordReader.Next() {
+               rec := recordReader.Record()
+               err := csvWriter.Write(rec)
+               require.NoError(t, err)
+       }
+       require.NoError(t, csvWriter.Error())
+       require.NoError(t, csvWriter.Flush())
+
+       expected := 
`id,bool_col,tinyint_col,smallint_col,int_col,bigint_col,float_col,double_col,date_string_col,string_col,timestamp_col
+4,true,0,0,0,0,0,0,\x30332f30312f3039,\x30,2009-03-01T00:00:00Z
+5,false,1,1,1,10,1.1,10.1,\x30332f30312f3039,\x31,2009-03-01T00:01:00Z
+6,true,0,0,0,0,0,0,\x30342f30312f3039,\x30,2009-04-01T00:00:00Z
+7,false,1,1,1,10,1.1,10.1,\x30342f30312f3039,\x31,2009-04-01T00:01:00Z
+2,true,0,0,0,0,0,0,\x30322f30312f3039,\x30,2009-02-01T00:00:00Z
+3,false,1,1,1,10,1.1,10.1,\x30322f30312f3039,\x31,2009-02-01T00:01:00Z
+0,true,0,0,0,0,0,0,\x30312f30312f3039,\x30,2009-01-01T00:00:00Z
+1,false,1,1,1,10,1.1,10.1,\x30312f30312f3039,\x31,2009-01-01T00:01:00Z
+`
+
+       require.Equal(t, expected, buf.String())
+
+}
diff --git a/ci/scripts/test.sh b/ci/scripts/test.sh
index 25b236c..78260c0 100755
--- a/ci/scripts/test.sh
+++ b/ci/scripts/test.sh
@@ -21,6 +21,10 @@ set -eux
 
 source_dir=${1}
 
+export PARQUET_TEST_DATA=${1}/parquet-testing/data
+export PARQUET_TEST_BAD_DATA=${1}/parquet-testing/bad_data
+export ARROW_TEST_DATA=${1}/arrow-testing/data
+
 case "$(uname)" in
 MINGW*)
   # -race and -asan don't work on Windows currently
@@ -69,9 +73,6 @@ go test "${test_args[@]}" -tags ${tags},noasm ./...
 
 popd
 
-export PARQUET_TEST_DATA=${1}/parquet-testing/data
-export PARQUET_TEST_BAD_DATA=${1}/parquet-testing/bad_data
-export ARROW_TEST_DATA=${1}/arrow-testing/data
 pushd "${source_dir}/parquet"
 
 go test "${test_args[@]}" -tags assert ./...

Reply via email to