emkornfield commented on a change in pull request #9671: URL: https://github.com/apache/arrow/pull/9671#discussion_r594845104
########## File path: go/parquet/internal/bmi/bitmap_bmi2.go ########## @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bmi + +import "unsafe" + +//go:noescape +func _extract_bits(bitmap, selectBitmap uint64, res unsafe.Pointer) + +func extractBitsBMI2(bitmap, selectBitmap uint64) uint64 { + var ( + res uint64 + ) + + _extract_bits(bitmap, selectBitmap, unsafe.Pointer(&res)) + return res +} + +//go:noescape +func _popcount64(bitmap uint64, res unsafe.Pointer) + +func popCount64BMI2(bitmap uint64) uint64 { + var ( + res uint64 + ) + + _popcount64(bitmap, unsafe.Pointer(&res)) + return res +} + +//go:noescape +func _popcount32(bitmap uint32, res unsafe.Pointer) + +func popCount32BMI2(bitmap uint32) uint32 { + var ( + res uint32 + ) + + _popcount32(bitmap, unsafe.Pointer(&res)) + return res +} + +//go:noescape +func _levels_to_bitmap(levels unsafe.Pointer, numLevels int, rhs int16, res unsafe.Pointer) + +func greaterThanBitmapBMI2(levels []int16, rhs int16) uint64 { + if levels == nil || len(levels) == 0 { Review comment: docs? 
########## File path: go/parquet/internal/testutils/random_arrow.go ########## @@ -0,0 +1,475 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testutils + +import ( + "github.com/apache/arrow/go/arrow" + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/memory" + "golang.org/x/exp/rand" +) + +func RandomNonNull(dt arrow.DataType, size int) array.Interface { + switch dt.ID() { + case arrow.FLOAT32: + bldr := array.NewFloat32Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]float32, size) + FillRandomFloat32(0, values) + bldr.AppendValues(values, nil) + return bldr.NewArray() + case arrow.FLOAT64: + bldr := array.NewFloat64Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]float64, size) + FillRandomFloat64(0, values) + bldr.AppendValues(values, nil) + return bldr.NewArray() + case arrow.INT64: + bldr := array.NewInt64Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]int64, size) + FillRandomInt64(0, values) + bldr.AppendValues(values, nil) + return bldr.NewArray() + case arrow.UINT64: + bldr := array.NewUint64Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]uint64, size) + 
FillRandomUint64(0, values) + bldr.AppendValues(values, nil) + return bldr.NewArray() + case arrow.INT32: + bldr := array.NewInt32Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]int32, size) + FillRandomInt32(0, values) + bldr.AppendValues(values, nil) + return bldr.NewArray() + case arrow.UINT32: + bldr := array.NewUint32Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]uint32, size) + FillRandomUint32(0, values) + bldr.AppendValues(values, nil) + return bldr.NewArray() + case arrow.INT16: + bldr := array.NewInt16Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]int16, size) + FillRandomInt16(0, 0, 64, values) + bldr.AppendValues(values, nil) + return bldr.NewArray() + case arrow.UINT16: + bldr := array.NewUint16Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]uint16, size) + FillRandomUint16(0, 0, 64, values) + bldr.AppendValues(values, nil) + return bldr.NewArray() + case arrow.INT8: + bldr := array.NewInt8Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]int8, size) + FillRandomInt8(0, 0, 64, values) + bldr.AppendValues(values, nil) + return bldr.NewArray() + case arrow.UINT8: + bldr := array.NewUint8Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]uint8, size) + FillRandomUint8(0, 0, 64, values) + bldr.AppendValues(values, nil) + return bldr.NewArray() + case arrow.DATE32: + bldr := array.NewDate32Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]int32, size) + FillRandomInt32Max(0, 24, values) + + dates := make([]arrow.Date32, size) + for idx, val := range values { + dates[idx] = arrow.Date32(val) * 86400000 + } + bldr.AppendValues(dates, nil) + return bldr.NewArray() + case arrow.DATE64: + bldr := array.NewDate64Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]int64, size) + FillRandomInt64Max(0, 24, values) + + dates := make([]arrow.Date64, size) + for 
idx, val := range values { + dates[idx] = arrow.Date64(val) * 86400000 + } + bldr.AppendValues(dates, nil) + return bldr.NewArray() + case arrow.STRING: + bldr := array.NewStringBuilder(memory.DefaultAllocator) + defer bldr.Release() + for i := 0; i < size; i++ { + bldr.Append("test-string") + } + return bldr.NewArray() + case arrow.BINARY: + bldr := array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.Binary) + defer bldr.Release() + + buf := make([]byte, 12) + r := rand.New(rand.NewSource(0)) + for i := 0; i < size; i++ { + length := r.Intn(12-2+1) + 2 + r.Read(buf[:length]) + bldr.Append(buf[:length]) + } + return bldr.NewArray() + case arrow.FIXED_SIZE_BINARY: + bldr := array.NewFixedSizeBinaryBuilder(memory.DefaultAllocator, &arrow.FixedSizeBinaryType{ByteWidth: 10}) + defer bldr.Release() + + buf := make([]byte, 10) + r := rand.New(rand.NewSource(0)) + for i := 0; i < size; i++ { + r.Read(buf) + bldr.Append(buf) + } + return bldr.NewArray() + // case arrow.DECIMAL: + // dectype := dt.(*arrow.Decimal128Type) + // bldr := array.NewDecimal128Builder(memory.DefaultAllocator, dectype) + // defer bldr.Release() + + // data := RandomDecimals(int64(size), 0, dectype.Precision) + // bldr.AppendValues(arrow.Decimal128Traits.CastFromBytes(data), nil) + // return bldr.NewArray() + case arrow.BOOL: + bldr := array.NewBooleanBuilder(memory.DefaultAllocator) + defer bldr.Release() + + values := make([]bool, size) + FillRandomBooleans(0.5, 0, values) + bldr.AppendValues(values, nil) + return bldr.NewArray() + } + return nil +} + +func RandomNullable(dt arrow.DataType, size int, numNulls int) array.Interface { + switch dt.ID() { + case arrow.FLOAT32: + bldr := array.NewFloat32Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]float32, size) + FillRandomFloat32(0, values) + + valid := make([]bool, size) + for idx := range valid { + valid[idx] = true + } + for i := 0; i < numNulls; i++ { + valid[i*2] = false + } + bldr.AppendValues(values, 
valid) + return bldr.NewArray() + case arrow.FLOAT64: + bldr := array.NewFloat64Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]float64, size) + FillRandomFloat64(0, values) + + valid := make([]bool, size) + for idx := range valid { + valid[idx] = true + } + for i := 0; i < numNulls; i++ { + valid[i*2] = false + } + bldr.AppendValues(values, valid) + return bldr.NewArray() + case arrow.INT8: + bldr := array.NewInt8Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]int8, size) + FillRandomInt8(0, 0, 64, values) + valid := make([]bool, size) + for idx := range valid { + valid[idx] = true + } + for i := 0; i < numNulls; i++ { + valid[i*2] = false + } + + bldr.AppendValues(values, valid) + return bldr.NewArray() + case arrow.UINT8: + bldr := array.NewUint8Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]uint8, size) + FillRandomUint8(0, 0, 64, values) + valid := make([]bool, size) + for idx := range valid { + valid[idx] = true + } + for i := 0; i < numNulls; i++ { + valid[i*2] = false + } + + bldr.AppendValues(values, valid) + return bldr.NewArray() + case arrow.INT16: + bldr := array.NewInt16Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]int16, size) + FillRandomInt16(0, 0, 64, values) + valid := make([]bool, size) + for idx := range valid { + valid[idx] = true + } + for i := 0; i < numNulls; i++ { + valid[i*2] = false + } + + bldr.AppendValues(values, valid) + return bldr.NewArray() + case arrow.UINT16: + bldr := array.NewUint16Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]uint16, size) + FillRandomUint16(0, 0, 64, values) + valid := make([]bool, size) + for idx := range valid { + valid[idx] = true + } + for i := 0; i < numNulls; i++ { + valid[i*2] = false + } + + bldr.AppendValues(values, valid) + return bldr.NewArray() + case arrow.INT32: + bldr := array.NewInt32Builder(memory.DefaultAllocator) + defer bldr.Release() + values := 
make([]int32, size) + FillRandomInt32Max(0, 64, values) + valid := make([]bool, size) + for idx := range valid { + valid[idx] = true + } + for i := 0; i < numNulls; i++ { + valid[i*2] = false + } + + bldr.AppendValues(values, valid) + return bldr.NewArray() + case arrow.UINT32: + bldr := array.NewUint32Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]uint32, size) + FillRandomUint32Max(0, 64, values) + valid := make([]bool, size) + for idx := range valid { + valid[idx] = true + } + for i := 0; i < numNulls; i++ { + valid[i*2] = false + } + + bldr.AppendValues(values, valid) + return bldr.NewArray() + + case arrow.INT64: + bldr := array.NewInt64Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]int64, size) + FillRandomInt64Max(0, 64, values) + valid := make([]bool, size) + for idx := range valid { + valid[idx] = true + } + for i := 0; i < numNulls; i++ { + valid[i*2] = false + } + + bldr.AppendValues(values, valid) + return bldr.NewArray() + case arrow.UINT64: + bldr := array.NewUint64Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]uint64, size) + FillRandomUint64Max(0, 64, values) + valid := make([]bool, size) + for idx := range valid { + valid[idx] = true + } + for i := 0; i < numNulls; i++ { + valid[i*2] = false + } + + bldr.AppendValues(values, valid) + return bldr.NewArray() + case arrow.DATE32: + bldr := array.NewDate32Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]int32, size) + FillRandomInt32Max(0, 24, values) + + dates := make([]arrow.Date32, size) + for idx, val := range values { + dates[idx] = arrow.Date32(val) * 86400000 + } + valid := make([]bool, size) + for idx := range valid { + valid[idx] = true + } + for i := 0; i < numNulls; i++ { + valid[i*2] = false + } + bldr.AppendValues(dates, valid) + return bldr.NewArray() + case arrow.DATE64: + bldr := array.NewDate64Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]int64, 
size) + FillRandomInt64Max(0, 24, values) + + dates := make([]arrow.Date64, size) + for idx, val := range values { + dates[idx] = arrow.Date64(val) * 86400000 + } + valid := make([]bool, size) + for idx := range valid { + valid[idx] = true + } + for i := 0; i < numNulls; i++ { + valid[i*2] = false + } + bldr.AppendValues(dates, valid) + return bldr.NewArray() + case arrow.BINARY: + bldr := array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.Binary) + defer bldr.Release() + + valid := make([]bool, size) + for idx := range valid { + valid[idx] = true + } + for i := 0; i < numNulls; i++ { + valid[i*2] = false + } + + buf := make([]byte, 12) + r := rand.New(rand.NewSource(0)) + for i := 0; i < size; i++ { + if !valid[i] { + bldr.AppendNull() + continue + } + + length := r.Intn(12-2+1) + 2 + r.Read(buf[:length]) + bldr.Append(buf[:length]) + } + return bldr.NewArray() + case arrow.STRING: + bldr := array.NewStringBuilder(memory.DefaultAllocator) + defer bldr.Release() + + valid := make([]bool, size) + for idx := range valid { + valid[idx] = true + } + for i := 0; i < numNulls; i++ { + valid[i*2] = false + } + + buf := make([]byte, 12) + r := rand.New(rand.NewSource(0)) + for i := 0; i < size; i++ { + if !valid[i] { + bldr.AppendNull() + continue + } + + length := r.Intn(12-2+1) + 2 + r.Read(buf[:length]) + // trivially force data to be valid UTF8 by making it all ASCII + for idx := range buf[:length] { + buf[idx] &= 0x7f + } + bldr.Append(string(buf[:length])) + } + return bldr.NewArray() + case arrow.FIXED_SIZE_BINARY: + bldr := array.NewFixedSizeBinaryBuilder(memory.DefaultAllocator, &arrow.FixedSizeBinaryType{ByteWidth: 10}) + defer bldr.Release() + + valid := make([]bool, size) + for idx := range valid { + valid[idx] = true + } + for i := 0; i < numNulls; i++ { + valid[i*2] = false + } + + buf := make([]byte, 10) + r := rand.New(rand.NewSource(0)) + for i := 0; i < size; i++ { + if !valid[i] { + bldr.AppendNull() + continue + } + + r.Read(buf) + 
bldr.Append(buf) + } + return bldr.NewArray() + // case arrow.DECIMAL: Review comment: Same question. ########## File path: go/parquet/internal/bmi/_lib/bitmap_bmi2.s ########## @@ -0,0 +1,174 @@ + .text Review comment: Are both the C source and the assembly necessary? ########## File path: go/parquet/internal/bmi/bmi_init.go ########## @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bmi + +import ( + "golang.org/x/sys/cpu" +) + +type funcs struct { + extractBits func(uint64, uint64) uint64 + popcount64 func(uint64) uint64 + popcount32 func(uint32) uint32 + gtbitmap func([]int16, int16) uint64 +} + +var funclist funcs + +func init() { + if cpu.X86.HasPOPCNT { + funclist.popcount64 = popCount64BMI2 + funclist.popcount32 = popCount32BMI2 + } else { + funclist.popcount64 = popCount64Go + funclist.popcount32 = popCount32Go + } + if cpu.X86.HasBMI2 { + funclist.extractBits = extractBitsBMI2 + } else { + funclist.extractBits = extractBitsGo + } + if cpu.X86.HasAVX2 { + funclist.gtbitmap = greaterThanBitmapBMI2 + } else { + funclist.gtbitmap = greaterThanBitmapGo + } +} + +func ExtractBits(bitmap, selectBitmap uint64) uint64 { Review comment: Could you add doc comments for these exported functions?
########## File path: go/parquet/internal/bmi/_lib/bitmap_bmi2.c ########## @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include <stdint.h> +#include <x86intrin.h> + +void extract_bits(uint64_t bitmap, uint64_t select_bitmap, uint64_t* res) { + *res = _pext_u64(bitmap, select_bitmap); +} + +void popcount64(uint64_t bitmap, uint64_t* res) { Review comment: Is there a reason you don't return the result directly, rather than writing it through the res output pointer? ########## File path: go/parquet/internal/testutils/random.go ########## @@ -0,0 +1,383 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testutils + +import ( + "math" + "time" + "unsafe" + + "github.com/apache/arrow/go/arrow" + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/bitutil" + "github.com/apache/arrow/go/arrow/memory" + "github.com/apache/arrow/go/parquet" + + // "github.factset.com/mtopol/parquet-go/pqarrow" + "golang.org/x/exp/rand" + "gonum.org/v1/gonum/stat/distuv" +) + +type RandomArrayGenerator struct { + seed uint64 + extra uint64 + src rand.Source + seedRand *rand.Rand +} + +func NewRandomArrayGenerator(seed uint64) RandomArrayGenerator { + src := rand.NewSource(seed) + return RandomArrayGenerator{seed, 0, src, rand.New(src)} +} + +func (r *RandomArrayGenerator) GenerateBitmap(buffer []byte, n int64, prob float64) int64 { + count := int64(0) + r.extra++ + + dist := distuv.Bernoulli{P: prob, Src: rand.NewSource(r.seed + r.extra)} + for i := int(0); int64(i) < n; i++ { + if dist.Rand() != float64(0.0) { + bitutil.SetBit(buffer, i) + } else { + count++ + } + } + + return count +} + +func (r *RandomArrayGenerator) ByteArray(size int64, minLen, maxLen int32, nullProb float64) array.Interface { + if nullProb < 0 || nullProb > 1 { + panic("null prob must be between 0 and 1") + } + + lengths := r.Int32(size, minLen, maxLen, nullProb) + defer lengths.Release() + + r.extra++ + dist := rand.New(rand.NewSource(r.seed + r.extra)) + bldr := array.NewStringBuilder(memory.DefaultAllocator) + defer bldr.Release() + + strbuf := make([]byte, maxLen) + + for i := 0; int64(i) < size; i++ { + if lengths.IsValid(i) { + l := lengths.Value(i) + for j := 
int32(0); j < l; j++ { + strbuf[j] = byte(dist.Int31n(int32('z')-int32('A')+1) + int32('A')) + } + val := strbuf[:l] + bldr.Append(*(*string)(unsafe.Pointer(&val))) + } else { + bldr.AppendNull() + } + } + + return bldr.NewArray() +} + +func (r *RandomArrayGenerator) Uint8(size int64, min, max uint8, prob float64) array.Interface { + buffers := make([]*memory.Buffer, 2) + nullCount := int64(0) + + buffers[0] = memory.NewResizableBuffer(memory.DefaultAllocator) + buffers[0].Resize(int(bitutil.BytesForBits(size))) + nullCount = r.GenerateBitmap(buffers[0].Bytes(), size, prob) + + buffers[1] = memory.NewResizableBuffer(memory.DefaultAllocator) + buffers[1].Resize(int(size * int64(arrow.Uint8SizeBytes))) + + r.extra++ + dist := rand.New(rand.NewSource(r.seed + r.extra)) + out := arrow.Uint8Traits.CastFromBytes(buffers[1].Bytes()) + for i := int64(0); i < size; i++ { + out[i] = uint8(dist.Intn(int(max-min+1))) + min + } + + return array.NewUint8Data(array.NewData(arrow.PrimitiveTypes.Uint8, int(size), buffers, nil, int(nullCount), 0)) +} + +func (r *RandomArrayGenerator) Int32(size int64, min, max int32, pctNull float64) *array.Int32 { + buffers := make([]*memory.Buffer, 2) + nullCount := int64(0) + + buffers[0] = memory.NewResizableBuffer(memory.DefaultAllocator) + buffers[0].Resize(int(bitutil.BytesForBits(size))) + nullCount = r.GenerateBitmap(buffers[0].Bytes(), size, 1-pctNull) + + buffers[1] = memory.NewResizableBuffer(memory.DefaultAllocator) + buffers[1].Resize(arrow.Int32Traits.BytesRequired(int(size))) + + r.extra++ + dist := rand.New(rand.NewSource(r.seed + r.extra)) + out := arrow.Int32Traits.CastFromBytes(buffers[1].Bytes()) + for i := int64(0); i < size; i++ { + out[i] = dist.Int31n(max-min+1) + min + } + return array.NewInt32Data(array.NewData(arrow.PrimitiveTypes.Int32, int(size), buffers, nil, int(nullCount), 0)) +} + +func (r *RandomArrayGenerator) Float64(size int64, pctNull float64) *array.Float64 { Review comment: do these functions belong in the 
parquet package or in the Arrow package? ########## File path: go/parquet/internal/testutils/random_arrow.go ########## @@ -0,0 +1,475 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testutils + +import ( + "github.com/apache/arrow/go/arrow" + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/memory" + "golang.org/x/exp/rand" +) + +func RandomNonNull(dt arrow.DataType, size int) array.Interface { Review comment: based on the name of the files, Arrow might be the better package here? ########## File path: go/parquet/internal/bmi/bmi_init.go ########## @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bmi + +import ( + "golang.org/x/sys/cpu" +) + +type funcs struct { + extractBits func(uint64, uint64) uint64 + popcount64 func(uint64) uint64 + popcount32 func(uint32) uint32 + gtbitmap func([]int16, int16) uint64 +} + +var funclist funcs + +func init() { + if cpu.X86.HasPOPCNT { + funclist.popcount64 = popCount64BMI2 Review comment: I see you are doing it conditionally here. Does popCount32BMI2 provide a benefit here? I would think Go's implementation would be more portable in general, and I assume it uses intrinsics under the hood? ########## File path: go/parquet/internal/utils/math.go ########## @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +func Min(a, b int64) int64 { Review comment: Aren't there standard library functions that do this already?
########## File path: go/parquet/internal/bmi/bitmap_bmi2.go ########## @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bmi + +import "unsafe" + +//go:noescape +func _extract_bits(bitmap, selectBitmap uint64, res unsafe.Pointer) + +func extractBitsBMI2(bitmap, selectBitmap uint64) uint64 { + var ( + res uint64 + ) + + _extract_bits(bitmap, selectBitmap, unsafe.Pointer(&res)) + return res +} + +//go:noescape +func _popcount64(bitmap uint64, res unsafe.Pointer) + +func popCount64BMI2(bitmap uint64) uint64 { Review comment: Can't https://golang.org/pkg/math/bits/#OnesCount64 and the corresponding functions be used instead of PopCount? ########## File path: go/parquet/internal/testutils/random_arrow.go ########## @@ -0,0 +1,475 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testutils + +import ( + "github.com/apache/arrow/go/arrow" + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/memory" + "golang.org/x/exp/rand" +) + +func RandomNonNull(dt arrow.DataType, size int) array.Interface { + switch dt.ID() { + case arrow.FLOAT32: + bldr := array.NewFloat32Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]float32, size) + FillRandomFloat32(0, values) + bldr.AppendValues(values, nil) + return bldr.NewArray() + case arrow.FLOAT64: + bldr := array.NewFloat64Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]float64, size) + FillRandomFloat64(0, values) + bldr.AppendValues(values, nil) + return bldr.NewArray() + case arrow.INT64: + bldr := array.NewInt64Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]int64, size) + FillRandomInt64(0, values) + bldr.AppendValues(values, nil) + return bldr.NewArray() + case arrow.UINT64: + bldr := array.NewUint64Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]uint64, size) + FillRandomUint64(0, values) + bldr.AppendValues(values, nil) + return bldr.NewArray() + case arrow.INT32: + bldr := array.NewInt32Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]int32, size) + FillRandomInt32(0, values) + bldr.AppendValues(values, nil) + return bldr.NewArray() + case arrow.UINT32: + bldr := array.NewUint32Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]uint32, size) + FillRandomUint32(0, values) + 
bldr.AppendValues(values, nil) + return bldr.NewArray() + case arrow.INT16: + bldr := array.NewInt16Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]int16, size) + FillRandomInt16(0, 0, 64, values) + bldr.AppendValues(values, nil) + return bldr.NewArray() + case arrow.UINT16: + bldr := array.NewUint16Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]uint16, size) + FillRandomUint16(0, 0, 64, values) + bldr.AppendValues(values, nil) + return bldr.NewArray() + case arrow.INT8: + bldr := array.NewInt8Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]int8, size) + FillRandomInt8(0, 0, 64, values) + bldr.AppendValues(values, nil) + return bldr.NewArray() + case arrow.UINT8: + bldr := array.NewUint8Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]uint8, size) + FillRandomUint8(0, 0, 64, values) + bldr.AppendValues(values, nil) + return bldr.NewArray() + case arrow.DATE32: + bldr := array.NewDate32Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]int32, size) + FillRandomInt32Max(0, 24, values) + + dates := make([]arrow.Date32, size) + for idx, val := range values { + dates[idx] = arrow.Date32(val) * 86400000 + } + bldr.AppendValues(dates, nil) + return bldr.NewArray() + case arrow.DATE64: + bldr := array.NewDate64Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]int64, size) + FillRandomInt64Max(0, 24, values) + + dates := make([]arrow.Date64, size) + for idx, val := range values { + dates[idx] = arrow.Date64(val) * 86400000 + } + bldr.AppendValues(dates, nil) + return bldr.NewArray() + case arrow.STRING: + bldr := array.NewStringBuilder(memory.DefaultAllocator) + defer bldr.Release() + for i := 0; i < size; i++ { + bldr.Append("test-string") + } + return bldr.NewArray() + case arrow.BINARY: + bldr := array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.Binary) + defer bldr.Release() + + buf := make([]byte, 
12) + r := rand.New(rand.NewSource(0)) + for i := 0; i < size; i++ { + length := r.Intn(12-2+1) + 2 + r.Read(buf[:length]) + bldr.Append(buf[:length]) + } + return bldr.NewArray() + case arrow.FIXED_SIZE_BINARY: + bldr := array.NewFixedSizeBinaryBuilder(memory.DefaultAllocator, &arrow.FixedSizeBinaryType{ByteWidth: 10}) + defer bldr.Release() + + buf := make([]byte, 10) + r := rand.New(rand.NewSource(0)) + for i := 0; i < size; i++ { + r.Read(buf) + bldr.Append(buf) + } + return bldr.NewArray() + // case arrow.DECIMAL: Review comment: why commented out? ########## File path: go/parquet/internal/testutils/random.go ########## @@ -0,0 +1,383 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package testutils + +import ( + "math" + "time" + "unsafe" + + "github.com/apache/arrow/go/arrow" + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/bitutil" + "github.com/apache/arrow/go/arrow/memory" + "github.com/apache/arrow/go/parquet" + + // "github.factset.com/mtopol/parquet-go/pqarrow" + "golang.org/x/exp/rand" + "gonum.org/v1/gonum/stat/distuv" +) + +type RandomArrayGenerator struct { + seed uint64 + extra uint64 + src rand.Source + seedRand *rand.Rand +} + +func NewRandomArrayGenerator(seed uint64) RandomArrayGenerator { + src := rand.NewSource(seed) + return RandomArrayGenerator{seed, 0, src, rand.New(src)} +} + +func (r *RandomArrayGenerator) GenerateBitmap(buffer []byte, n int64, prob float64) int64 { + count := int64(0) + r.extra++ + + dist := distuv.Bernoulli{P: prob, Src: rand.NewSource(r.seed + r.extra)} + for i := int(0); int64(i) < n; i++ { + if dist.Rand() != float64(0.0) { + bitutil.SetBit(buffer, i) + } else { + count++ + } + } + + return count +} + +func (r *RandomArrayGenerator) ByteArray(size int64, minLen, maxLen int32, nullProb float64) array.Interface { + if nullProb < 0 || nullProb > 1 { + panic("null prob must be between 0 and 1") + } + + lengths := r.Int32(size, minLen, maxLen, nullProb) + defer lengths.Release() + + r.extra++ + dist := rand.New(rand.NewSource(r.seed + r.extra)) + bldr := array.NewStringBuilder(memory.DefaultAllocator) + defer bldr.Release() + + strbuf := make([]byte, maxLen) + + for i := 0; int64(i) < size; i++ { + if lengths.IsValid(i) { + l := lengths.Value(i) + for j := int32(0); j < l; j++ { + strbuf[j] = byte(dist.Int31n(int32('z')-int32('A')+1) + int32('A')) + } + val := strbuf[:l] + bldr.Append(*(*string)(unsafe.Pointer(&val))) + } else { + bldr.AppendNull() + } + } + + return bldr.NewArray() +} + +func (r *RandomArrayGenerator) Uint8(size int64, min, max uint8, prob float64) array.Interface { + buffers := make([]*memory.Buffer, 2) + nullCount := int64(0) + + buffers[0] = 
memory.NewResizableBuffer(memory.DefaultAllocator) + buffers[0].Resize(int(bitutil.BytesForBits(size))) + nullCount = r.GenerateBitmap(buffers[0].Bytes(), size, prob) + + buffers[1] = memory.NewResizableBuffer(memory.DefaultAllocator) + buffers[1].Resize(int(size * int64(arrow.Uint8SizeBytes))) + + r.extra++ + dist := rand.New(rand.NewSource(r.seed + r.extra)) + out := arrow.Uint8Traits.CastFromBytes(buffers[1].Bytes()) + for i := int64(0); i < size; i++ { + out[i] = uint8(dist.Intn(int(max-min+1))) + min + } + + return array.NewUint8Data(array.NewData(arrow.PrimitiveTypes.Uint8, int(size), buffers, nil, int(nullCount), 0)) +} + +func (r *RandomArrayGenerator) Int32(size int64, min, max int32, pctNull float64) *array.Int32 { + buffers := make([]*memory.Buffer, 2) + nullCount := int64(0) + + buffers[0] = memory.NewResizableBuffer(memory.DefaultAllocator) + buffers[0].Resize(int(bitutil.BytesForBits(size))) + nullCount = r.GenerateBitmap(buffers[0].Bytes(), size, 1-pctNull) + + buffers[1] = memory.NewResizableBuffer(memory.DefaultAllocator) + buffers[1].Resize(arrow.Int32Traits.BytesRequired(int(size))) + + r.extra++ + dist := rand.New(rand.NewSource(r.seed + r.extra)) + out := arrow.Int32Traits.CastFromBytes(buffers[1].Bytes()) + for i := int64(0); i < size; i++ { + out[i] = dist.Int31n(max-min+1) + min + } + return array.NewInt32Data(array.NewData(arrow.PrimitiveTypes.Int32, int(size), buffers, nil, int(nullCount), 0)) +} + +func (r *RandomArrayGenerator) Float64(size int64, pctNull float64) *array.Float64 { Review comment: I see these are meant specifically for parquet? some docs on what they do at least at the top of the file summarizing methods here would be useful ########## File path: go/parquet/internal/utils/rle.go ########## @@ -0,0 +1,555 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "bytes" + "encoding/binary" + "io" + "math" + + "github.com/apache/arrow/go/arrow/bitutil" + "github.com/apache/arrow/go/parquet" + "golang.org/x/xerrors" +) + +//go:generate go run ../../../arrow/_tools/tmpl/main.go -i -data=physical_types.tmpldata typed_rle_dict.gen.go.tmpl + +const ( + MaxValuesPerLiteralRun = (1 << 6) * 8 +) + +func MinBufferSize(bitWidth int) int { + maxLiteralRunSize := 1 + bitutil.BytesForBits(int64(MaxValuesPerLiteralRun*bitWidth)) + maxRepeatedRunSize := binary.MaxVarintLen32 + bitutil.BytesForBits(int64(bitWidth)) + return int(Max(maxLiteralRunSize, maxRepeatedRunSize)) +} + +func MaxBufferSize(width, numValues int) int { + bytesPerRun := width + numRuns := int(bitutil.BytesForBits(int64(numValues))) + literalMaxSize := numRuns + (numRuns * bytesPerRun) + + minRepeatedRunSize := 1 + int(bitutil.BytesForBits(int64(width))) + repeatedMaxSize := int(bitutil.BytesForBits(int64(numValues))) * minRepeatedRunSize + + return MaxInt(literalMaxSize, repeatedMaxSize) +} + +// Utility classes to do run length encoding (RLE) for fixed bit width values. If runs +// are sufficiently long, RLE is used, otherwise, the values are just bit-packed +// (literal encoding). 
+// For both types of runs, there is a byte-aligned indicator which encodes the length +// of the run and the type of the run. +// This encoding has the benefit that when there aren't any long enough runs, values +// are always decoded at fixed (can be precomputed) bit offsets OR both the value and +// the run length are byte aligned. This allows for very efficient decoding +// implementations. +// The encoding is: +// encoded-block := run* +// run := literal-run | repeated-run +// literal-run := literal-indicator < literal bytes > +// repeated-run := repeated-indicator < repeated value. padded to byte boundary > +// literal-indicator := varint_encode( number_of_groups << 1 | 1) +// repeated-indicator := varint_encode( number_of_repetitions << 1 ) +// +// Each run is preceded by a varint. The varint's least significant bit is +// used to indicate whether the run is a literal run or a repeated run. The rest +// of the varint is used to determine the length of the run (eg how many times the +// value repeats). +// +// In the case of literal runs, the run length is always a multiple of 8 (i.e. encode +// in groups of 8), so that no matter the bit-width of the value, the sequence will end +// on a byte boundary without padding. +// Given that we know it is a multiple of 8, we store the number of 8-groups rather than +// the actual number of encoded ints. (This means that the total number of encoded values +// can not be determined from the encoded data, since the number of values in the last +// group may not be a multiple of 8). For the last group of literal runs, we pad +// the group to 8 with zeros. This allows for 8 at a time decoding on the read side +// without the need for additional checks. +// +// There is a break-even point when it is more storage efficient to do run length +// encoding. For 1 bit-width values, that point is 8 values. They require 2 bytes +// for both the repeated encoding or the literal encoding. 
This value can always +// be computed based on the bit-width. +// +// Examples with bit-width 1 (eg encoding booleans): +// ---------------------------------------- +// 100 1s followed by 100 0s: +// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte> +// - (total 4 bytes) +// +// alternating 1s and 0s (200 total): +// 200 ints = 25 groups of 8 +// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked> +// (total 26 bytes, 1 byte overhead) +// + +type RleDecoder struct { + r *BitReader + + buffered uint64 + bitWidth int + curVal uint64 + repCount int32 + litCount int32 +} + +func NewRleDecoder(data *bytes.Reader, width int) *RleDecoder { + return &RleDecoder{r: NewBitReader(data), bitWidth: width} +} + +func (r *RleDecoder) Reset(data *bytes.Reader, width int) { + r.bitWidth = width + r.curVal = 0 + r.repCount = 0 + r.litCount = 0 + r.r.Reset(data) +} + +func (r *RleDecoder) Next() bool { + indicator, ok := r.r.GetVlqInt() + if !ok { + return false + } + + literal := (indicator & 1) != 0 + count := uint32(indicator >> 1) + if literal { + if count == 0 || count > uint32(math.MaxInt32/8) { + return false + } + r.litCount = int32(count) * 8 + } else { + if count == 0 || count > uint32(math.MaxInt32) { + return false + } + r.repCount = int32(count) + + nbytes := int(bitutil.BytesForBits(int64(r.bitWidth))) + switch { + case nbytes > 4: + if !r.r.GetAligned(nbytes, &r.curVal) { + return false + } + case nbytes > 2: + var val uint32 + if !r.r.GetAligned(nbytes, &val) { + return false + } + r.curVal = uint64(val) + case nbytes > 1: + var val uint16 + if !r.r.GetAligned(nbytes, &val) { + return false + } + r.curVal = uint64(val) + default: + var val uint8 + if !r.r.GetAligned(nbytes, &val) { + return false + } + r.curVal = uint64(val) + } + } + return true +} + +func (r *RleDecoder) GetValue() (uint64, bool) { + vals := make([]uint64, 1) + n := r.GetBatch(vals) + return vals[0], n == 1 +} + +func (r *RleDecoder) GetBatch(values []uint64) int 
{ + read := 0 + size := len(values) + + out := values + for read < size { + remain := size - read + + if r.repCount > 0 { + repbatch := int(math.Min(float64(remain), float64(r.repCount))) + for i := 0; i < repbatch; i++ { + out[i] = r.curVal + } + + r.repCount -= int32(repbatch) + read += repbatch + out = out[repbatch:] + } else if r.litCount > 0 { + litbatch := int(math.Min(float64(remain), float64(r.litCount))) + n, _ := r.r.GetBatch(uint(r.bitWidth), out[:litbatch]) + if n != litbatch { + return read + } + + r.litCount -= int32(litbatch) + read += litbatch + out = out[litbatch:] + } else { + if !r.Next() { + return read + } + } + } + return read +} + +func (r *RleDecoder) GetBatchSpaced(vals []uint64, nullcount int, validBits []byte, validBitsOffset int64) (int, error) { + if nullcount == 0 { + return r.GetBatch(vals), nil + } + + converter := plainConverter{} + blockCounter := NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals))) + + var ( + totalProcessed int + processed int + block BitBlockCount + err error + ) + + for { + block = blockCounter.NextFourWords() + if block.Len == 0 { + break + } + + if block.AllSet() { + processed = r.GetBatch(vals[:block.Len]) + } else if block.NoneSet() { + converter.FillZero(vals[:block.Len]) + processed = int(block.Len) + } else { + processed, err = r.getspaced(converter, vals, int(block.Len), int(block.Len-block.Popcnt), validBits, validBitsOffset) + if err != nil { + return totalProcessed, err + } + } + + totalProcessed += processed + vals = vals[int(block.Len):] + validBitsOffset += int64(block.Len) + + if processed != int(block.Len) { + break + } + } + return totalProcessed, nil +} + +func (r *RleDecoder) getspaced(dc DictionaryConverter, vals interface{}, batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error) { + switch vals := vals.(type) { + case []int32: + return r.getspacedInt32(dc, vals, batchSize, nullCount, validBits, validBitsOffset) + case []int64: + return 
r.getspacedInt64(dc, vals, batchSize, nullCount, validBits, validBitsOffset) + case []float32: + return r.getspacedFloat32(dc, vals, batchSize, nullCount, validBits, validBitsOffset) + case []float64: + return r.getspacedFloat64(dc, vals, batchSize, nullCount, validBits, validBitsOffset) + case []parquet.ByteArray: + return r.getspacedByteArray(dc, vals, batchSize, nullCount, validBits, validBitsOffset) + case []parquet.FixedLenByteArray: + return r.getspacedFixedLenByteArray(dc, vals, batchSize, nullCount, validBits, validBitsOffset) + case []parquet.Int96: + return r.getspacedInt96(dc, vals, batchSize, nullCount, validBits, validBitsOffset) + case []uint64: + if nullCount == batchSize { + dc.FillZero(vals[:batchSize]) + return batchSize, nil + } + + read := 0 + remain := batchSize - nullCount + + const bufferSize = 1024 + var indexbuffer [bufferSize]IndexType + + // assume no bits to start + bitReader := NewBitRunReader(validBits, validBitsOffset, int64(batchSize)) + validRun := bitReader.NextRun() + for read < batchSize { Review comment: is it possible to break this up into different helper methods? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org