This is an automated email from the ASF dual-hosted git repository.
sbinet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new a59d51e ARROW-4734: [Go] Add option to write a header for CSV writer
a59d51e is described below
commit a59d51e13bb45501839669c88c4cfd467f4e15f6
Author: Anson Qian <[email protected]>
AuthorDate: Mon Mar 18 08:22:26 2019 +0100
ARROW-4734: [Go] Add option to write a header for CSV writer
@sbinet
Author: Anson Qian <[email protected]>
Closes #3866 from anson627/ARROW-4734 and squashes the following commits:
233df161 <Anson Qian> Update go.mod and go.sum
5e887cde <Anson Qian> Better error handling
af73b2e6 <Anson Qian> Better error handling
50ec6676 <Anson Qian> Create new schema when read header
66daa0be <Anson Qian> Address code review
e39affec <Anson Qian> Add option for both read and write
5bfbb61d <Anson Qian> ARROW-4734: Add option to write a header for CSV writer
---
go/arrow/Gopkg.lock | 19 ++++++++-
go/arrow/Gopkg.toml | 4 ++
go/arrow/csv/common.go | 13 ++++++
go/arrow/csv/reader.go | 33 +++++++++++++++
go/arrow/csv/reader_test.go | 87 ++++++++++++++++++++++++++++++++++++++++
go/arrow/csv/testdata/header.csv | 20 +++++++++
go/arrow/csv/writer.go | 24 +++++++++++
go/arrow/csv/writer_test.go | 59 +++++++++++++++++++++++++++
go/arrow/go.mod | 1 +
go/arrow/go.sum | 2 +
10 files changed, 261 insertions(+), 1 deletion(-)
diff --git a/go/arrow/Gopkg.lock b/go/arrow/Gopkg.lock
index 30a0d82..143e4f9 100644
--- a/go/arrow/Gopkg.lock
+++ b/go/arrow/Gopkg.lock
@@ -2,26 +2,43 @@
[[projects]]
+ digest = "1:56c130d885a4aacae1dd9c7b71cfe39912c7ebc1ff7d2b46083c8812996dc43b"
name = "github.com/davecgh/go-spew"
packages = ["spew"]
+ pruneopts = ""
revision = "346938d642f2ec3594ed81d874461961cd0faa76"
version = "v1.1.0"
[[projects]]
+ digest = "1:1d7e1867c49a6dd9856598ef7c3123604ea3daabf5b83f303ff457bcbc410b1d"
+ name = "github.com/pkg/errors"
+ packages = ["."]
+ pruneopts = ""
+ revision = "ba968bfe8b2f7e042a574c888954fccecfa385b4"
+ version = "v0.8.1"
+
+[[projects]]
+ digest = "1:256484dbbcd271f9ecebc6795b2df8cad4c458dd0f5fd82a8c2fa0c29f233411"
name = "github.com/pmezard/go-difflib"
packages = ["difflib"]
+ pruneopts = ""
revision = "792786c7400a136282c1664665ae0a8db921c6c2"
version = "v1.0.0"
[[projects]]
+ digest = "1:2d0dc026c4aef5e2f3a0e06a4dabe268b840d8f63190cf6894e02134a03f52c5"
name = "github.com/stretchr/testify"
packages = ["assert"]
+ pruneopts = ""
revision = "b91bfb9ebec76498946beb6af7c0230c7cc7ba6c"
version = "v1.2.0"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
- inputs-digest = "a9dd4821c4522b2069722c1be94b191e7aff1736aeb5c12ab0070f87cdbc5af2"
+ input-imports = [
+ "github.com/pkg/errors",
+ "github.com/stretchr/testify/assert",
+ ]
solver-name = "gps-cdcl"
solver-version = 1
diff --git a/go/arrow/Gopkg.toml b/go/arrow/Gopkg.toml
index 03bbeba..b27807d 100644
--- a/go/arrow/Gopkg.toml
+++ b/go/arrow/Gopkg.toml
@@ -17,3 +17,7 @@
[[constraint]]
name = "github.com/stretchr/testify"
version = "1.2.0"
+
+[[constraint]]
+ name = "github.com/pkg/errors"
+ version = "0.8.1"
\ No newline at end of file
diff --git a/go/arrow/csv/common.go b/go/arrow/csv/common.go
index baa3edd..0f9ef70 100644
--- a/go/arrow/csv/common.go
+++ b/go/arrow/csv/common.go
@@ -104,6 +104,19 @@ func WithCRLF(useCRLF bool) Option {
}
}
+// WithHeader returns an Option that sets the header flag on the Reader or
+// Writer it is applied to.
+//
+// With the flag set, a Reader consumes the first CSV row as column names the
+// first time Next is called, and a Writer emits the schema's field names
+// ahead of the first record it writes. The option panics when applied to any
+// other config type.
+func WithHeader() Option {
+	return func(cfg config) {
+		switch target := cfg.(type) {
+		case *Reader:
+			target.header = true
+		case *Writer:
+			target.header = true
+		default:
+			panic(fmt.Errorf("arrow/csv: unknown config type %T", target))
+		}
+	}
+}
+
func validate(schema *arrow.Schema) {
for i, f := range schema.Fields() {
switch ft := f.Type.(type) {
diff --git a/go/arrow/csv/reader.go b/go/arrow/csv/reader.go
index c54beb7..5053a14 100644
--- a/go/arrow/csv/reader.go
+++ b/go/arrow/csv/reader.go
@@ -20,12 +20,14 @@ import (
"encoding/csv"
"io"
"strconv"
+ "sync"
"sync/atomic"
"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/internal/debug"
"github.com/apache/arrow/go/arrow/memory"
+ "github.com/pkg/errors"
)
// Reader wraps encoding/csv.Reader and creates array.Records from a schema.
@@ -43,6 +45,9 @@ type Reader struct {
next func() bool
mem memory.Allocator
+
+ header bool
+ once sync.Once
}
// NewReader returns a reader that reads from the CSV file and creates
@@ -76,6 +81,28 @@ func NewReader(r io.Reader, schema *arrow.Schema, opts ...Option) *Reader {
return rr
}
+func (r *Reader) readHeader() error {
+ records, err := r.r.Read()
+ if err != nil {
+ return errors.Wrapf(err, "arrow/csv: could not read header from
file")
+ }
+
+ if len(records) != len(r.schema.Fields()) {
+ return ErrMismatchFields
+ }
+
+ fields := make([]arrow.Field, len(records))
+ for idx, name := range records {
+ fields[idx] = r.schema.Field(idx)
+ fields[idx].Name = name
+ }
+
+ meta := r.schema.Metadata()
+ r.schema = arrow.NewSchema(fields, &meta)
+ r.bld = array.NewRecordBuilder(r.mem, r.schema)
+ return nil
+}
+
// Err returns the last error encountered during the iteration over the
// underlying CSV file.
func (r *Reader) Err() error { return r.err }
@@ -92,6 +119,12 @@ func (r *Reader) Record() array.Record { return r.cur }
// Next panics if the number of records extracted from a CSV row does not match
// the number of fields of the associated schema.
func (r *Reader) Next() bool {
+ if r.header {
+ r.once.Do(func() {
+ r.err = r.readHeader()
+ })
+ }
+
if r.cur != nil {
r.cur.Release()
r.cur = nil
diff --git a/go/arrow/csv/reader_test.go b/go/arrow/csv/reader_test.go
index 35bc4bb..18edfbe 100644
--- a/go/arrow/csv/reader_test.go
+++ b/go/arrow/csv/reader_test.go
@@ -249,6 +249,93 @@ rec[1]["str"]: ["str-2"]
}
}
+// TestCSVReaderWithHeader reads testdata/header.csv with the WithHeader
+// option and checks that the printed column names are those of the file's
+// header row rather than the placeholder names ("0".."11") of the schema.
+func TestCSVReaderWithHeader(t *testing.T) {
+	mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer mem.AssertSize(t, 0)
+
+	raw, err := ioutil.ReadFile("testdata/header.csv")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			{Name: "0", Type: arrow.FixedWidthTypes.Boolean},
+			{Name: "1", Type: arrow.PrimitiveTypes.Int8},
+			{Name: "2", Type: arrow.PrimitiveTypes.Int16},
+			{Name: "3", Type: arrow.PrimitiveTypes.Int32},
+			{Name: "4", Type: arrow.PrimitiveTypes.Int64},
+			{Name: "5", Type: arrow.PrimitiveTypes.Uint8},
+			{Name: "6", Type: arrow.PrimitiveTypes.Uint16},
+			{Name: "7", Type: arrow.PrimitiveTypes.Uint32},
+			{Name: "8", Type: arrow.PrimitiveTypes.Uint64},
+			{Name: "9", Type: arrow.PrimitiveTypes.Float32},
+			{Name: "10", Type: arrow.PrimitiveTypes.Float64},
+			{Name: "11", Type: arrow.BinaryTypes.String},
+		},
+		nil,
+	)
+
+	r := csv.NewReader(bytes.NewReader(raw), schema,
+		csv.WithAllocator(mem),
+		csv.WithComment('#'), csv.WithComma(';'),
+		csv.WithHeader(),
+	)
+	defer r.Release()
+
+	r.Retain()
+	r.Release()
+
+	out := new(bytes.Buffer)
+
+	n := 0
+	for r.Next() {
+		rec := r.Record()
+		for i, col := range rec.Columns() {
+			fmt.Fprintf(out, "rec[%d][%q]: %v\n", n, rec.ColumnName(i), col)
+		}
+		n++
+	}
+
+	// Check the iteration error before anything else: a failed header read
+	// stops the loop early, and the row-count assertion below would otherwise
+	// hide the real cause.
+	if err := r.Err(); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	if got, want := n, 2; got != want {
+		t.Fatalf("invalid number of rows: got=%d, want=%d", got, want)
+	}
+
+	want := `rec[0]["bool"]: [true]
+rec[0]["i8"]: [-1]
+rec[0]["i16"]: [-1]
+rec[0]["i32"]: [-1]
+rec[0]["i64"]: [-1]
+rec[0]["u8"]: [1]
+rec[0]["u16"]: [1]
+rec[0]["u32"]: [1]
+rec[0]["u64"]: [1]
+rec[0]["f32"]: [1.1]
+rec[0]["f64"]: [1.1]
+rec[0]["str"]: ["str-1"]
+rec[1]["bool"]: [false]
+rec[1]["i8"]: [-2]
+rec[1]["i16"]: [-2]
+rec[1]["i32"]: [-2]
+rec[1]["i64"]: [-2]
+rec[1]["u8"]: [2]
+rec[1]["u16"]: [2]
+rec[1]["u32"]: [2]
+rec[1]["u64"]: [2]
+rec[1]["f32"]: [2.2]
+rec[1]["f64"]: [2.2]
+rec[1]["str"]: ["str-2"]
+`
+
+	if got, want := out.String(), want; got != want {
+		t.Fatalf("invalid output:\ngot= %s\nwant=%s\n", got, want)
+	}
+}
+
func TestCSVReaderWithChunk(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)
diff --git a/go/arrow/csv/testdata/header.csv b/go/arrow/csv/testdata/header.csv
new file mode 100644
index 0000000..08ab715
--- /dev/null
+++ b/go/arrow/csv/testdata/header.csv
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+bool;i8;i16;i32;i64;u8;u16;u32;u64;f32;f64;str
+true;-1;-1;-1;-1;1;1;1;1;1.1;1.1;str-1
+false;-2;-2;-2;-2;2;2;2;2;2.2;2.2;str-2
diff --git a/go/arrow/csv/writer.go b/go/arrow/csv/writer.go
index c42635f..0cfacda 100644
--- a/go/arrow/csv/writer.go
+++ b/go/arrow/csv/writer.go
@@ -20,6 +20,7 @@ import (
"encoding/csv"
"io"
"strconv"
+ "sync"
"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
@@ -29,6 +30,8 @@ import (
type Writer struct {
w *csv.Writer
schema *arrow.Schema
+ header bool // when true, Write emits the schema's field names before the first record
+ once sync.Once // guards writeHeader so the header row is attempted only once
}
// NewWriter returns a writer that writes array.Records to the CSV file
@@ -55,6 +58,16 @@ func (w *Writer) Write(record array.Record) error {
return ErrMismatchFields
}
+ var err error
+ if w.header {
+ w.once.Do(func() {
+ err = w.writeHeader()
+ })
+ if err != nil {
+ return err
+ }
+ }
+
recs := make([][]string, record.NumRows())
for i := range recs {
recs[i] = make([]string, record.NumCols())
@@ -139,3 +152,14 @@ func (w *Writer) Flush() error {
func (w *Writer) Error() error {
return w.w.Error()
}
+
+// writeHeader emits a single CSV row holding the name of every field of the
+// writer's schema, in schema order, and returns the error reported by the
+// underlying csv.Writer, if any.
+func (w *Writer) writeHeader() error {
+	fields := w.schema.Fields()
+	names := make([]string, len(fields))
+	for i, f := range fields {
+		names[i] = f.Name
+	}
+	return w.w.Write(names)
+}
diff --git a/go/arrow/csv/writer_test.go b/go/arrow/csv/writer_test.go
index 5aa2b6f..9ced865 100644
--- a/go/arrow/csv/writer_test.go
+++ b/go/arrow/csv/writer_test.go
@@ -182,6 +182,65 @@ true;1;1;1;1;2;2;2;2;0.2;0.2;str-2
}
}
+// TestCSVWriterWithHeader writes a three-row record with the WithHeader
+// option and checks that the output starts with a row of the schema's field
+// names, followed by the data rows.
+func TestCSVWriterWithHeader(t *testing.T) {
+	f := new(bytes.Buffer)
+
+	pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer pool.AssertSize(t, 0)
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			{Name: "bool", Type: arrow.FixedWidthTypes.Boolean},
+			{Name: "i8", Type: arrow.PrimitiveTypes.Int8},
+			{Name: "i16", Type: arrow.PrimitiveTypes.Int16},
+			{Name: "i32", Type: arrow.PrimitiveTypes.Int32},
+			{Name: "i64", Type: arrow.PrimitiveTypes.Int64},
+			{Name: "u8", Type: arrow.PrimitiveTypes.Uint8},
+			{Name: "u16", Type: arrow.PrimitiveTypes.Uint16},
+			{Name: "u32", Type: arrow.PrimitiveTypes.Uint32},
+			{Name: "u64", Type: arrow.PrimitiveTypes.Uint64},
+			{Name: "f32", Type: arrow.PrimitiveTypes.Float32},
+			{Name: "f64", Type: arrow.PrimitiveTypes.Float64},
+			{Name: "str", Type: arrow.BinaryTypes.String},
+		},
+		nil,
+	)
+
+	b := array.NewRecordBuilder(pool, schema)
+	defer b.Release()
+
+	b.Field(0).(*array.BooleanBuilder).AppendValues([]bool{true, false, true}, nil)
+	b.Field(1).(*array.Int8Builder).AppendValues([]int8{-1, 0, 1}, nil)
+	b.Field(2).(*array.Int16Builder).AppendValues([]int16{-1, 0, 1}, nil)
+	b.Field(3).(*array.Int32Builder).AppendValues([]int32{-1, 0, 1}, nil)
+	b.Field(4).(*array.Int64Builder).AppendValues([]int64{-1, 0, 1}, nil)
+	b.Field(5).(*array.Uint8Builder).AppendValues([]uint8{0, 1, 2}, nil)
+	b.Field(6).(*array.Uint16Builder).AppendValues([]uint16{0, 1, 2}, nil)
+	b.Field(7).(*array.Uint32Builder).AppendValues([]uint32{0, 1, 2}, nil)
+	b.Field(8).(*array.Uint64Builder).AppendValues([]uint64{0, 1, 2}, nil)
+	b.Field(9).(*array.Float32Builder).AppendValues([]float32{0.0, 0.1, 0.2}, nil)
+	b.Field(10).(*array.Float64Builder).AppendValues([]float64{0.0, 0.1, 0.2}, nil)
+	b.Field(11).(*array.StringBuilder).AppendValues([]string{"str-0", "str-1", "str-2"}, nil)
+
+	rec := b.NewRecord()
+	defer rec.Release()
+
+	w := csv.NewWriter(f, schema, csv.WithComma(';'), csv.WithCRLF(false), csv.WithHeader())
+	err := w.Write(rec)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	want := `bool;i8;i16;i32;i64;u8;u16;u32;u64;f32;f64;str
+true;-1;-1;-1;-1;0;0;0;0;0;0;str-0
+false;0;0;0;0;1;1;1;1;0.1;0.1;str-1
+true;1;1;1;1;2;2;2;2;0.2;0.2;str-2
+`
+
+	// Plain string equality; strings.Compare is only meant for three-way
+	// ordering and adds nothing here.
+	if got, want := f.String(), want; got != want {
+		t.Fatalf("invalid output:\ngot=%s\nwant=%s\n", got, want)
+	}
+}
+
func BenchmarkWrite(b *testing.B) {
pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer pool.AssertSize(b, 0)
diff --git a/go/arrow/go.mod b/go/arrow/go.mod
index 6d4c592..451f68e 100644
--- a/go/arrow/go.mod
+++ b/go/arrow/go.mod
@@ -18,6 +18,7 @@ module github.com/apache/arrow/go/arrow
require (
github.com/davecgh/go-spew v1.1.0 // indirect
+ github.com/pkg/errors v0.8.1
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/testify v1.2.0
)
diff --git a/go/arrow/go.sum b/go/arrow/go.sum
index cad042e..2639967 100644
--- a/go/arrow/go.sum
+++ b/go/arrow/go.sum
@@ -1,5 +1,7 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
+github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.2.0 h1:LThGCOvhuJic9Gyd1VBCkhyUXmO8vKaBFvBsJ2k03rg=