pitrou commented on code in PR #38367:
URL: https://github.com/apache/arrow/pull/38367#discussion_r1368652890
##########
go/parquet/internal/encoding/boolean_encoder.go:
##########
@@ -87,3 +89,53 @@ func (enc *PlainBooleanEncoder) FlushValues() (Buffer,
error) {
return enc.sink.Finish(), nil
}
+
+type RleBooleanEncoder struct {
+ encoder
+
+ bufferedValues []bool
+}
+
+func (RleBooleanEncoder) Type() parquet.Type {
+ return parquet.Types.Boolean
+}
+
+func (enc *RleBooleanEncoder) Put(in []bool) {
+ enc.bufferedValues = append(enc.bufferedValues, in...)
+}
+
+func (enc *RleBooleanEncoder) PutSpaced(in []bool, validBits []byte,
validBitsOffset int64) {
+ bufferOut := make([]bool, len(in))
+ nvalid := spacedCompress(in, bufferOut, validBits, validBitsOffset)
+ enc.Put(bufferOut[:nvalid])
+}
+
+func (enc *RleBooleanEncoder) EstimatedDataEncodedSize() int64 {
+ const rleLengthInBytes = 4
+ return rleLengthInBytes + int64(enc.maxRleBufferSize())
+}
+
+func (enc *RleBooleanEncoder) maxRleBufferSize() int {
+ return utils.MaxBufferSize(1, len(enc.bufferedValues))
+}
+
+func (enc *RleBooleanEncoder) FlushValues() (Buffer, error) {
+ const rleLengthInBytes = 4
+ rleBufferSizeMax := enc.maxRleBufferSize()
+ enc.sink.SetOffset(rleLengthInBytes)
+ enc.sink.Reserve(rleBufferSizeMax)
+
+ encoder := utils.NewRleEncoder(enc.sink, 1)
+ for _, v := range enc.bufferedValues {
+ if v {
+ encoder.Put(1)
+ } else {
+ encoder.Put(0)
+ }
+ }
+ n := encoder.Flush()
Review Comment:
Should we assert that n is <= rleBufferSizeMax?
##########
go/parquet/internal/encoding/encoding_test.go:
##########
@@ -363,6 +363,16 @@ func (b *BaseEncodingTestSuite) TestBasicRoundTrip() {
b.checkRoundTrip(parquet.Encodings.Plain)
}
+func (b *BaseEncodingTestSuite) TestRleBooleanEncodingRoundTrip() {
+ switch b.typ {
+ case reflect.TypeOf(true):
+ b.initData(2000, 200)
Review Comment:
So... it looks like this will call `InitValues` which in turn calls
`fillRandomIsValid` with a `pctNull` of 1.0, which will generate only 0s. Am I
wrong?
##########
go/parquet/internal/encoding/boolean_decoder.go:
##########
@@ -109,3 +114,76 @@ func (dec *PlainBooleanDecoder) DecodeSpaced(out []bool,
nullCount int, validBit
}
return dec.Decode(out)
}
+
+type RleBooleanDecoder struct {
+ decoder
+
+ rleDec *utils.RleDecoder
+}
+
+func (RleBooleanDecoder) Type() parquet.Type {
+ return parquet.Types.Boolean
+}
+
+func (dec *RleBooleanDecoder) SetData(nvals int, data []byte) error {
+ dec.nvals = nvals
+
+ if len(data) < 4 {
+ return fmt.Errorf("invalid length - %d (corrupt data page?)",
len(data))
+ }
+
+ // load the first 4 bytes in little-endian which indicates the length
+ nbytes := binary.LittleEndian.Uint32(data[:4])
+ if nbytes > uint32(len(data)-4) {
+ return fmt.Errorf("received invalid number of bytes - %d
(corrupt data page?)", nbytes)
+ }
+
+ dec.data = data[4:]
+ if dec.rleDec == nil {
+ dec.rleDec = utils.NewRleDecoder(bytes.NewReader(dec.data), 1)
+ } else {
+ dec.rleDec.Reset(bytes.NewReader(dec.data), 1)
+ }
+ return nil
+}
+
+func (dec *RleBooleanDecoder) Decode(out []bool) (int, error) {
+ max := shared_utils.MinInt(len(out), dec.nvals)
+
+ var (
+ buf [1024]uint64
+ n = max
+ )
+
+ for n > 0 {
+ batch := shared_utils.MinInt(len(buf), n)
+ decoded := dec.rleDec.GetBatch(buf[:batch])
+ if decoded != batch {
+ return max - n, io.ErrUnexpectedEOF
Review Comment:
Is there a point in returning `max - n` here? What is the caller supposed to
do with it?
(Note that you aren't actually writing out the decoded values, so I assume
the decoded bytes are lost, which means the decoder can't be used
afterwards.)
##########
go/parquet/internal/encoding/boolean_decoder.go:
##########
@@ -109,3 +114,76 @@ func (dec *PlainBooleanDecoder) DecodeSpaced(out []bool,
nullCount int, validBit
}
return dec.Decode(out)
}
+
+type RleBooleanDecoder struct {
+ decoder
+
+ rleDec *utils.RleDecoder
+}
+
+func (RleBooleanDecoder) Type() parquet.Type {
+ return parquet.Types.Boolean
+}
+
+func (dec *RleBooleanDecoder) SetData(nvals int, data []byte) error {
+ dec.nvals = nvals
+
+ if len(data) < 4 {
+ return fmt.Errorf("invalid length - %d (corrupt data page?)",
len(data))
+ }
+
+ // load the first 4 bytes in little-endian which indicates the length
+ nbytes := binary.LittleEndian.Uint32(data[:4])
+ if nbytes > uint32(len(data)-4) {
+ return fmt.Errorf("received invalid number of bytes - %d
(corrupt data page?)", nbytes)
+ }
+
+ dec.data = data[4:]
+ if dec.rleDec == nil {
+ dec.rleDec = utils.NewRleDecoder(bytes.NewReader(dec.data), 1)
+ } else {
+ dec.rleDec.Reset(bytes.NewReader(dec.data), 1)
+ }
+ return nil
+}
+
+func (dec *RleBooleanDecoder) Decode(out []bool) (int, error) {
+ max := shared_utils.MinInt(len(out), dec.nvals)
+
+ var (
+ buf [1024]uint64
+ n = max
Review Comment:
I'm curious, what is the rationale for using `var` vs. `:=`? Is there a
well-known guideline for this?
##########
go/parquet/internal/encoding/boolean_encoder.go:
##########
@@ -87,3 +89,53 @@ func (enc *PlainBooleanEncoder) FlushValues() (Buffer,
error) {
return enc.sink.Finish(), nil
}
+
+type RleBooleanEncoder struct {
+ encoder
+
+ bufferedValues []bool
+}
+
+func (RleBooleanEncoder) Type() parquet.Type {
+ return parquet.Types.Boolean
+}
+
+func (enc *RleBooleanEncoder) Put(in []bool) {
+ enc.bufferedValues = append(enc.bufferedValues, in...)
+}
+
+func (enc *RleBooleanEncoder) PutSpaced(in []bool, validBits []byte,
validBitsOffset int64) {
+ bufferOut := make([]bool, len(in))
+ nvalid := spacedCompress(in, bufferOut, validBits, validBitsOffset)
+ enc.Put(bufferOut[:nvalid])
+}
+
+func (enc *RleBooleanEncoder) EstimatedDataEncodedSize() int64 {
+ const rleLengthInBytes = 4
+ return rleLengthInBytes + int64(enc.maxRleBufferSize())
+}
+
+func (enc *RleBooleanEncoder) maxRleBufferSize() int {
+ return utils.MaxBufferSize(1, len(enc.bufferedValues))
Review Comment:
I don't understand this. Aren't there any situations where RLE encoding
might significantly increase the size?
(is this in bytes or bits?)
##########
go/parquet/internal/encoding/boolean_encoder.go:
##########
@@ -87,3 +89,53 @@ func (enc *PlainBooleanEncoder) FlushValues() (Buffer,
error) {
return enc.sink.Finish(), nil
}
+
+type RleBooleanEncoder struct {
+ encoder
+
+ bufferedValues []bool
+}
+
+func (RleBooleanEncoder) Type() parquet.Type {
+ return parquet.Types.Boolean
+}
+
+func (enc *RleBooleanEncoder) Put(in []bool) {
+ enc.bufferedValues = append(enc.bufferedValues, in...)
+}
+
+func (enc *RleBooleanEncoder) PutSpaced(in []bool, validBits []byte,
validBitsOffset int64) {
+ bufferOut := make([]bool, len(in))
+ nvalid := spacedCompress(in, bufferOut, validBits, validBitsOffset)
+ enc.Put(bufferOut[:nvalid])
+}
+
+func (enc *RleBooleanEncoder) EstimatedDataEncodedSize() int64 {
+ const rleLengthInBytes = 4
+ return rleLengthInBytes + int64(enc.maxRleBufferSize())
+}
+
+func (enc *RleBooleanEncoder) maxRleBufferSize() int {
+ return utils.MaxBufferSize(1, len(enc.bufferedValues))
+}
+
+func (enc *RleBooleanEncoder) FlushValues() (Buffer, error) {
+ const rleLengthInBytes = 4
+ rleBufferSizeMax := enc.maxRleBufferSize()
+ enc.sink.SetOffset(rleLengthInBytes)
+ enc.sink.Reserve(rleBufferSizeMax)
+
+ encoder := utils.NewRleEncoder(enc.sink, 1)
+ for _, v := range enc.bufferedValues {
+ if v {
+ encoder.Put(1)
Review Comment:
Is the RLE encoder thoroughly tested somewhere else?
##########
go/parquet/internal/encoding/boolean_encoder.go:
##########
@@ -87,3 +89,53 @@ func (enc *PlainBooleanEncoder) FlushValues() (Buffer,
error) {
return enc.sink.Finish(), nil
}
+
+type RleBooleanEncoder struct {
+ encoder
+
+ bufferedValues []bool
+}
+
+func (RleBooleanEncoder) Type() parquet.Type {
+ return parquet.Types.Boolean
+}
+
+func (enc *RleBooleanEncoder) Put(in []bool) {
+ enc.bufferedValues = append(enc.bufferedValues, in...)
+}
+
+func (enc *RleBooleanEncoder) PutSpaced(in []bool, validBits []byte,
validBitsOffset int64) {
+ bufferOut := make([]bool, len(in))
+ nvalid := spacedCompress(in, bufferOut, validBits, validBitsOffset)
+ enc.Put(bufferOut[:nvalid])
+}
+
+func (enc *RleBooleanEncoder) EstimatedDataEncodedSize() int64 {
+ const rleLengthInBytes = 4
+ return rleLengthInBytes + int64(enc.maxRleBufferSize())
+}
+
+func (enc *RleBooleanEncoder) maxRleBufferSize() int {
+ return utils.MaxBufferSize(1, len(enc.bufferedValues))
+}
+
+func (enc *RleBooleanEncoder) FlushValues() (Buffer, error) {
+ const rleLengthInBytes = 4
+ rleBufferSizeMax := enc.maxRleBufferSize()
+ enc.sink.SetOffset(rleLengthInBytes)
+ enc.sink.Reserve(rleBufferSizeMax)
+
+ encoder := utils.NewRleEncoder(enc.sink, 1)
Review Comment:
Can we avoid ambiguous variable names (`enc` vs `encoder`)?
##########
go/parquet/internal/encoding/encoding_test.go:
##########
@@ -363,6 +363,16 @@ func (b *BaseEncodingTestSuite) TestBasicRoundTrip() {
b.checkRoundTrip(parquet.Encodings.Plain)
}
+func (b *BaseEncodingTestSuite) TestRleBooleanEncodingRoundTrip() {
+ switch b.typ {
+ case reflect.TypeOf(true):
+ b.initData(2000, 200)
Review Comment:
In any case, given the specifics of RLE encoding, this should use a
couple of different value probabilities, for example {0, 0.5, 0.9, 0.99}.
##########
go/parquet/internal/encoding/boolean_decoder.go:
##########
@@ -109,3 +114,76 @@ func (dec *PlainBooleanDecoder) DecodeSpaced(out []bool,
nullCount int, validBit
}
return dec.Decode(out)
}
+
+type RleBooleanDecoder struct {
+ decoder
+
+ rleDec *utils.RleDecoder
+}
+
+func (RleBooleanDecoder) Type() parquet.Type {
+ return parquet.Types.Boolean
+}
+
+func (dec *RleBooleanDecoder) SetData(nvals int, data []byte) error {
+ dec.nvals = nvals
+
+ if len(data) < 4 {
+ return fmt.Errorf("invalid length - %d (corrupt data page?)",
len(data))
+ }
+
+ // load the first 4 bytes in little-endian which indicates the length
+ nbytes := binary.LittleEndian.Uint32(data[:4])
+ if nbytes > uint32(len(data)-4) {
+ return fmt.Errorf("received invalid number of bytes - %d
(corrupt data page?)", nbytes)
+ }
+
+ dec.data = data[4:]
+ if dec.rleDec == nil {
+ dec.rleDec = utils.NewRleDecoder(bytes.NewReader(dec.data), 1)
+ } else {
+ dec.rleDec.Reset(bytes.NewReader(dec.data), 1)
+ }
+ return nil
+}
+
+func (dec *RleBooleanDecoder) Decode(out []bool) (int, error) {
+ max := shared_utils.MinInt(len(out), dec.nvals)
+
+ var (
+ buf [1024]uint64
+ n = max
+ )
+
+ for n > 0 {
+ batch := shared_utils.MinInt(len(buf), n)
+ decoded := dec.rleDec.GetBatch(buf[:batch])
+ if decoded != batch {
+ return max - n, io.ErrUnexpectedEOF
+ }
+
+ for i := 0; i < batch; i++ {
+ out[i] = buf[i] != 0
+ }
+ n -= batch
+ out = out[batch:]
+ }
+
+ dec.nvals -= max
+ return max, nil
+}
+
+func (dec *RleBooleanDecoder) DecodeSpaced(out []bool, nullCount int,
validBits []byte, validBitsOffset int64) (int, error) {
+ if nullCount > 0 {
+ toRead := len(out) - nullCount
+ valuesRead, err := dec.Decode(out[:toRead])
+ if err != nil {
+ return 0, err
+ }
+ if valuesRead != toRead {
+ return valuesRead, xerrors.New("parquet: rle boolean
decoder: number of values / definition levels read did not match")
Review Comment:
Hmm, what is the rationale for using `xerrors` here rather than the Go
stdlib as in other places?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]