This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 5c55525 feat(arrow/csv): Extend arrow csv writer (#375)
5c55525 is described below
commit 5c55525bbe7fcffe9e29f37c482805f7ded6ca20
Author: Victor Perez <[email protected]>
AuthorDate: Wed May 21 17:03:57 2025 +0200
feat(arrow/csv): Extend arrow csv writer (#375)
### Rationale for this change
I've been looking for a way to convert Parquet files to CSV. The arrow
library does a good job, but I ran into two limitations:
- Not every type was supported
- It was impossible to change how types were formatted
My goal is to generate CSV that can be used with the `COPY TO` statement in
PostgreSQL. For example, I needed to make sure binary fields are hex
encoded instead of base64 encoded.
After thinking about it, I settled on adding a custom type converter
option that allows any user of csv.Writer to change the behavior for
specific types. It is simply a function that is called before the standard
mapping, so custom logic can handle a type first.
The test `TestCustomTypeConversion` in this PR shows how it can be used; a
minimal sketch follows.
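For context before the diff, here is a minimal, self-contained sketch of
the option in use. The `WithCustomTypeConverter` signature matches this PR;
the schema, record, and hex formatting are illustrative only:

```go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/csv"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "payload", Type: arrow.BinaryTypes.Binary},
	}, nil)

	// Hex-encode binary columns instead of the default base64; returning
	// (nil, false) keeps the default conversion for every other type.
	w := csv.NewWriter(os.Stdout, schema, csv.WithHeader(true),
		csv.WithCustomTypeConverter(func(typ arrow.DataType, col arrow.Array) ([]string, bool) {
			if typ.ID() != arrow.BINARY {
				return nil, false
			}
			arr := col.(*array.Binary)
			out := make([]string, arr.Len())
			for i := 0; i < arr.Len(); i++ {
				if !arr.IsValid(i) {
					out[i] = "NULL"
					continue
				}
				out[i] = fmt.Sprintf("\\x%x", arr.Value(i))
			}
			return out, true
		}))

	// Build a one-row record just to have something to write.
	bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)
	defer bldr.Release()
	bldr.Field(0).(*array.BinaryBuilder).Append([]byte("hi"))
	rec := bldr.NewRecord()
	defer rec.Release()

	if err := w.Write(rec); err != nil {
		log.Fatal(err)
	}
	if err := w.Flush(); err != nil {
		log.Fatal(err)
	}
}
```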
### What changes are included in this PR?
- An option for csv.Writer to set a custom type converter
(`WithCustomTypeConverter`)
- Remove csv.Writer schema type validation (it will fail on write if a
type is not handled)
- Add tests for csv.Writer based on the apache/parquet-testing files
`alltypes_plain.parquet` and `delta_byte_array.parquet`
- Add a test for the custom type converter (`TestCustomTypeConversion`)
### Are these changes tested?
Yes
### Are there any user-facing changes?
- Invalid schemas no longer fail on csv.Writer creation, but on the first
write (see the sketch below). I believe this is easier, as it keeps a
single source of truth for type validation.
- If the `WithCustomTypeConverter` option is not set, there are no changes
in behavior.
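A hedged sketch of the first point. The map column below is assumed to be
a type the CSV transformer does not handle, and the failure is shown as a
panic on the first Write (per this PR's description, unhandled types fail
on write); adjust if either assumption does not hold for your build:

```go
package main

import (
	"fmt"
	"io"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/csv"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	// A map column, assumed unsupported by the CSV writer.
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "m", Type: arrow.MapOf(arrow.BinaryTypes.String, arrow.PrimitiveTypes.Int32)},
	}, nil)

	// Before this PR, NewWriter panicked here; now construction succeeds.
	w := csv.NewWriter(io.Discard, schema)

	bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)
	defer bldr.Release()
	rec := bldr.NewRecord() // an empty record should still hit the per-column type switch
	defer rec.Release()

	// The unsupported type only surfaces on the first Write; guard for a
	// panic and also check the returned error, since the exact failure
	// mode follows the transformer's existing behavior.
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("failed on first write:", r)
		}
	}()
	if err := w.Write(rec); err != nil {
		fmt.Println("failed on first write:", err)
	}
}
```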
---
arrow/csv/common.go | 46 ++++++++++-
arrow/csv/reader.go | 3 +-
arrow/csv/transformer.go | 7 ++
arrow/csv/writer.go | 27 +++---
arrow/csv/writer_test.go | 210 +++++++++++++++++++++++++++++++++++++++++++++++
ci/scripts/test.sh | 7 +-
6 files changed, 278 insertions(+), 22 deletions(-)
diff --git a/arrow/csv/common.go b/arrow/csv/common.go
index 02982d4..78283cd 100644
--- a/arrow/csv/common.go
+++ b/arrow/csv/common.go
@@ -237,15 +237,53 @@ func WithStringsReplacer(replacer *strings.Replacer) Option {
}
}
-func validate(schema *arrow.Schema) {
+// WithCustomTypeConverter allows specifying a custom type converter for the CSV writer.
+//
+// returns a slice of strings that must match the number of columns in the output csv.
+// the second return value is a boolean that indicates if the conversion was handled.
+// if it is set to false, the library will attempt to use default conversion.
+//
+// There are multiple ways to convert arrow types to strings, and depending on the goal, you may want to use a different one.
+// One clear example is encoding binary types. The default behaviour is to encode them as base64 strings.
+// If you want to customize this behaviour, you can use this option and use any other encoding, such as hex.
+//
+// csv.WithCustomTypeConverter(func(typ arrow.DataType, col arrow.Array) (result []string, handled bool) {
+// // use hex encoding for binary types
+// if typ.ID() == arrow.BINARY {
+// result = make([]string, col.Len())
+// arr := col.(*array.Binary)
+// for i := 0; i < arr.Len(); i++ {
+// if !arr.IsValid(i) {
+// result[i] = "NULL"
+// continue
+// }
+// result[i] = fmt.Sprintf("\\x%x", arr.Value(i))
+// }
+// return result, true
+// }
+// // keep the default behavior for other types
+// return nil, false
+// })
+func WithCustomTypeConverter(converter func(typ arrow.DataType, col arrow.Array) (result []string, handled bool)) Option {
+ return func(cfg config) {
+ switch cfg := cfg.(type) {
+ case *Writer:
+ cfg.customTypeConverter = converter
+ default:
+ panic(fmt.Errorf("%w: WithCustomTypeConverter only allowed on csv Writer", arrow.ErrInvalid))
+ }
+ }
+}
+
+func validateRead(schema *arrow.Schema) {
for i, f := range schema.Fields() {
- if !typeSupported(f.Type) {
+ if !readTypeSupported(f.Type) {
panic(fmt.Errorf("arrow/csv: field %d (%s) has invalid
data type %T", i, f.Name, f.Type))
}
}
}
-func typeSupported(dt arrow.DataType) bool {
+func readTypeSupported(dt arrow.DataType) bool {
switch dt := dt.(type) {
case *arrow.BooleanType:
case *arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type, *arrow.Int64Type:
@@ -258,7 +296,7 @@ func typeSupported(dt arrow.DataType) bool {
case *arrow.MapType:
return false
case arrow.ListLikeType:
- return typeSupported(dt.Elem())
+ return readTypeSupported(dt.Elem())
case *arrow.BinaryType, *arrow.LargeBinaryType, *arrow.FixedSizeBinaryType:
case arrow.ExtensionType:
case *arrow.NullType:
diff --git a/arrow/csv/reader.go b/arrow/csv/reader.go
index db0f836..284964a 100644
--- a/arrow/csv/reader.go
+++ b/arrow/csv/reader.go
@@ -106,8 +106,7 @@ func NewInferringReader(r io.Reader, opts ...Option) *Reader {
// NewReader panics if the given schema contains fields that have types that are not
// primitive types.
func NewReader(r io.Reader, schema *arrow.Schema, opts ...Option) *Reader {
- validate(schema)
-
+ validateRead(schema)
rr := &Reader{
r: csv.NewReader(r),
schema: schema,
diff --git a/arrow/csv/transformer.go b/arrow/csv/transformer.go
index 8ab41ab..0c663c7 100644
--- a/arrow/csv/transformer.go
+++ b/arrow/csv/transformer.go
@@ -30,6 +30,13 @@ import (
)
func (w *Writer) transformColToStringArr(typ arrow.DataType, col arrow.Array, stringsReplacer func(string) string) []string {
+ if w.customTypeConverter != nil {
+ result, handled := w.customTypeConverter(typ, col)
+ if handled {
+ return result
+ }
+ }
+
res := make([]string, col.Len())
switch typ.(type) {
case *arrow.BooleanType:
diff --git a/arrow/csv/writer.go b/arrow/csv/writer.go
index f0a277a..ddb0be6 100644
--- a/arrow/csv/writer.go
+++ b/arrow/csv/writer.go
@@ -27,13 +27,14 @@ import (
// Writer wraps encoding/csv.Writer and writes arrow.Record based on a schema.
type Writer struct {
- boolFormatter func(bool) string
- header bool
- nullValue string
- stringReplacer func(string) string
- once sync.Once
- schema *arrow.Schema
- w *csv.Writer
+ boolFormatter func(bool) string
+ header bool
+ nullValue string
+ stringReplacer func(string) string
+ customTypeConverter func(typ arrow.DataType, col arrow.Array) (result []string, handled bool)
+ once sync.Once
+ schema *arrow.Schema
+ w *csv.Writer
}
// NewWriter returns a writer that writes arrow.Records to the CSV file
@@ -43,14 +44,14 @@ type Writer struct {
// primitive types.
// For BinaryType the writer will use base64 encoding with padding as per base64.StdEncoding.
func NewWriter(w io.Writer, schema *arrow.Schema, opts ...Option) *Writer {
- validate(schema)
ww := &Writer{
- boolFormatter: strconv.FormatBool, // override by passing WithBoolWriter() as an option
- nullValue: "NULL", // override by passing WithNullWriter() as an option
- stringReplacer: func(x string) string { return x }, // override by passing WithStringsReplacer() as an option
- schema: schema,
- w: csv.NewWriter(w),
+ boolFormatter: strconv.FormatBool, // override by passing WithBoolWriter() as an option
+ nullValue: "NULL", // override by passing WithNullWriter() as an option
+ stringReplacer: func(x string) string { return x }, // override by passing WithStringsReplacer() as an option
+ customTypeConverter: nil, // override by passing WithCustomTypeConverter() as an option
+ schema: schema,
+ w: csv.NewWriter(w),
}
for _, opt := range opts {
opt(ww)
diff --git a/arrow/csv/writer_test.go b/arrow/csv/writer_test.go
index b15d3b6..ef154c8 100644
--- a/arrow/csv/writer_test.go
+++ b/arrow/csv/writer_test.go
@@ -19,12 +19,16 @@ package csv_test
import (
"bufio"
"bytes"
+ "context"
ecsv "encoding/csv"
"fmt"
"io"
"log"
+ "os"
+ "path"
"strings"
"testing"
+ "time"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
@@ -34,7 +38,11 @@ import (
"github.com/apache/arrow-go/v18/arrow/extensions"
"github.com/apache/arrow-go/v18/arrow/float16"
"github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/parquet/file"
+ "github.com/apache/arrow-go/v18/parquet/pqarrow"
"github.com/google/uuid"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
const (
@@ -428,3 +436,205 @@ func BenchmarkWrite(b *testing.B) {
}
}
}
+
+// TestParquetTestingCSVWriter tests that the CSV writer successfully converts arrow/parquet-testing files to CSV
+func TestParquetTestingCSVWriter(t *testing.T) {
+ dir := os.Getenv("PARQUET_TEST_DATA")
+ if dir == "" {
+ dir = "../../parquet-testing/data"
+ t.Log("PARQUET_TEST_DATA not set, using
../../parquet-testing/data")
+ }
+ assert.DirExists(t, dir)
+
+ t.Run("alltypes_plain.parquet", func(t *testing.T) {
+ testFile, err := os.Open(path.Join(dir, "alltypes_plain.parquet"))
+ require.NoError(t, err)
+ defer testFile.Close()
+
+ r, err := file.NewParquetReader(testFile)
+ require.NoError(t, err)
+ defer r.Close()
+
+ alloc := memory.NewGoAllocator()
+
+ arrowReader, err := pqarrow.NewFileReader(r, pqarrow.ArrowReadProperties{BatchSize: 1024}, alloc)
+ require.NoError(t, err)
+
+ schema, err := arrowReader.Schema()
+ require.NoError(t, err)
+
+ buf := &bytes.Buffer{}
+ csvWriter := csv.NewWriter(buf, schema, csv.WithHeader(true))
+
+ recordReader, err := arrowReader.GetRecordReader(context.Background(), nil, nil)
+ require.NoError(t, err)
+
+ for recordReader.Next() {
+ rec := recordReader.Record()
+ err := csvWriter.Write(rec)
+ require.NoError(t, err)
+ }
+ require.NoError(t, csvWriter.Error())
+ require.NoError(t, csvWriter.Flush())
+
+ expected := `id,bool_col,tinyint_col,smallint_col,int_col,bigint_col,float_col,double_col,date_string_col,string_col,timestamp_col
+4,true,0,0,0,0,0,0,MDMvMDEvMDk=,MA==,2009-03-01 00:00:00
+5,false,1,1,1,10,1.1,10.1,MDMvMDEvMDk=,MQ==,2009-03-01 00:01:00
+6,true,0,0,0,0,0,0,MDQvMDEvMDk=,MA==,2009-04-01 00:00:00
+7,false,1,1,1,10,1.1,10.1,MDQvMDEvMDk=,MQ==,2009-04-01 00:01:00
+2,true,0,0,0,0,0,0,MDIvMDEvMDk=,MA==,2009-02-01 00:00:00
+3,false,1,1,1,10,1.1,10.1,MDIvMDEvMDk=,MQ==,2009-02-01 00:01:00
+0,true,0,0,0,0,0,0,MDEvMDEvMDk=,MA==,2009-01-01 00:00:00
+1,false,1,1,1,10,1.1,10.1,MDEvMDEvMDk=,MQ==,2009-01-01 00:01:00
+`
+
+ require.Equal(t, expected, buf.String())
+ })
+ t.Run("delta_byte_array.parquet", func(t *testing.T) {
+ testFile, err := os.Open(path.Join(dir, "delta_byte_array.parquet"))
+ require.NoError(t, err)
+ defer testFile.Close()
+
+ r, err := file.NewParquetReader(testFile)
+ require.NoError(t, err)
+ defer r.Close()
+
+ alloc := memory.NewGoAllocator()
+
+ arrowReader, err := pqarrow.NewFileReader(r, pqarrow.ArrowReadProperties{BatchSize: 1024}, alloc)
+ require.NoError(t, err)
+
+ schema, err := arrowReader.Schema()
+ require.NoError(t, err)
+
+ buf := &bytes.Buffer{}
+ csvWriter := csv.NewWriter(
+ buf,
+ schema,
+ csv.WithHeader(true),
+ csv.WithNullWriter(""),
+ )
+
+ recordReader, err := arrowReader.GetRecordReader(context.Background(), nil, nil)
+ require.NoError(t, err)
+
+ for recordReader.Next() {
+ rec := recordReader.Record()
+ err := csvWriter.Write(rec)
+ require.NoError(t, err)
+ }
+ require.NoError(t, csvWriter.Error())
+ require.NoError(t, csvWriter.Flush())
+
+ expected, err := os.Open(path.Join(dir, "delta_byte_array_expect.csv"))
+ require.NoError(t, err)
+ defer expected.Close()
+
+ // parse expected as CSV
+ expectedScanner := ecsv.NewReader(expected)
+ expectedLines, err := expectedScanner.ReadAll()
+ require.NoError(t, err)
+
+ // parse buf as CSV
+ bufLines, err := ecsv.NewReader(buf).ReadAll()
+ require.NoError(t, err)
+
+ // compare line by line
+ require.Equal(t, len(bufLines), len(expectedLines))
+
+ for i, line := range bufLines {
+ require.Equal(t, expectedLines[i], line)
+ }
+ })
+}
+
+// TestCustomTypeConversion tests that a custom type converter can override the default conversion for specific types
+func TestCustomTypeConversion(t *testing.T) {
+ dir := os.Getenv("PARQUET_TEST_DATA")
+ if dir == "" {
+ dir = "../../parquet-testing/data"
+ t.Log("PARQUET_TEST_DATA not set, using
../../parquet-testing/data")
+ }
+ assert.DirExists(t, dir)
+
+ testFile, err := os.Open(path.Join(dir, "alltypes_plain.parquet"))
+ require.NoError(t, err)
+ defer testFile.Close()
+
+ r, err := file.NewParquetReader(testFile)
+ require.NoError(t, err)
+ defer r.Close()
+
+ alloc := memory.NewGoAllocator()
+
+ arrowReader, err := pqarrow.NewFileReader(r, pqarrow.ArrowReadProperties{BatchSize: 1024}, alloc)
+ require.NoError(t, err)
+
+ schema, err := arrowReader.Schema()
+ require.NoError(t, err)
+
+ buf := &bytes.Buffer{}
+ csvWriter := csv.NewWriter(
+ buf,
+ schema,
+ csv.WithHeader(true),
+ csv.WithCustomTypeConverter(func(typ arrow.DataType, col arrow.Array) (result []string, handled bool) {
+ if typ.ID() == arrow.BINARY {
+ result = make([]string, col.Len())
+ arr := col.(*array.Binary)
+ for i := 0; i < arr.Len(); i++ {
+ if !arr.IsValid(i) {
+ result[i] = "NULL"
+ continue
+ }
+ result[i] = fmt.Sprintf("\\x%x", arr.Value(i))
+ }
+ return result, true
+ }
+ if typ.ID() == arrow.TIMESTAMP {
+ result = make([]string, col.Len())
+ arr := col.(*array.Timestamp)
+ for i := 0; i < arr.Len(); i++ {
+ if !arr.IsValid(i) {
+ result[i] = "NULL"
+ continue
+ }
+ fn, err := typ.(*arrow.TimestampType).GetToTimeFunc()
+ if err != nil {
+ result[i] = "NULL"
+ continue
+ }
+
+ result[i] = fn(arr.Value(i)).Format(time.RFC3339)
+ }
+ return result, true
+ }
+ return nil, false
+ }),
+ )
+
+ recordReader, err := arrowReader.GetRecordReader(context.Background(), nil, nil)
+ require.NoError(t, err)
+
+ for recordReader.Next() {
+ rec := recordReader.Record()
+ err := csvWriter.Write(rec)
+ require.NoError(t, err)
+ }
+ require.NoError(t, csvWriter.Error())
+ require.NoError(t, csvWriter.Flush())
+
+ expected := `id,bool_col,tinyint_col,smallint_col,int_col,bigint_col,float_col,double_col,date_string_col,string_col,timestamp_col
+4,true,0,0,0,0,0,0,\x30332f30312f3039,\x30,2009-03-01T00:00:00Z
+5,false,1,1,1,10,1.1,10.1,\x30332f30312f3039,\x31,2009-03-01T00:01:00Z
+6,true,0,0,0,0,0,0,\x30342f30312f3039,\x30,2009-04-01T00:00:00Z
+7,false,1,1,1,10,1.1,10.1,\x30342f30312f3039,\x31,2009-04-01T00:01:00Z
+2,true,0,0,0,0,0,0,\x30322f30312f3039,\x30,2009-02-01T00:00:00Z
+3,false,1,1,1,10,1.1,10.1,\x30322f30312f3039,\x31,2009-02-01T00:01:00Z
+0,true,0,0,0,0,0,0,\x30312f30312f3039,\x30,2009-01-01T00:00:00Z
+1,false,1,1,1,10,1.1,10.1,\x30312f30312f3039,\x31,2009-01-01T00:01:00Z
+`
+
+ require.Equal(t, expected, buf.String())
+
+}
diff --git a/ci/scripts/test.sh b/ci/scripts/test.sh
index 25b236c..78260c0 100755
--- a/ci/scripts/test.sh
+++ b/ci/scripts/test.sh
@@ -21,6 +21,10 @@ set -eux
source_dir=${1}
+export PARQUET_TEST_DATA=${1}/parquet-testing/data
+export PARQUET_TEST_BAD_DATA=${1}/parquet-testing/bad_data
+export ARROW_TEST_DATA=${1}/arrow-testing/data
+
case "$(uname)" in
MINGW*)
# -race and -asan don't work on Windows currently
@@ -69,9 +73,6 @@ go test "${test_args[@]}" -tags ${tags},noasm ./...
popd
-export PARQUET_TEST_DATA=${1}/parquet-testing/data
-export PARQUET_TEST_BAD_DATA=${1}/parquet-testing/bad_data
-export ARROW_TEST_DATA=${1}/arrow-testing/data
pushd "${source_dir}/parquet"
go test "${test_args[@]}" -tags assert ./...