joellubi commented on code in PR #43066: URL: https://github.com/apache/arrow/pull/43066#discussion_r1669391258
########## go/parquet/internal/encoding/byte_stream_split.go: ########## @@ -0,0 +1,367 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package encoding + +import ( + "fmt" + "math" + + "github.com/apache/arrow/go/v17/arrow" + "github.com/apache/arrow/go/v17/arrow/memory" + "github.com/apache/arrow/go/v17/parquet" + "golang.org/x/xerrors" +) + +// encodeByteStreamSplit encodes the raw bytes provided by 'in' into the output buffer 'data' using BYTE_STREAM_SPLIT encoding. +// 'data' must have space for at least len(in) bytes. +func encodeByteStreamSplit(data []byte, in []byte, width int) { + numElements := len(in) / width + for stream := 0; stream < width; stream++ { + for element := 0; element < numElements; element++ { + encLoc := numElements*stream + element + decLoc := width*element + stream + data[encLoc] = in[decLoc] + } + } +} + +// encodeByteStreamSplitWidth2 implements encodeByteStreamSplit optimized for types stored using 2 bytes. +// 'data' must have space for at least len(in) bytes. 
func encodeByteStreamSplitWidth2(data []byte, in []byte) {
	// Integer division drops any trailing bytes that do not form a whole value.
	count := len(in) / 2
	for i, src := 0, 0; i < count; i, src = i+1, src+2 {
		// First byte of the value goes to stream 0, second byte to stream 1.
		data[i] = in[src]
		data[count+i] = in[src+1]
	}
}

// encodeByteStreamSplitWidth4 is the unrolled form of encodeByteStreamSplit for values that are 4 bytes wide.
// Byte k of every value in 'in' is written to the k-th stream; the four streams are stored back to back in 'data',
// each len(in)/4 bytes long.
// 'data' must have space for at least len(in) bytes.
func encodeByteStreamSplitWidth4(data []byte, in []byte) {
	count := len(in) / 4
	// Start offsets of streams 1..3 inside 'data' (stream 0 starts at 0).
	s1, s2, s3 := count, 2*count, 3*count
	for i, src := 0, 0; i < count; i, src = i+1, src+4 {
		data[i] = in[src]
		data[s1+i] = in[src+1]
		data[s2+i] = in[src+2]
		data[s3+i] = in[src+3]
	}
}

// encodeByteStreamSplitWidth8 is the unrolled form of encodeByteStreamSplit for values that are 8 bytes wide.
// Byte k of every value in 'in' is written to the k-th stream; the eight streams are stored back to back in 'data',
// each len(in)/8 bytes long.
// 'data' must have space for at least len(in) bytes.
func encodeByteStreamSplitWidth8(data []byte, in []byte) {
	count := len(in) / 8
	// Start offsets of streams 1..7 inside 'data' (stream 0 starts at 0).
	s1, s2, s3, s4 := count, 2*count, 3*count, 4*count
	s5, s6, s7 := 5*count, 6*count, 7*count
	for i, src := 0, 0; i < count; i, src = i+1, src+8 {
		data[i] = in[src]
		data[s1+i] = in[src+1]
		data[s2+i] = in[src+2]
		data[s3+i] = in[src+3]
		data[s4+i] = in[src+4]
		data[s5+i] = in[src+5]
		data[s6+i] = in[src+6]
		data[s7+i] = in[src+7]
	}
}

