This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new dfdebdd919 GH-38503: [Go][Parquet] Style improvement for using
ArrowColumnWriter (#38581)
dfdebdd919 is described below
commit dfdebdd9199e51b92ed372220b7f33b1aab2d37b
Author: mwish <[email protected]>
AuthorDate: Wed Nov 15 23:38:34 2023 +0800
GH-38503: [Go][Parquet] Style improvement for using ArrowColumnWriter
(#38581)
### Rationale for this change
Currently, `ArrowColumnWriter` does not appear to have a bug, but its usage is
confusing. For nested types, callers of `ArrowColumnWriter` need to take the
following mapping into account:
```
/// 0 foo.bar
///       foo.bar.baz           0
///       foo.bar.baz2          1
///   foo.qux                   2
/// 1 foo2                      3
/// 2 foo3                      4
```
The left column is the index of the top-level field in the `arrow::Schema`.
Parquet itself only stores leaf nodes, so the Parquet column (leaf) index is
listed on the right.
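In `ArrowColumnWriter`, the final argument is the leaf index in Parquet, so
callers should pass a `leafIdx` rather than the top-level field index. A
`LeafCount` API is also needed so that callers can get the number of leaves a
column writer covers and advance the leaf index accordingly.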
### What changes are included in this PR?
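Style enhancements around `ArrowColumnWriter`: add a `LeafCount` accessor,
rename the final argument of `NewArrowColumnWriter` to `leafColIdx`, and update
the tests to advance the leaf index by `LeafCount()` after each column.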
### Are these changes tested?
no
### Are there any user-facing changes?
no
* Closes: #38503
Authored-by: mwish <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
go/parquet/internal/encoding/levels.go | 12 ++++++------
go/parquet/pqarrow/encode_arrow.go | 10 +++++++---
go/parquet/pqarrow/encode_arrow_test.go | 12 +++++++++---
go/parquet/pqarrow/path_builder.go | 4 ++--
4 files changed, 24 insertions(+), 14 deletions(-)
diff --git a/go/parquet/internal/encoding/levels.go
b/go/parquet/internal/encoding/levels.go
index caf8320593..2a6dc24933 100644
--- a/go/parquet/internal/encoding/levels.go
+++ b/go/parquet/internal/encoding/levels.go
@@ -19,6 +19,7 @@ package encoding
import (
"bytes"
"encoding/binary"
+ "errors"
"fmt"
"math/bits"
@@ -28,7 +29,6 @@ import (
"github.com/apache/arrow/go/v15/parquet"
format "github.com/apache/arrow/go/v15/parquet/internal/gen-go/parquet"
"github.com/apache/arrow/go/v15/parquet/internal/utils"
- "golang.org/x/xerrors"
)
// LevelEncoder is for handling the encoding of Definition and Repetition
levels
@@ -194,12 +194,12 @@ func (l *LevelDecoder) SetData(encoding parquet.Encoding,
maxLvl int16, nbuffere
switch encoding {
case parquet.Encodings.RLE:
if len(data) < 4 {
- return 0, xerrors.New("parquet: received invalid levels
(corrupt data page?)")
+ return 0, errors.New("parquet: received invalid levels
(corrupt data page?)")
}
nbytes := int32(binary.LittleEndian.Uint32(data[:4]))
if nbytes < 0 || nbytes > int32(len(data)-4) {
- return 0, xerrors.New("parquet: received invalid number
of bytes (corrupt data page?)")
+ return 0, errors.New("parquet: received invalid number
of bytes (corrupt data page?)")
}
buf := data[4:]
@@ -212,12 +212,12 @@ func (l *LevelDecoder) SetData(encoding parquet.Encoding,
maxLvl int16, nbuffere
case parquet.Encodings.BitPacked:
nbits, ok := overflow.Mul(nbuffered, l.bitWidth)
if !ok {
- return 0, xerrors.New("parquet: number of buffered
values too large (corrupt data page?)")
+ return 0, errors.New("parquet: number of buffered
values too large (corrupt data page?)")
}
nbytes := bitutil.BytesForBits(int64(nbits))
if nbytes < 0 || nbytes > int64(len(data)) {
- return 0, xerrors.New("parquet: recieved invalid number
of bytes (corrupt data page?)")
+ return 0, errors.New("parquet: received invalid number
of bytes (corrupt data page?)")
}
if l.bit == nil {
l.bit = utils.NewBitReader(bytes.NewReader(data))
@@ -234,7 +234,7 @@ func (l *LevelDecoder) SetData(encoding parquet.Encoding,
maxLvl int16, nbuffere
// run length encoding.
func (l *LevelDecoder) SetDataV2(nbytes int32, maxLvl int16, nbuffered int,
data []byte) error {
if nbytes < 0 {
- return xerrors.New("parquet: invalid page header (corrupt data
page?)")
+ return errors.New("parquet: invalid page header (corrupt data
page?)")
}
l.maxLvl = maxLvl
diff --git a/go/parquet/pqarrow/encode_arrow.go
b/go/parquet/pqarrow/encode_arrow.go
index 1855d3625a..4989837cd0 100644
--- a/go/parquet/pqarrow/encode_arrow.go
+++ b/go/parquet/pqarrow/encode_arrow.go
@@ -81,7 +81,7 @@ type ArrowColumnWriter struct {
//
// Using an arrow column writer is a convenience to avoid having to process
the arrow array yourself
// and determine the correct definition and repetition levels manually.
-func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest
*SchemaManifest, rgw file.RowGroupWriter, col int) (ArrowColumnWriter, error) {
+func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest
*SchemaManifest, rgw file.RowGroupWriter, leafColIdx int) (ArrowColumnWriter,
error) {
if data.Len() == 0 {
return ArrowColumnWriter{leafCount:
calcLeafCount(data.DataType()), rgw: rgw}, nil
}
@@ -118,7 +118,7 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size
int64, manifest *Sch
// which is the one this instance will start writing for
// colIdx := rgw.CurrentColumn() + 1
- schemaField, err := manifest.GetColumnField(col)
+ schemaField, err := manifest.GetColumnField(leafColIdx)
if err != nil {
return ArrowColumnWriter{}, err
}
@@ -153,7 +153,11 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset,
size int64, manifest *Sch
values += chunkWriteSize
}
- return ArrowColumnWriter{builders: builders, leafCount: leafCount, rgw:
rgw, colIdx: col}, nil
+ return ArrowColumnWriter{builders: builders, leafCount: leafCount, rgw:
rgw, colIdx: leafColIdx}, nil
+}
+
+func (acw *ArrowColumnWriter) LeafCount() int {
+ return acw.leafCount
}
func (acw *ArrowColumnWriter) Write(ctx context.Context) error {
diff --git a/go/parquet/pqarrow/encode_arrow_test.go
b/go/parquet/pqarrow/encode_arrow_test.go
index d588aff701..712a003c63 100644
--- a/go/parquet/pqarrow/encode_arrow_test.go
+++ b/go/parquet/pqarrow/encode_arrow_test.go
@@ -145,10 +145,12 @@ func TestWriteArrowCols(t *testing.T) {
srgw := writer.AppendRowGroup()
ctx := pqarrow.NewArrowWriteContext(context.TODO(), nil)
+ colIdx := 0
for i := int64(0); i < tbl.NumCols(); i++ {
- acw, err :=
pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(),
manifest, srgw, int(i))
+ acw, err :=
pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(),
manifest, srgw, colIdx)
require.NoError(t, err)
require.NoError(t, acw.Write(ctx))
+ colIdx = colIdx + acw.LeafCount()
}
require.NoError(t, srgw.Close())
require.NoError(t, writer.Close())
@@ -249,10 +251,12 @@ func TestWriteArrowInt96(t *testing.T) {
srgw := writer.AppendRowGroup()
ctx := pqarrow.NewArrowWriteContext(context.TODO(), &props)
+ colIdx := 0
for i := int64(0); i < tbl.NumCols(); i++ {
- acw, err :=
pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(),
manifest, srgw, int(i))
+ acw, err :=
pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(),
manifest, srgw, colIdx)
require.NoError(t, err)
require.NoError(t, acw.Write(ctx))
+ colIdx += acw.LeafCount()
}
require.NoError(t, srgw.Close())
require.NoError(t, writer.Close())
@@ -306,11 +310,13 @@ func writeTableToBuffer(t *testing.T, mem
memory.Allocator, tbl arrow.Table, row
for offset < tbl.NumRows() {
sz := utils.Min(rowGroupSize, tbl.NumRows()-offset)
srgw := writer.AppendRowGroup()
+ colIdx := 0
for i := 0; i < int(tbl.NumCols()); i++ {
col := tbl.Column(i)
- acw, err := pqarrow.NewArrowColumnWriter(col.Data(),
offset, sz, manifest, srgw, i)
+ acw, err := pqarrow.NewArrowColumnWriter(col.Data(),
offset, sz, manifest, srgw, colIdx)
require.NoError(t, err)
require.NoError(t, acw.Write(ctx))
+ colIdx = colIdx + acw.LeafCount()
}
srgw.Close()
offset += sz
diff --git a/go/parquet/pqarrow/path_builder.go
b/go/parquet/pqarrow/path_builder.go
index 0f1158bd1e..57a077956e 100644
--- a/go/parquet/pqarrow/path_builder.go
+++ b/go/parquet/pqarrow/path_builder.go
@@ -206,7 +206,7 @@ func (n *listNode) fillForLast(rng, childRng *elemRange,
ctx *pathWriteCtx) iter
fillRepLevels(int(childRng.size()), n.repLevel, ctx)
// once we've reached this point the following preconditions should
hold:
// 1. there are no more repeated path nodes to deal with
- // 2. all elements in |range| reperesent contiguous elements in the
child
+ // 2. all elements in |range| represent contiguous elements in the child
// array (null values would have shortened the range to ensure all
// remaining list elements are present, though they may be empty)
// 3. no element of range spans a parent list (intermediate list nodes
@@ -225,7 +225,7 @@ func (n *listNode) fillForLast(rng, childRng *elemRange,
ctx *pathWriteCtx) iter
// this is the start of a new list. we can be sure that it only
applies to the
// previous list (and doesn't jump to the start of any list
further up in nesting
- // due to the contraints mentioned earlier)
+ // due to the constraints mentioned earlier)
ctx.AppendRepLevel(n.prevRepLevel)
ctx.AppendRepLevels(int(sizeCheck.size())-1, n.repLevel)
childRng.end = sizeCheck.end