zeroshade commented on code in PR #43066:
URL: https://github.com/apache/arrow/pull/43066#discussion_r1669291529
##########
go/parquet/internal/encoding/encoding_benchmarks_test.go:
##########
@@ -464,3 +464,323 @@ func BenchmarkDecodeDictByteArray(b *testing.B) {
dictDec.Decode(out)
}
}
+
+func BenchmarkByteStreamSplitEncodingInt32(b *testing.B) {
+ for sz := MINSIZE; sz < MAXSIZE+1; sz *= 2 {
+ b.Run(fmt.Sprintf("len %d", sz), func(b *testing.B) {
+ values := make([]int32, sz)
+ for idx := range values {
+ values[idx] = 64
+ }
+ encoder := encoding.NewEncoder(parquet.Types.Int32,
parquet.Encodings.ByteStreamSplit,
+ false, nil,
memory.DefaultAllocator).(encoding.Int32Encoder)
+ b.ResetTimer()
+ b.SetBytes(int64(len(values) * arrow.Int32SizeBytes))
+ for n := 0; n < b.N; n++ {
+ encoder.Put(values)
+ buf, _ := encoder.FlushValues()
+ buf.Release()
+ }
+ })
+ }
+}
+
+func BenchmarkByteStreamSplitDecodingInt32(b *testing.B) {
+ for sz := MINSIZE; sz < MAXSIZE+1; sz *= 2 {
+ b.Run(fmt.Sprintf("len %d", sz), func(b *testing.B) {
+ output := make([]int32, sz)
+ values := make([]int32, sz)
+ for idx := range values {
+ values[idx] = 64
+ }
+ encoder := encoding.NewEncoder(parquet.Types.Int32,
parquet.Encodings.ByteStreamSplit,
+ false, nil,
memory.DefaultAllocator).(encoding.Int32Encoder)
+ encoder.Put(values)
+ buf, _ := encoder.FlushValues()
+ defer buf.Release()
+
+ decoder := encoding.NewDecoder(parquet.Types.Int32,
parquet.Encodings.ByteStreamSplit, nil, memory.DefaultAllocator)
+ b.ResetTimer()
+ b.SetBytes(int64(len(values) * arrow.Int32SizeBytes))
+ for n := 0; n < b.N; n++ {
+ decoder.SetData(sz, buf.Bytes())
+ decoder.(encoding.Int32Decoder).Decode(output)
+ }
+ })
+ }
+}
+
+func BenchmarkByteStreamSplitDecodingInt32Batched(b *testing.B) {
+ const batchSize = 512
+ for sz := MINSIZE; sz < MAXSIZE+1; sz *= 2 {
+ b.Run(fmt.Sprintf("len %d", sz), func(b *testing.B) {
+ output := make([]int32, sz)
+ values := make([]int32, sz)
+ for idx := range values {
+ values[idx] = 64
+ }
+ encoder := encoding.NewEncoder(parquet.Types.Int32,
parquet.Encodings.ByteStreamSplit,
+ false, nil,
memory.DefaultAllocator).(encoding.Int32Encoder)
+ encoder.Put(values)
+ buf, _ := encoder.FlushValues()
+ defer buf.Release()
+
+ decoder := encoding.NewDecoder(parquet.Types.Int32,
parquet.Encodings.ByteStreamSplit, nil, memory.DefaultAllocator)
+ b.ResetTimer()
+ b.SetBytes(int64(len(values) * arrow.Int32SizeBytes))
+ for n := 0; n < b.N; n++ {
+ decoder.SetData(sz, buf.Bytes())
+ for batch := 0; batch*batchSize < sz; batch++ {
+ offset := batch * batchSize
+
decoder.(encoding.Int32Decoder).Decode(output[offset : offset+batchSize])
+ }
+ }
+ })
+ }
+}
+
+func BenchmarkByteStreamSplitEncodingInt64(b *testing.B) {
+ for sz := MINSIZE; sz < MAXSIZE+1; sz *= 2 {
+ b.Run(fmt.Sprintf("len %d", sz), func(b *testing.B) {
+ values := make([]int64, sz)
+ for idx := range values {
+ values[idx] = 64
+ }
+ encoder := encoding.NewEncoder(parquet.Types.Int64,
parquet.Encodings.ByteStreamSplit,
+ false, nil,
memory.DefaultAllocator).(encoding.Int64Encoder)
+ b.ResetTimer()
+ b.SetBytes(int64(len(values) * arrow.Int64SizeBytes))
+ for n := 0; n < b.N; n++ {
+ encoder.Put(values)
+ buf, _ := encoder.FlushValues()
+ buf.Release()
+ }
+ })
+ }
+}
+
+func BenchmarkByteStreamSplitDecodingInt64(b *testing.B) {
+ for sz := MINSIZE; sz < MAXSIZE+1; sz *= 2 {
+ b.Run(fmt.Sprintf("len %d", sz), func(b *testing.B) {
+ output := make([]int64, sz)
+ values := make([]int64, sz)
+ for idx := range values {
+ values[idx] = 64
+ }
+ encoder := encoding.NewEncoder(parquet.Types.Int64,
parquet.Encodings.ByteStreamSplit,
+ false, nil,
memory.DefaultAllocator).(encoding.Int64Encoder)
+ encoder.Put(values)
+ buf, _ := encoder.FlushValues()
+ defer buf.Release()
+
+ decoder := encoding.NewDecoder(parquet.Types.Int64,
parquet.Encodings.ByteStreamSplit, nil, memory.DefaultAllocator)
+ b.ResetTimer()
+ b.SetBytes(int64(len(values) * arrow.Int64SizeBytes))
+ for n := 0; n < b.N; n++ {
+ decoder.SetData(sz, buf.Bytes())
+ decoder.(encoding.Int64Decoder).Decode(output)
+ }
+ })
+ }
+}
+
+func BenchmarkByteStreamSplitEncodingFixedLenByteArray(b *testing.B) {
+ for sz := MINSIZE; sz < MAXSIZE+1; sz *= 2 {
+ b.Run(fmt.Sprintf("len %d", sz), func(b *testing.B) {
+ values := make([]parquet.FixedLenByteArray, sz)
+ for idx := range values {
+ values[idx] = []byte{0x12, 0x34, 0x56, 0x78}
+ }
+
+ arraySize := len(values[0])
+ col :=
schema.NewColumn(schema.NewFixedLenByteArrayNode("fixedlenbytearray",
parquet.Repetitions.Required, int32(arraySize), -1), 0, 0)
+ encoder :=
encoding.NewEncoder(parquet.Types.FixedLenByteArray,
parquet.Encodings.ByteStreamSplit,
+ false, col,
memory.DefaultAllocator).(encoding.FixedLenByteArrayEncoder)
+ b.ResetTimer()
+ b.SetBytes(int64(len(values) * arraySize))
+ for n := 0; n < b.N; n++ {
+ encoder.Put(values)
+ buf, _ := encoder.FlushValues()
+ buf.Release()
+ }
+ })
+ }
+}
+
+func BenchmarkByteStreamSplitDecodingFixedLenByteArray(b *testing.B) {
+ for sz := MINSIZE; sz < MAXSIZE+1; sz *= 2 {
+ b.Run(fmt.Sprintf("len %d", sz), func(b *testing.B) {
+ output := make([]parquet.FixedLenByteArray, sz)
+ values := make([]parquet.FixedLenByteArray, sz)
+ for idx := range values {
+ values[idx] = []byte{0x12, 0x34, 0x56, 0x78}
+ }
+
+ arraySize := len(values[0])
+ col :=
schema.NewColumn(schema.NewFixedLenByteArrayNode("fixedlenbytearray",
parquet.Repetitions.Required, int32(arraySize), -1), 0, 0)
+ encoder :=
encoding.NewEncoder(parquet.Types.FixedLenByteArray,
parquet.Encodings.ByteStreamSplit,
+ false, col,
memory.DefaultAllocator).(encoding.FixedLenByteArrayEncoder)
+ encoder.Put(values)
+ buf, _ := encoder.FlushValues()
+ defer buf.Release()
+
+ decoder :=
encoding.NewDecoder(parquet.Types.FixedLenByteArray,
parquet.Encodings.ByteStreamSplit, col, memory.DefaultAllocator)
+ b.ResetTimer()
+ b.SetBytes(int64(len(values) * arraySize))
+ for n := 0; n < b.N; n++ {
+ decoder.SetData(sz, buf.Bytes())
+
decoder.(encoding.FixedLenByteArrayDecoder).Decode(output)
+ }
+ })
+ }
+}
+
+// func BenchmarkByteStreamSplitEncodingFileRead(b *testing.B) {
+// dir := os.Getenv("PARQUET_TEST_DATA")
+// if dir == "" {
+// b.Skip("no path supplied with PARQUET_TEST_DATA")
+// }
+// require.DirExists(b, dir)
+
+// props := parquet.NewReaderProperties(memory.DefaultAllocator)
+// fileReader, err := file.OpenParquetFile(path.Join(dir,
"byte_stream_split_extended.gzip.parquet"),
+// false, file.WithReadProps(props))
+// require.NoError(b, err)
+// defer fileReader.Close()
+
+// nRows := 200
+// nCols := 14
Review Comment:
Should this commented-out benchmark be removed, or uncommented?
##########
go/parquet/internal/encoding/byte_stream_split.go:
##########
@@ -0,0 +1,367 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package encoding
+
+import (
+ "fmt"
+ "math"
+
+ "github.com/apache/arrow/go/v17/arrow"
+ "github.com/apache/arrow/go/v17/arrow/memory"
+ "github.com/apache/arrow/go/v17/parquet"
+ "golang.org/x/xerrors"
+)
+
+// encodeByteStreamSplit encodes the raw bytes provided by 'in' into the
output buffer 'data' using BYTE_STREAM_SPLIT encoding.
+// 'data' must have space for at least len(in) bytes.
+func encodeByteStreamSplit(data []byte, in []byte, width int) {
+ numElements := len(in) / width
+ for stream := 0; stream < width; stream++ {
+ for element := 0; element < numElements; element++ {
+ encLoc := numElements*stream + element
+ decLoc := width*element + stream
+ data[encLoc] = in[decLoc]
+ }
+ }
+}
+
+// encodeByteStreamSplitWidth2 implements encodeByteStreamSplit optimized for
types stored using 2 bytes.
+// 'data' must have space for at least len(in) bytes.
+func encodeByteStreamSplitWidth2(data []byte, in []byte) {
+ const width = 2
+ numElements := len(in) / width
+ for element := 0; element < numElements; element++ {
+ decLoc := width * element
+ data[element] = in[decLoc]
+ data[numElements+element] = in[decLoc+1]
+ }
+}
+
+// encodeByteStreamSplitWidth4 implements encodeByteStreamSplit optimized for
types stored using 4 bytes.
+// 'data' must have space for at least len(in) bytes.
+func encodeByteStreamSplitWidth4(data []byte, in []byte) {
+ const width = 4
+ numElements := len(in) / width
+ for element := 0; element < numElements; element++ {
+ decLoc := width * element
+ data[element] = in[decLoc]
+ data[numElements+element] = in[decLoc+1]
+ data[numElements*2+element] = in[decLoc+2]
+ data[numElements*3+element] = in[decLoc+3]
+ }
+}
+
+// encodeByteStreamSplitWidth8 implements encodeByteStreamSplit optimized for
types stored using 8 bytes.
+// 'data' must have space for at least len(in) bytes.
+func encodeByteStreamSplitWidth8(data []byte, in []byte) {
+ const width = 8
+ numElements := len(in) / width
+ for element := 0; element < numElements; element++ {
+ decLoc := width * element
+ data[element] = in[decLoc]
+ data[numElements+element] = in[decLoc+1]
+ data[numElements*2+element] = in[decLoc+2]
+ data[numElements*3+element] = in[decLoc+3]
+ data[numElements*4+element] = in[decLoc+4]
+ data[numElements*5+element] = in[decLoc+5]
+ data[numElements*6+element] = in[decLoc+6]
+ data[numElements*7+element] = in[decLoc+7]
+ }
+}
+
+// decodeByteStreamSplitBatchWidth4 decodes the batch of nValues raw bytes
representing a 4-byte datatype provided by 'data',
+// into the output buffer 'out' using BYTE_STREAM_SPLIT encoding.
+// 'out' must have space for at least len(data) bytes.
+func decodeByteStreamSplitBatchWidth4(data []byte, nValues, stride int, out
[]byte) {
+ const width = 4
+ for element := 0; element < nValues; element++ {
+ out[width*element] = data[element]
+ out[width*element+1] = data[stride+element]
+ out[width*element+2] = data[2*stride+element]
+ out[width*element+3] = data[3*stride+element]
+ }
+}
+
+// decodeByteStreamSplitBatchWidth8 decodes the batch of nValues raw bytes
representing a 8-byte datatype provided by 'data',
+// into the output buffer 'out' using BYTE_STREAM_SPLIT encoding.
+// 'out' must have space for at least len(data) bytes.
+func decodeByteStreamSplitBatchWidth8(data []byte, nValues, stride int, out
[]byte) {
+ const width = 8
+ for element := 0; element < nValues; element++ {
+ out[width*element] = data[element]
+ out[width*element+1] = data[stride+element]
+ out[width*element+2] = data[2*stride+element]
+ out[width*element+3] = data[3*stride+element]
+ out[width*element+4] = data[4*stride+element]
+ out[width*element+5] = data[5*stride+element]
+ out[width*element+6] = data[6*stride+element]
+ out[width*element+7] = data[7*stride+element]
+ }
+}
+
+// decodeByteStreamSplitBatchFLBA decodes the batch of nValues
FixedLenByteArrays provided by 'data',
+// into the output slice 'out' using BYTE_STREAM_SPLIT encoding.
+// 'out' must have space for at least nValues slices.
+func decodeByteStreamSplitBatchFLBA(data []byte, nValues, stride, width int,
out []parquet.FixedLenByteArray) {
+ for stream := 0; stream < width; stream++ {
+ for element := 0; element < nValues; element++ {
+ encLoc := stride*stream + element
+ out[element][stream] = data[encLoc]
+ }
+ }
+}
+
+// decodeByteStreamSplitBatchFLBAWidth2 decodes the batch of nValues
FixedLenByteArrays of length 2 provided by 'data',
+// into the output slice 'out' using BYTE_STREAM_SPLIT encoding.
+// 'out' must have space for at least nValues slices.
+func decodeByteStreamSplitBatchFLBAWidth2(data []byte, nValues, stride int,
out []parquet.FixedLenByteArray) {
+ for element := 0; element < nValues; element++ {
+ out[element][0] = data[element]
+ out[element][1] = data[stride+element]
+ }
+}
+
+// decodeByteStreamSplitBatchFLBAWidth4 decodes the batch of nValues
FixedLenByteArrays of length 4 provided by 'data',
+// into the output slice 'out' using BYTE_STREAM_SPLIT encoding.
+// 'out' must have space for at least nValues slices.
+func decodeByteStreamSplitBatchFLBAWidth4(data []byte, nValues, stride int,
out []parquet.FixedLenByteArray) {
+ for element := 0; element < nValues; element++ {
+ out[element][0] = data[element]
+ out[element][1] = data[stride+element]
+ out[element][2] = data[stride*2+element]
+ out[element][3] = data[stride*3+element]
+ }
+}
+
+// decodeByteStreamSplitBatchFLBAWidth8 decodes the batch of nValues
FixedLenByteArrays of length 8 provided by 'data',
+// into the output slice 'out' using BYTE_STREAM_SPLIT encoding.
+// 'out' must have space for at least nValues slices.
+func decodeByteStreamSplitBatchFLBAWidth8(data []byte, nValues, stride int,
out []parquet.FixedLenByteArray) {
+ for element := 0; element < nValues; element++ {
+ out[element][0] = data[element]
+ out[element][1] = data[stride+element]
+ out[element][2] = data[stride*2+element]
+ out[element][3] = data[stride*3+element]
+ out[element][4] = data[stride*4+element]
+ out[element][5] = data[stride*5+element]
+ out[element][6] = data[stride*6+element]
+ out[element][7] = data[stride*7+element]
+ }
+}
+
+func releaseBufferToPool(pooled *PooledBufferWriter) {
+ buf := pooled.buf
+ memory.Set(buf.Buf(), 0)
+ buf.ResizeNoShrink(0)
+ bufferPool.Put(buf)
+}
+
+func validateByteStreamSplitPageData(typeLen, nvals int, data []byte) (int,
error) {
+ if nvals*typeLen < len(data) {
+ return 0, fmt.Errorf("data size (%d) is too small for the
number of values in in BYTE_STREAM_SPLIT (%d)", len(data), nvals)
+ }
+
+ if len(data)%typeLen != 0 {
+ return 0, fmt.Errorf("ByteStreamSplit data size %d not aligned
with byte_width: %d", len(data), typeLen)
+ }
+
+ return len(data) / typeLen, nil
+}
+
+// ByteStreamSplitFloat32Encoder writes the underlying bytes of the Float32
+// into interlaced streams as defined by the BYTE_STREAM_SPLIT encoding
+type ByteStreamSplitFloat32Encoder struct {
+ PlainFloat32Encoder
+ flushBuffer *PooledBufferWriter
+}
+
+func (enc *ByteStreamSplitFloat32Encoder) FlushValues() (Buffer, error) {
+ in, err := enc.PlainFloat32Encoder.FlushValues()
+ if err != nil {
+ return nil, err
+ }
+
+ if enc.flushBuffer == nil {
+ enc.flushBuffer = NewPooledBufferWriter(in.Len())
+ }
+
+ enc.flushBuffer.buf.Resize(in.Len())
+ encodeByteStreamSplitWidth4(enc.flushBuffer.Bytes(), in.Bytes())
+ return enc.flushBuffer.Finish(), nil
+}
+
+func (enc *ByteStreamSplitFloat32Encoder) Release() {
+ enc.PlainFloat32Encoder.Release()
+ releaseBufferToPool(enc.flushBuffer)
+ enc.flushBuffer = nil
+}
+
+// ByteStreamSplitFloat64Encoder writes the underlying bytes of the Float64
+// into interlaced streams as defined by the BYTE_STREAM_SPLIT encoding
+type ByteStreamSplitFloat64Encoder struct {
+ PlainFloat64Encoder
+ flushBuffer *PooledBufferWriter
+}
+
+func (enc *ByteStreamSplitFloat64Encoder) FlushValues() (Buffer, error) {
+ in, err := enc.PlainFloat64Encoder.FlushValues()
+ if err != nil {
+ return nil, err
+ }
+
+ if enc.flushBuffer == nil {
+ enc.flushBuffer = NewPooledBufferWriter(in.Len())
+ }
+
+ enc.flushBuffer.buf.Resize(in.Len())
+ encodeByteStreamSplitWidth8(enc.flushBuffer.Bytes(), in.Bytes())
+ return enc.flushBuffer.Finish(), nil
+}
+
+func (enc *ByteStreamSplitFloat64Encoder) Release() {
+ enc.PlainFloat64Encoder.Release()
+ releaseBufferToPool(enc.flushBuffer)
+ enc.flushBuffer = nil
+}
+
+// ByteStreamSplitInt32Encoder writes the underlying bytes of the Int32
+// into interlaced streams as defined by the BYTE_STREAM_SPLIT encoding
+type ByteStreamSplitInt32Encoder struct {
+ PlainInt32Encoder
+ flushBuffer *PooledBufferWriter
+}
+
+func (enc *ByteStreamSplitInt32Encoder) FlushValues() (Buffer, error) {
+ in, err := enc.PlainInt32Encoder.FlushValues()
+ if err != nil {
+ return nil, err
+ }
+
+ if enc.flushBuffer == nil {
+ enc.flushBuffer = NewPooledBufferWriter(in.Len())
+ }
+
+ enc.flushBuffer.buf.Resize(in.Len())
+ encodeByteStreamSplitWidth4(enc.flushBuffer.Bytes(), in.Bytes())
+ return enc.flushBuffer.Finish(), nil
+}
+
+func (enc *ByteStreamSplitInt32Encoder) Release() {
+ enc.PlainInt32Encoder.Release()
+ releaseBufferToPool(enc.flushBuffer)
+ enc.flushBuffer = nil
+}
+
+// ByteStreamSplitInt64Encoder writes the underlying bytes of the Int64
+// into interlaced streams as defined by the BYTE_STREAM_SPLIT encoding
+type ByteStreamSplitInt64Encoder struct {
+ PlainInt64Encoder
+ flushBuffer *PooledBufferWriter
+}
+
+func (enc *ByteStreamSplitInt64Encoder) FlushValues() (Buffer, error) {
+ in, err := enc.PlainInt64Encoder.FlushValues()
+ if err != nil {
+ return nil, err
+ }
+
+ if enc.flushBuffer == nil {
+ enc.flushBuffer = NewPooledBufferWriter(in.Len())
+ }
+
+ enc.flushBuffer.buf.Resize(in.Len())
+ encodeByteStreamSplitWidth8(enc.flushBuffer.Bytes(), in.Bytes())
+ return enc.flushBuffer.Finish(), nil
+}
+
+func (enc *ByteStreamSplitInt64Encoder) Release() {
+ enc.PlainInt64Encoder.Release()
+ releaseBufferToPool(enc.flushBuffer)
+ enc.flushBuffer = nil
+}
+
+type ByteStreamSplitFloat32Decoder = ByteStreamSplitDecoder[float32]
+type ByteStreamSplitFloat64Decoder = ByteStreamSplitDecoder[float64]
+type ByteStreamSplitInt32Decoder = ByteStreamSplitDecoder[int32]
+type ByteStreamSplitInt64Decoder = ByteStreamSplitDecoder[int64]
Review Comment:
Should we take the same generic type-alias approach for the encoders too?
We should probably also add godoc comments to these decoder type aliases.
##########
go/parquet/internal/encoding/byte_stream_split.go:
##########
@@ -0,0 +1,367 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package encoding
+
+import (
+ "fmt"
+ "math"
+
+ "github.com/apache/arrow/go/v17/arrow"
+ "github.com/apache/arrow/go/v17/arrow/memory"
+ "github.com/apache/arrow/go/v17/parquet"
+ "golang.org/x/xerrors"
+)
+
+// encodeByteStreamSplit encodes the raw bytes provided by 'in' into the
output buffer 'data' using BYTE_STREAM_SPLIT encoding.
+// 'data' must have space for at least len(in) bytes.
+func encodeByteStreamSplit(data []byte, in []byte, width int) {
+ numElements := len(in) / width
+ for stream := 0; stream < width; stream++ {
+ for element := 0; element < numElements; element++ {
+ encLoc := numElements*stream + element
+ decLoc := width*element + stream
+ data[encLoc] = in[decLoc]
+ }
+ }
+}
+
+// encodeByteStreamSplitWidth2 implements encodeByteStreamSplit optimized for
types stored using 2 bytes.
+// 'data' must have space for at least len(in) bytes.
+func encodeByteStreamSplitWidth2(data []byte, in []byte) {
+ const width = 2
+ numElements := len(in) / width
+ for element := 0; element < numElements; element++ {
+ decLoc := width * element
+ data[element] = in[decLoc]
+ data[numElements+element] = in[decLoc+1]
+ }
+}
+
+// encodeByteStreamSplitWidth4 implements encodeByteStreamSplit optimized for
types stored using 4 bytes.
+// 'data' must have space for at least len(in) bytes.
+func encodeByteStreamSplitWidth4(data []byte, in []byte) {
+ const width = 4
+ numElements := len(in) / width
+ for element := 0; element < numElements; element++ {
+ decLoc := width * element
+ data[element] = in[decLoc]
+ data[numElements+element] = in[decLoc+1]
+ data[numElements*2+element] = in[decLoc+2]
+ data[numElements*3+element] = in[decLoc+3]
+ }
+}
+
+// encodeByteStreamSplitWidth8 implements encodeByteStreamSplit optimized for
types stored using 8 bytes.
+// 'data' must have space for at least len(in) bytes.
+func encodeByteStreamSplitWidth8(data []byte, in []byte) {
+ const width = 8
+ numElements := len(in) / width
+ for element := 0; element < numElements; element++ {
+ decLoc := width * element
+ data[element] = in[decLoc]
+ data[numElements+element] = in[decLoc+1]
+ data[numElements*2+element] = in[decLoc+2]
+ data[numElements*3+element] = in[decLoc+3]
+ data[numElements*4+element] = in[decLoc+4]
+ data[numElements*5+element] = in[decLoc+5]
+ data[numElements*6+element] = in[decLoc+6]
+ data[numElements*7+element] = in[decLoc+7]
+ }
+}
+
+// decodeByteStreamSplitBatchWidth4 decodes the batch of nValues raw bytes
representing a 4-byte datatype provided by 'data',
+// into the output buffer 'out' using BYTE_STREAM_SPLIT encoding.
+// 'out' must have space for at least len(data) bytes.
+func decodeByteStreamSplitBatchWidth4(data []byte, nValues, stride int, out
[]byte) {
+ const width = 4
+ for element := 0; element < nValues; element++ {
+ out[width*element] = data[element]
+ out[width*element+1] = data[stride+element]
+ out[width*element+2] = data[2*stride+element]
+ out[width*element+3] = data[3*stride+element]
+ }
+}
+
+// decodeByteStreamSplitBatchWidth8 decodes the batch of nValues raw bytes
representing a 8-byte datatype provided by 'data',
+// into the output buffer 'out' using BYTE_STREAM_SPLIT encoding.
+// 'out' must have space for at least len(data) bytes.
+func decodeByteStreamSplitBatchWidth8(data []byte, nValues, stride int, out
[]byte) {
+ const width = 8
+ for element := 0; element < nValues; element++ {
+ out[width*element] = data[element]
+ out[width*element+1] = data[stride+element]
+ out[width*element+2] = data[2*stride+element]
+ out[width*element+3] = data[3*stride+element]
+ out[width*element+4] = data[4*stride+element]
+ out[width*element+5] = data[5*stride+element]
+ out[width*element+6] = data[6*stride+element]
+ out[width*element+7] = data[7*stride+element]
+ }
+}
+
+// decodeByteStreamSplitBatchFLBA decodes the batch of nValues
FixedLenByteArrays provided by 'data',
+// into the output slice 'out' using BYTE_STREAM_SPLIT encoding.
+// 'out' must have space for at least nValues slices.
+func decodeByteStreamSplitBatchFLBA(data []byte, nValues, stride, width int,
out []parquet.FixedLenByteArray) {
+ for stream := 0; stream < width; stream++ {
+ for element := 0; element < nValues; element++ {
+ encLoc := stride*stream + element
+ out[element][stream] = data[encLoc]
+ }
+ }
+}
+
+// decodeByteStreamSplitBatchFLBAWidth2 decodes the batch of nValues
FixedLenByteArrays of length 2 provided by 'data',
+// into the output slice 'out' using BYTE_STREAM_SPLIT encoding.
+// 'out' must have space for at least nValues slices.
+func decodeByteStreamSplitBatchFLBAWidth2(data []byte, nValues, stride int,
out []parquet.FixedLenByteArray) {
+ for element := 0; element < nValues; element++ {
+ out[element][0] = data[element]
+ out[element][1] = data[stride+element]
+ }
+}
+
+// decodeByteStreamSplitBatchFLBAWidth4 decodes the batch of nValues
FixedLenByteArrays of length 4 provided by 'data',
+// into the output slice 'out' using BYTE_STREAM_SPLIT encoding.
+// 'out' must have space for at least nValues slices.
+func decodeByteStreamSplitBatchFLBAWidth4(data []byte, nValues, stride int,
out []parquet.FixedLenByteArray) {
+ for element := 0; element < nValues; element++ {
+ out[element][0] = data[element]
+ out[element][1] = data[stride+element]
+ out[element][2] = data[stride*2+element]
+ out[element][3] = data[stride*3+element]
+ }
+}
+
+// decodeByteStreamSplitBatchFLBAWidth8 decodes the batch of nValues
FixedLenByteArrays of length 8 provided by 'data',
+// into the output slice 'out' using BYTE_STREAM_SPLIT encoding.
+// 'out' must have space for at least nValues slices.
+func decodeByteStreamSplitBatchFLBAWidth8(data []byte, nValues, stride int,
out []parquet.FixedLenByteArray) {
+ for element := 0; element < nValues; element++ {
+ out[element][0] = data[element]
+ out[element][1] = data[stride+element]
+ out[element][2] = data[stride*2+element]
+ out[element][3] = data[stride*3+element]
+ out[element][4] = data[stride*4+element]
+ out[element][5] = data[stride*5+element]
+ out[element][6] = data[stride*6+element]
+ out[element][7] = data[stride*7+element]
+ }
+}
+
+func releaseBufferToPool(pooled *PooledBufferWriter) {
+ buf := pooled.buf
+ memory.Set(buf.Buf(), 0)
+ buf.ResizeNoShrink(0)
+ bufferPool.Put(buf)
+}
+
+func validateByteStreamSplitPageData(typeLen, nvals int, data []byte) (int,
error) {
+ if nvals*typeLen < len(data) {
+ return 0, fmt.Errorf("data size (%d) is too small for the
number of values in in BYTE_STREAM_SPLIT (%d)", len(data), nvals)
+ }
+
+ if len(data)%typeLen != 0 {
+ return 0, fmt.Errorf("ByteStreamSplit data size %d not aligned
with byte_width: %d", len(data), typeLen)
+ }
+
+ return len(data) / typeLen, nil
+}
+
+// ByteStreamSplitFloat32Encoder writes the underlying bytes of the Float32
+// into interlaced streams as defined by the BYTE_STREAM_SPLIT encoding
+type ByteStreamSplitFloat32Encoder struct {
+ PlainFloat32Encoder
+ flushBuffer *PooledBufferWriter
+}
+
+func (enc *ByteStreamSplitFloat32Encoder) FlushValues() (Buffer, error) {
+ in, err := enc.PlainFloat32Encoder.FlushValues()
+ if err != nil {
+ return nil, err
+ }
+
+ if enc.flushBuffer == nil {
+ enc.flushBuffer = NewPooledBufferWriter(in.Len())
+ }
+
+ enc.flushBuffer.buf.Resize(in.Len())
+ encodeByteStreamSplitWidth4(enc.flushBuffer.Bytes(), in.Bytes())
+ return enc.flushBuffer.Finish(), nil
+}
+
+func (enc *ByteStreamSplitFloat32Encoder) Release() {
+ enc.PlainFloat32Encoder.Release()
+ releaseBufferToPool(enc.flushBuffer)
+ enc.flushBuffer = nil
+}
+
+// ByteStreamSplitFloat64Encoder writes the underlying bytes of the Float64
+// into interlaced streams as defined by the BYTE_STREAM_SPLIT encoding
+type ByteStreamSplitFloat64Encoder struct {
+ PlainFloat64Encoder
+ flushBuffer *PooledBufferWriter
+}
+
+func (enc *ByteStreamSplitFloat64Encoder) FlushValues() (Buffer, error) {
+ in, err := enc.PlainFloat64Encoder.FlushValues()
+ if err != nil {
+ return nil, err
+ }
+
+ if enc.flushBuffer == nil {
+ enc.flushBuffer = NewPooledBufferWriter(in.Len())
+ }
+
+ enc.flushBuffer.buf.Resize(in.Len())
+ encodeByteStreamSplitWidth8(enc.flushBuffer.Bytes(), in.Bytes())
+ return enc.flushBuffer.Finish(), nil
+}
+
+func (enc *ByteStreamSplitFloat64Encoder) Release() {
+ enc.PlainFloat64Encoder.Release()
+ releaseBufferToPool(enc.flushBuffer)
+ enc.flushBuffer = nil
+}
+
+// ByteStreamSplitInt32Encoder writes the underlying bytes of the Int32
+// into interlaced streams as defined by the BYTE_STREAM_SPLIT encoding
+type ByteStreamSplitInt32Encoder struct {
+ PlainInt32Encoder
+ flushBuffer *PooledBufferWriter
+}
+
+func (enc *ByteStreamSplitInt32Encoder) FlushValues() (Buffer, error) {
+ in, err := enc.PlainInt32Encoder.FlushValues()
+ if err != nil {
+ return nil, err
+ }
+
+ if enc.flushBuffer == nil {
+ enc.flushBuffer = NewPooledBufferWriter(in.Len())
+ }
+
+ enc.flushBuffer.buf.Resize(in.Len())
+ encodeByteStreamSplitWidth4(enc.flushBuffer.Bytes(), in.Bytes())
+ return enc.flushBuffer.Finish(), nil
+}
+
+func (enc *ByteStreamSplitInt32Encoder) Release() {
+ enc.PlainInt32Encoder.Release()
+ releaseBufferToPool(enc.flushBuffer)
+ enc.flushBuffer = nil
+}
+
+// ByteStreamSplitInt64Encoder writes the underlying bytes of the Int64
+// into interlaced streams as defined by the BYTE_STREAM_SPLIT encoding
+type ByteStreamSplitInt64Encoder struct {
+ PlainInt64Encoder
+ flushBuffer *PooledBufferWriter
+}
+
+func (enc *ByteStreamSplitInt64Encoder) FlushValues() (Buffer, error) {
+ in, err := enc.PlainInt64Encoder.FlushValues()
+ if err != nil {
+ return nil, err
+ }
+
+ if enc.flushBuffer == nil {
+ enc.flushBuffer = NewPooledBufferWriter(in.Len())
+ }
+
+ enc.flushBuffer.buf.Resize(in.Len())
+ encodeByteStreamSplitWidth8(enc.flushBuffer.Bytes(), in.Bytes())
+ return enc.flushBuffer.Finish(), nil
+}
+
+func (enc *ByteStreamSplitInt64Encoder) Release() {
+ enc.PlainInt64Encoder.Release()
+ releaseBufferToPool(enc.flushBuffer)
+ enc.flushBuffer = nil
+}
+
+type ByteStreamSplitFloat32Decoder = ByteStreamSplitDecoder[float32]
+type ByteStreamSplitFloat64Decoder = ByteStreamSplitDecoder[float64]
+type ByteStreamSplitInt32Decoder = ByteStreamSplitDecoder[int32]
+type ByteStreamSplitInt64Decoder = ByteStreamSplitDecoder[int64]
+
+type ByteStreamSplitDecoder[T float32 | float64 | int32 | int64] struct {
+ decoder
+ stride int
+}
+
+func (dec *ByteStreamSplitDecoder[T]) Type() parquet.Type {
+ switch any(dec).(type) {
+ case *ByteStreamSplitDecoder[float32]:
+ return parquet.Types.Float
+ case *ByteStreamSplitDecoder[float64]:
+ return parquet.Types.Double
+ case *ByteStreamSplitDecoder[int32]:
+ return parquet.Types.Int32
+ case *ByteStreamSplitDecoder[int64]:
+ return parquet.Types.Int64
+ default:
+ return parquet.Types.Undefined
+ }
Review Comment:
Since we should never hit this default case (because of the constraint on the
type parameter), let's make it a panic instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]