// decodeByteStreamSplitBatchWidth4 decodes the batch of nValues raw bytes representing a 4-byte datatype provided by 'data',
// into the output buffer 'out' using BYTE_STREAM_SPLIT encoding.
// 'out' must have space for at least len(data) bytes.
+func decodeByteStreamSplitBatchWidth4(data []byte, nValues, stride int, out []byte) { + const width = 4 + for element := 0; element < nValues; element++ { + out[width*element] = data[element] + out[width*element+1] = data[stride+element] + out[width*element+2] = data[2*stride+element] + out[width*element+3] = data[3*stride+element] + } +} + +// decodeByteStreamSplitBatchWidth8 decodes the batch of nValues raw bytes representing a 8-byte datatype provided by 'data', +// into the output buffer 'out' using BYTE_STREAM_SPLIT encoding. +// 'out' must have space for at least len(data) bytes. +func decodeByteStreamSplitBatchWidth8(data []byte, nValues, stride int, out []byte) { + const width = 8 + for element := 0; element < nValues; element++ { + out[width*element] = data[element] + out[width*element+1] = data[stride+element] + out[width*element+2] = data[2*stride+element] + out[width*element+3] = data[3*stride+element] + out[width*element+4] = data[4*stride+element] + out[width*element+5] = data[5*stride+element] + out[width*element+6] = data[6*stride+element] + out[width*element+7] = data[7*stride+element] + } +} + +// decodeByteStreamSplitBatchFLBA decodes the batch of nValues FixedLenByteArrays provided by 'data', +// into the output slice 'out' using BYTE_STREAM_SPLIT encoding. +// 'out' must have space for at least nValues slices. +func decodeByteStreamSplitBatchFLBA(data []byte, nValues, stride, width int, out []parquet.FixedLenByteArray) { + for stream := 0; stream < width; stream++ { + for element := 0; element < nValues; element++ { + encLoc := stride*stream + element + out[element][stream] = data[encLoc] + } + } +} + +// decodeByteStreamSplitBatchFLBAWidth2 decodes the batch of nValues FixedLenByteArrays of length 2 provided by 'data', +// into the output slice 'out' using BYTE_STREAM_SPLIT encoding. +// 'out' must have space for at least nValues slices. 
+func decodeByteStreamSplitBatchFLBAWidth2(data []byte, nValues, stride int, out []parquet.FixedLenByteArray) { + for element := 0; element < nValues; element++ { + out[element][0] = data[element] + out[element][1] = data[stride+element] + } +} + +// decodeByteStreamSplitBatchFLBAWidth4 decodes the batch of nValues FixedLenByteArrays of length 4 provided by 'data', +// into the output slice 'out' using BYTE_STREAM_SPLIT encoding. +// 'out' must have space for at least nValues slices. +func decodeByteStreamSplitBatchFLBAWidth4(data []byte, nValues, stride int, out []parquet.FixedLenByteArray) { + for element := 0; element < nValues; element++ { + out[element][0] = data[element] + out[element][1] = data[stride+element] + out[element][2] = data[stride*2+element] + out[element][3] = data[stride*3+element] + } +} + +// decodeByteStreamSplitBatchFLBAWidth8 decodes the batch of nValues FixedLenByteArrays of length 8 provided by 'data', +// into the output slice 'out' using BYTE_STREAM_SPLIT encoding. +// 'out' must have space for at least nValues slices. 
+func decodeByteStreamSplitBatchFLBAWidth8(data []byte, nValues, stride int, out []parquet.FixedLenByteArray) { + for element := 0; element < nValues; element++ { + out[element][0] = data[element] + out[element][1] = data[stride+element] + out[element][2] = data[stride*2+element] + out[element][3] = data[stride*3+element] + out[element][4] = data[stride*4+element] + out[element][5] = data[stride*5+element] + out[element][6] = data[stride*6+element] + out[element][7] = data[stride*7+element] + } +} + +func releaseBufferToPool(pooled *PooledBufferWriter) { + buf := pooled.buf + memory.Set(buf.Buf(), 0) + buf.ResizeNoShrink(0) + bufferPool.Put(buf) +} + +func validateByteStreamSplitPageData(typeLen, nvals int, data []byte) (int, error) { + if nvals*typeLen < len(data) { + return 0, fmt.Errorf("data size (%d) is too small for the number of values in in BYTE_STREAM_SPLIT (%d)", len(data), nvals) + } + + if len(data)%typeLen != 0 { + return 0, fmt.Errorf("ByteStreamSplit data size %d not aligned with byte_width: %d", len(data), typeLen) + } + + return len(data) / typeLen, nil +} + +// ByteStreamSplitFloat32Encoder writes the underlying bytes of the Float32 +// into interlaced streams as defined by the BYTE_STREAM_SPLIT encoding +type ByteStreamSplitFloat32Encoder struct { + PlainFloat32Encoder + flushBuffer *PooledBufferWriter +} + +func (enc *ByteStreamSplitFloat32Encoder) FlushValues() (Buffer, error) { + in, err := enc.PlainFloat32Encoder.FlushValues() + if err != nil { + return nil, err + } + + if enc.flushBuffer == nil { + enc.flushBuffer = NewPooledBufferWriter(in.Len()) + } + + enc.flushBuffer.buf.Resize(in.Len()) + encodeByteStreamSplitWidth4(enc.flushBuffer.Bytes(), in.Bytes()) + return enc.flushBuffer.Finish(), nil +} + +func (enc *ByteStreamSplitFloat32Encoder) Release() { + enc.PlainFloat32Encoder.Release() + releaseBufferToPool(enc.flushBuffer) + enc.flushBuffer = nil +} + +// ByteStreamSplitFloat64Encoder writes the underlying bytes of the Float64 +// into 
interlaced streams as defined by the BYTE_STREAM_SPLIT encoding +type ByteStreamSplitFloat64Encoder struct { + PlainFloat64Encoder + flushBuffer *PooledBufferWriter +} + +func (enc *ByteStreamSplitFloat64Encoder) FlushValues() (Buffer, error) { + in, err := enc.PlainFloat64Encoder.FlushValues() + if err != nil { + return nil, err + } + + if enc.flushBuffer == nil { + enc.flushBuffer = NewPooledBufferWriter(in.Len()) + } + + enc.flushBuffer.buf.Resize(in.Len()) + encodeByteStreamSplitWidth8(enc.flushBuffer.Bytes(), in.Bytes()) + return enc.flushBuffer.Finish(), nil +} + +func (enc *ByteStreamSplitFloat64Encoder) Release() { + enc.PlainFloat64Encoder.Release() + releaseBufferToPool(enc.flushBuffer) + enc.flushBuffer = nil +} + +// ByteStreamSplitInt32Encoder writes the underlying bytes of the Int32 +// into interlaced streams as defined by the BYTE_STREAM_SPLIT encoding +type ByteStreamSplitInt32Encoder struct { + PlainInt32Encoder + flushBuffer *PooledBufferWriter +} + +func (enc *ByteStreamSplitInt32Encoder) FlushValues() (Buffer, error) { + in, err := enc.PlainInt32Encoder.FlushValues() + if err != nil { + return nil, err + } + + if enc.flushBuffer == nil { + enc.flushBuffer = NewPooledBufferWriter(in.Len()) + } + + enc.flushBuffer.buf.Resize(in.Len()) + encodeByteStreamSplitWidth4(enc.flushBuffer.Bytes(), in.Bytes()) + return enc.flushBuffer.Finish(), nil +} + +func (enc *ByteStreamSplitInt32Encoder) Release() { + enc.PlainInt32Encoder.Release() + releaseBufferToPool(enc.flushBuffer) + enc.flushBuffer = nil +} + +// ByteStreamSplitInt64Encoder writes the underlying bytes of the Int64 +// into interlaced streams as defined by the BYTE_STREAM_SPLIT encoding +type ByteStreamSplitInt64Encoder struct { + PlainInt64Encoder + flushBuffer *PooledBufferWriter +} + +func (enc *ByteStreamSplitInt64Encoder) FlushValues() (Buffer, error) { + in, err := enc.PlainInt64Encoder.FlushValues() + if err != nil { + return nil, err + } + + if enc.flushBuffer == nil { + enc.flushBuffer 
= NewPooledBufferWriter(in.Len()) + } + + enc.flushBuffer.buf.Resize(in.Len()) + encodeByteStreamSplitWidth8(enc.flushBuffer.Bytes(), in.Bytes()) + return enc.flushBuffer.Finish(), nil +} + +func (enc *ByteStreamSplitInt64Encoder) Release() { + enc.PlainInt64Encoder.Release() + releaseBufferToPool(enc.flushBuffer) + enc.flushBuffer = nil +} + +type ByteStreamSplitFloat32Decoder = ByteStreamSplitDecoder[float32] +type ByteStreamSplitFloat64Decoder = ByteStreamSplitDecoder[float64] +type ByteStreamSplitInt32Decoder = ByteStreamSplitDecoder[int32] +type ByteStreamSplitInt64Decoder = ByteStreamSplitDecoder[int64] Review Comment: Just added the godoc comments. I did like how the generic decoders came out and looked at what it would take to do the same for encoders. It's a little trickier with the encoders because they all embed their respective "Plain" encoders. It's awkward to make this generic at the moment because the Plain encoders are not generic themselves. I think this all gets a lot simpler if/when the overall refactor of parquet to use generics is done, since then the `ByteStreamSplitEncoder[T]` could just embed `PlainEncoder[T]` once it exists. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
