This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
     new f2fe6003a4 ARROW-16926: [Go] Fix csv reader errors clobbered by subsequent reads (#13451)
f2fe6003a4 is described below
commit f2fe6003a452c6168596a0b204fa4035ed3c95aa
Author: WilliamWhispell <[email protected]>
AuthorDate: Wed Jun 29 13:48:58 2022 -0400
    ARROW-16926: [Go] Fix csv reader errors clobbered by subsequent reads (#13451)
Currently you can reproduce this issue by reading a csv file with garbage
string values where float64 are expected. If you place the bad data in the
first part of the file, then subsequent r.r.Read() will clobber the parse err
that was set inside r.read(rec)
So at the bottom of the loop body, r.read(rec) is called, we end up in func
(r *Reader) parseFloat64(field array.Builder, str string)
it encounters an error, and sets err on the reader:
v, err := strconv.ParseFloat(str, 64)
if err != nil && r.err == nil {
r.err = err
field.AppendNull()
return
}
However, when we come back out of the call to the loop, we advance in the
for loop without checking the err and on the subsequent call to r.r.Read() we
clobber the r.err.
This means that if the last chunk has no error, after we read the csv,
calls to r.Err() on the reader will return nil, even though an err took place
during parse.
Authored-by: GREENWICH\wwhispell <[email protected]>
Signed-off-by: Matthew Topol <[email protected]>
---
go/arrow/csv/reader.go | 16 +++++++----
go/arrow/csv/reader_test.go | 68 +++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 78 insertions(+), 6 deletions(-)
diff --git a/go/arrow/csv/reader.go b/go/arrow/csv/reader.go
index 9e270d625f..1278e50347 100644
--- a/go/arrow/csv/reader.go
+++ b/go/arrow/csv/reader.go
@@ -138,7 +138,10 @@ func (r *Reader) Record() arrow.Record { return r.cur }
 // Next returns whether a Record could be extracted from the underlying CSV file.
//
// Next panics if the number of records extracted from a CSV row does not match
-// the number of fields of the associated schema.
+// the number of fields of the associated schema. If a parse failure occurs, Next
+// will return true and the Record will contain nulls where failures occurred.
+// Subsequent calls to Next will return false - The user should check Err() after
+// each call to Next to check if an error took place.
func (r *Reader) Next() bool {
if r.header {
r.once.Do(func() {
@@ -209,11 +212,15 @@ func (r *Reader) nextn() bool {
var (
recs []string
n = 0
+ err error
)
for i := 0; i < r.chunk && !r.done; i++ {
- recs, r.err = r.r.Read()
- if r.err != nil {
+ recs, err = r.r.Read()
+ if err != nil {
+ if !errors.Is(err, io.EOF) {
+ r.err = err
+ }
r.done = true
break
}
@@ -225,9 +232,6 @@ func (r *Reader) nextn() bool {
if r.err != nil {
r.done = true
- if errors.Is(r.err, io.EOF) {
- r.err = nil
- }
}
r.cur = r.bld.NewRecord()
diff --git a/go/arrow/csv/reader_test.go b/go/arrow/csv/reader_test.go
index 6f0316e35a..a4696f13e7 100644
--- a/go/arrow/csv/reader_test.go
+++ b/go/arrow/csv/reader_test.go
@@ -155,6 +155,74 @@ func Example_withChunk() {
// rec[3]["str"]: ["str-9"]
}
+func TestCSVReaderParseError(t *testing.T) {
+	f := bytes.NewBufferString(`## a simple set of data: int64;float64;string
+0;0;str-0
+1;1;str-1
+2;2;str-2
+3;3;str-3
+4;BADDATA;str-4
+5;5;str-5
+6;6;str-6
+7;7;str-7
+8;8;str-8
+9;9;str-9
+`)
+
+ schema := arrow.NewSchema(
+ []arrow.Field{
+ {Name: "i64", Type: arrow.PrimitiveTypes.Int64},
+ {Name: "f64", Type: arrow.PrimitiveTypes.Float64},
+ {Name: "str", Type: arrow.BinaryTypes.String},
+ },
+ nil,
+ )
+ r := csv.NewReader(
+ f, schema,
+ csv.WithComment('#'), csv.WithComma(';'),
+ csv.WithChunk(3),
+ )
+ defer r.Release()
+
+ n := 0
+ lines := 0
+ var rec arrow.Record
+ for r.Next() {
+ if rec != nil {
+ rec.Release()
+ }
+ rec = r.Record()
+ rec.Retain()
+
+ if n == 1 && r.Err() == nil {
+			t.Fatal("Expected error on second chunk, but none found")
+ }
+
+ for i, col := range rec.Columns() {
+			fmt.Printf("rec[%d][%q]: %v\n", n, rec.ColumnName(i), col)
+ lines++
+ }
+ n++
+ }
+
+ if r.Err() == nil {
+		t.Fatal("Expected any chunk with error to leave reader in an error state.")
+ }
+
+ if got, want := n, 2; got != want {
+ t.Fatalf("invalid number of chunks: got=%d, want=%d", got, want)
+ }
+
+ if got, want := lines, 6; got != want {
+ t.Fatalf("invalid number of lines: got=%d, want=%d", got, want)
+ }
+
+ if !rec.Columns()[1].IsNull(1) {
+		t.Fatalf("expected bad data to be null, found: %v", rec.Columns()[1].Data())
+ }
+ rec.Release()
+}
+
func TestCSVReader(t *testing.T) {
tests := []struct {
Name string