proost commented on code in PR #130:
URL: https://github.com/apache/datasketches-go/pull/130#discussion_r2876382691


##########
sampling/varopt_items_sketch_serde.go:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sampling
+
+import (
+       "encoding/binary"
+       "errors"
+       "math"
+
+       "github.com/apache/datasketches-go/internal"
+)
+
+const (
+       varOptPreambleLongsEmpty  = 1
+       varOptPreambleLongsWarmup = 3
+       varOptPreambleLongsFull   = 4
+
+       varOptSerVer     = 2
+       varOptFlagEmpty  = 0x04
+       varOptFlagGadget = 0x80
+)
+
+// ToSlice serializes the sketch to a byte slice using Java/C++ compatible 
preamble layout.
+func (s *VarOptItemsSketch[T]) ToSlice(serde ItemsSerDe[T]) ([]byte, error) {
+       rfBits, err := resizeFactorBitsFor(s.rf)
+       if err != nil {
+               return nil, err
+       }
+
+       flags := byte(0)
+       if s.marks != nil {
+               flags |= varOptFlagGadget
+       }
+
+       preLongs := varOptPreambleLongsEmpty
+       totalItems := 0
+       if s.IsEmpty() {
+               flags |= varOptFlagEmpty
+       } else {
+               totalItems = s.h + s.r
+               if s.r == 0 {
+                       preLongs = varOptPreambleLongsWarmup
+               } else {
+                       preLongs = varOptPreambleLongsFull
+               }
+       }
+
+       weightsBytes := s.h * 8
+       markBytes := 0
+       if s.marks != nil {
+               markBytes = packedBoolBytes(s.h)
+       }
+
+       var items []T
+       if totalItems > 0 {
+               items = make([]T, 0, totalItems)
+               for i := 0; i < s.h; i++ {
+                       items = append(items, s.data[i])
+               }
+               for i := s.h + 1; i <= s.k && s.r > 0; i++ {
+                       items = append(items, s.data[i])
+               }
+       }
+       itemsBytes, err := serde.SerializeToBytes(items)
+       if err != nil {
+               return nil, err
+       }
+
+       preambleBytes := preLongs * 8
+       out := make([]byte, 
preambleBytes+weightsBytes+markBytes+len(itemsBytes))
+
+       out[0] = rfBits | byte(preLongs)
+       out[1] = varOptSerVer
+       out[2] = byte(internal.FamilyEnum.VarOptItems.Id)
+       out[3] = flags
+       binary.LittleEndian.PutUint32(out[4:], uint32(s.k))
+
+       if !s.IsEmpty() {
+               binary.LittleEndian.PutUint64(out[8:], uint64(s.n))
+               binary.LittleEndian.PutUint32(out[16:], uint32(s.h))
+               binary.LittleEndian.PutUint32(out[20:], uint32(s.r))
+               if s.r > 0 {
+                       binary.LittleEndian.PutUint64(out[24:], 
math.Float64bits(s.totalWeightR))
+               }
+       }
+
+       weightOffset := preambleBytes
+       if !s.IsEmpty() {
+               weightOffset = 24
+               if s.r > 0 {
+                       weightOffset += 8
+               }
+       }
+       for i := 0; i < s.h; i++ {
+               binary.LittleEndian.PutUint64(out[weightOffset+i*8:], 
math.Float64bits(s.weights[i]))
+       }
+
+       markOffset := weightOffset + weightsBytes
+       if s.marks != nil && s.h > 0 {
+               packBoolsInto(out[markOffset:markOffset+markBytes], 
s.marks[:s.h])
+       }
+
+       copy(out[markOffset+markBytes:], itemsBytes)
+       return out, nil
+}
+
+// NewVarOptItemsSketchFromSlice deserializes a sketch from bytes.
+func NewVarOptItemsSketchFromSlice[T any](data []byte, serde ItemsSerDe[T]) 
(*VarOptItemsSketch[T], error) {
+       if len(data) < 8 {
+               return nil, errors.New("data too short")
+       }
+
+       preLongs := int(data[0] & 0x3F)
+       rf, err := resizeFactorFromHeaderByte(data[0])
+       if err != nil {
+               return nil, err
+       }
+       ver := data[1]
+       family := data[2]
+       flags := data[3]
+       k := int(binary.LittleEndian.Uint32(data[4:]))
+
+       if ver != varOptSerVer {
+               return nil, errors.New("unsupported serialization version")
+       }
+       if family != byte(internal.FamilyEnum.VarOptItems.Id) {
+               return nil, errors.New("wrong sketch family")
+       }
+       if k < 1 || k > varOptMaxK {
+               return nil, errors.New("invalid k in serialized varopt sketch")
+       }
+
+       hasEmptyFlag := (flags & varOptFlagEmpty) != 0
+       if preLongs == varOptPreambleLongsEmpty && !hasEmptyFlag {
+               return nil, errors.New("invalid varopt sketch header: empty 
preLongs without empty flag")
+       }
+       if preLongs != varOptPreambleLongsEmpty && hasEmptyFlag {
+               return nil, errors.New("invalid varopt sketch header: non-empty 
preLongs with empty flag")
+       }
+
+       isEmpty := hasEmptyFlag
+       isGadget := (flags & varOptFlagGadget) != 0
+       if isEmpty {
+               out, err := NewVarOptItemsSketch[T](uint(k), 
WithResizeFactor(rf))
+               if err != nil {
+                       return nil, err
+               }
+               if isGadget {
+                       out.marks = make([]bool, 0, cap(out.data))
+               }
+               return out, nil
+       }
+
+       if preLongs != varOptPreambleLongsWarmup && preLongs != 
varOptPreambleLongsFull {
+               return nil, errors.New("invalid preLongs for non-empty varopt 
sketch")
+       }
+       if len(data) < preLongs*8 {
+               return nil, errors.New("data too short for varopt preamble")
+       }
+
+       n := int64(binary.LittleEndian.Uint64(data[8:]))
+       h := int(binary.LittleEndian.Uint32(data[16:]))
+       r := int(binary.LittleEndian.Uint32(data[20:]))
+       if h < 0 || r < 0 {
+               return nil, errors.New("invalid h/r in serialized varopt 
sketch")
+       }
+       if r > 0 && h+r != k {
+               return nil, errors.New("invalid varopt sketch state: h + r must 
equal k in sampling mode")
+       }
+       if r == 0 && h > k {
+               return nil, errors.New("invalid varopt sketch state: h exceeds 
k in warmup mode")
+       }
+
+       totalWeightR := 0.0
+       if r > 0 {
+               totalWeightR = 
math.Float64frombits(binary.LittleEndian.Uint64(data[24:]))
+       }

Review Comment:
   We need to reject the case where preLongs is 4 but r == 0.



##########
sampling/varopt_items_sketch_serde.go:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sampling
+
+import (
+       "encoding/binary"
+       "errors"
+       "math"
+
+       "github.com/apache/datasketches-go/internal"
+)
+
+const (
+       varOptPreambleLongsEmpty  = 1
+       varOptPreambleLongsWarmup = 3
+       varOptPreambleLongsFull   = 4
+
+       varOptSerVer     = 2
+       varOptFlagEmpty  = 0x04
+       varOptFlagGadget = 0x80
+)
+
+// ToSlice serializes the sketch to a byte slice using Java/C++ compatible 
preamble layout.
+func (s *VarOptItemsSketch[T]) ToSlice(serde ItemsSerDe[T]) ([]byte, error) {
+       rfBits, err := resizeFactorBitsFor(s.rf)
+       if err != nil {
+               return nil, err
+       }
+
+       flags := byte(0)
+       if s.marks != nil {
+               flags |= varOptFlagGadget
+       }
+
+       preLongs := varOptPreambleLongsEmpty
+       totalItems := 0
+       if s.IsEmpty() {
+               flags |= varOptFlagEmpty
+       } else {
+               totalItems = s.h + s.r
+               if s.r == 0 {
+                       preLongs = varOptPreambleLongsWarmup
+               } else {
+                       preLongs = varOptPreambleLongsFull
+               }
+       }
+
+       weightsBytes := s.h * 8
+       markBytes := 0
+       if s.marks != nil {
+               markBytes = packedBoolBytes(s.h)
+       }
+
+       var items []T
+       if totalItems > 0 {
+               items = make([]T, 0, totalItems)
+               for i := 0; i < s.h; i++ {
+                       items = append(items, s.data[i])
+               }
+               for i := s.h + 1; i <= s.k && s.r > 0; i++ {
+                       items = append(items, s.data[i])
+               }
+       }
+       itemsBytes, err := serde.SerializeToBytes(items)
+       if err != nil {
+               return nil, err
+       }
+
+       preambleBytes := preLongs * 8
+       out := make([]byte, 
preambleBytes+weightsBytes+markBytes+len(itemsBytes))
+
+       out[0] = rfBits | byte(preLongs)
+       out[1] = varOptSerVer
+       out[2] = byte(internal.FamilyEnum.VarOptItems.Id)
+       out[3] = flags
+       binary.LittleEndian.PutUint32(out[4:], uint32(s.k))
+
+       if !s.IsEmpty() {
+               binary.LittleEndian.PutUint64(out[8:], uint64(s.n))
+               binary.LittleEndian.PutUint32(out[16:], uint32(s.h))
+               binary.LittleEndian.PutUint32(out[20:], uint32(s.r))
+               if s.r > 0 {
+                       binary.LittleEndian.PutUint64(out[24:], 
math.Float64bits(s.totalWeightR))
+               }
+       }
+
+       weightOffset := preambleBytes
+       if !s.IsEmpty() {
+               weightOffset = 24
+               if s.r > 0 {
+                       weightOffset += 8
+               }
+       }
+       for i := 0; i < s.h; i++ {
+               binary.LittleEndian.PutUint64(out[weightOffset+i*8:], 
math.Float64bits(s.weights[i]))
+       }
+
+       markOffset := weightOffset + weightsBytes
+       if s.marks != nil && s.h > 0 {
+               packBoolsInto(out[markOffset:markOffset+markBytes], 
s.marks[:s.h])
+       }
+
+       copy(out[markOffset+markBytes:], itemsBytes)
+       return out, nil
+}
+
+// NewVarOptItemsSketchFromSlice deserializes a sketch from bytes.
+func NewVarOptItemsSketchFromSlice[T any](data []byte, serde ItemsSerDe[T]) 
(*VarOptItemsSketch[T], error) {
+       if len(data) < 8 {
+               return nil, errors.New("data too short")
+       }
+
+       preLongs := int(data[0] & 0x3F)
+       rf, err := resizeFactorFromHeaderByte(data[0])
+       if err != nil {
+               return nil, err
+       }
+       ver := data[1]
+       family := data[2]
+       flags := data[3]
+       k := int(binary.LittleEndian.Uint32(data[4:]))
+
+       if ver != varOptSerVer {
+               return nil, errors.New("unsupported serialization version")
+       }
+       if family != byte(internal.FamilyEnum.VarOptItems.Id) {
+               return nil, errors.New("wrong sketch family")
+       }
+       if k < 1 || k > varOptMaxK {
+               return nil, errors.New("invalid k in serialized varopt sketch")
+       }
+
+       hasEmptyFlag := (flags & varOptFlagEmpty) != 0
+       if preLongs == varOptPreambleLongsEmpty && !hasEmptyFlag {
+               return nil, errors.New("invalid varopt sketch header: empty 
preLongs without empty flag")
+       }
+       if preLongs != varOptPreambleLongsEmpty && hasEmptyFlag {
+               return nil, errors.New("invalid varopt sketch header: non-empty 
preLongs with empty flag")
+       }
+
+       isEmpty := hasEmptyFlag
+       isGadget := (flags & varOptFlagGadget) != 0
+       if isEmpty {
+               out, err := NewVarOptItemsSketch[T](uint(k), 
WithResizeFactor(rf))
+               if err != nil {
+                       return nil, err
+               }
+               if isGadget {
+                       out.marks = make([]bool, 0, cap(out.data))
+               }
+               return out, nil
+       }
+
+       if preLongs != varOptPreambleLongsWarmup && preLongs != 
varOptPreambleLongsFull {
+               return nil, errors.New("invalid preLongs for non-empty varopt 
sketch")
+       }
+       if len(data) < preLongs*8 {
+               return nil, errors.New("data too short for varopt preamble")
+       }
+
+       n := int64(binary.LittleEndian.Uint64(data[8:]))
+       h := int(binary.LittleEndian.Uint32(data[16:]))
+       r := int(binary.LittleEndian.Uint32(data[20:]))
+       if h < 0 || r < 0 {
+               return nil, errors.New("invalid h/r in serialized varopt 
sketch")
+       }
+       if r > 0 && h+r != k {
+               return nil, errors.New("invalid varopt sketch state: h + r must 
equal k in sampling mode")
+       }
+       if r == 0 && h > k {
+               return nil, errors.New("invalid varopt sketch state: h exceeds 
k in warmup mode")
+       }
+
+       totalWeightR := 0.0
+       if r > 0 {
+               totalWeightR = 
math.Float64frombits(binary.LittleEndian.Uint64(data[24:]))
+       }
+
+       weightOffset := 24
+       if r > 0 {
+               weightOffset += 8
+       }
+       weightsBytes := h * 8
+       if len(data) < weightOffset+weightsBytes {
+               return nil, errors.New("data too short for varopt weights")
+       }
+
+       hWeights := make([]float64, h)
+       for i := 0; i < h; i++ {
+               w := 
math.Float64frombits(binary.LittleEndian.Uint64(data[weightOffset+i*8:]))
+               if w <= 0 || math.IsNaN(w) || math.IsInf(w, 0) {
+                       return nil, errors.New("invalid non-positive or 
non-finite weight in serialized varopt sketch")
+               }
+               hWeights[i] = w
+       }
+
+       markOffset := weightOffset + weightsBytes
+       hMarks := make([]bool, h)
+       numMarksInH := uint32(0)
+       if isGadget && h > 0 {
+               markBytes := packedBoolBytes(h)
+               if len(data) < markOffset+markBytes {
+                       return nil, errors.New("data too short for varopt 
marks")
+               }
+               unpackBoolsFrom(data[markOffset:markOffset+markBytes], hMarks)
+               for _, m := range hMarks {
+                       if m {
+                               numMarksInH++
+                       }
+               }
+               markOffset += markBytes
+       }
+
+       totalItems := h + r
+       items, err := serde.DeserializeFromBytes(data[markOffset:], totalItems)
+       if err != nil {
+               return nil, err
+       }
+
+       if r == 0 {
+               out := &VarOptItemsSketch[T]{

Review Comment:
   We need to set the capacity of each slice, because we use cap to determine
whether to grow.



##########
sampling/varopt_items_sketch_serde.go:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sampling
+
+import (
+       "encoding/binary"
+       "errors"
+       "math"
+
+       "github.com/apache/datasketches-go/internal"
+)
+
+const (
+       varOptPreambleLongsEmpty  = 1
+       varOptPreambleLongsWarmup = 3
+       varOptPreambleLongsFull   = 4
+
+       varOptSerVer     = 2
+       varOptFlagEmpty  = 0x04
+       varOptFlagGadget = 0x80
+)
+
+// ToSlice serializes the sketch to a byte slice using Java/C++ compatible 
preamble layout.
+func (s *VarOptItemsSketch[T]) ToSlice(serde ItemsSerDe[T]) ([]byte, error) {
+       rfBits, err := resizeFactorBitsFor(s.rf)
+       if err != nil {
+               return nil, err
+       }
+
+       flags := byte(0)
+       if s.marks != nil {
+               flags |= varOptFlagGadget
+       }
+
+       preLongs := varOptPreambleLongsEmpty
+       totalItems := 0
+       if s.IsEmpty() {
+               flags |= varOptFlagEmpty
+       } else {
+               totalItems = s.h + s.r
+               if s.r == 0 {
+                       preLongs = varOptPreambleLongsWarmup
+               } else {
+                       preLongs = varOptPreambleLongsFull
+               }
+       }
+
+       weightsBytes := s.h * 8
+       markBytes := 0
+       if s.marks != nil {
+               markBytes = packedBoolBytes(s.h)
+       }
+
+       var items []T
+       if totalItems > 0 {
+               items = make([]T, 0, totalItems)
+               for i := 0; i < s.h; i++ {
+                       items = append(items, s.data[i])
+               }
+               for i := s.h + 1; i <= s.k && s.r > 0; i++ {
+                       items = append(items, s.data[i])
+               }
+       }
+       itemsBytes, err := serde.SerializeToBytes(items)
+       if err != nil {
+               return nil, err
+       }
+
+       preambleBytes := preLongs * 8
+       out := make([]byte, 
preambleBytes+weightsBytes+markBytes+len(itemsBytes))
+
+       out[0] = rfBits | byte(preLongs)
+       out[1] = varOptSerVer
+       out[2] = byte(internal.FamilyEnum.VarOptItems.Id)
+       out[3] = flags
+       binary.LittleEndian.PutUint32(out[4:], uint32(s.k))
+
+       if !s.IsEmpty() {
+               binary.LittleEndian.PutUint64(out[8:], uint64(s.n))
+               binary.LittleEndian.PutUint32(out[16:], uint32(s.h))
+               binary.LittleEndian.PutUint32(out[20:], uint32(s.r))
+               if s.r > 0 {
+                       binary.LittleEndian.PutUint64(out[24:], 
math.Float64bits(s.totalWeightR))
+               }
+       }
+
+       weightOffset := preambleBytes
+       if !s.IsEmpty() {
+               weightOffset = 24
+               if s.r > 0 {
+                       weightOffset += 8
+               }
+       }
+       for i := 0; i < s.h; i++ {
+               binary.LittleEndian.PutUint64(out[weightOffset+i*8:], 
math.Float64bits(s.weights[i]))
+       }
+
+       markOffset := weightOffset + weightsBytes
+       if s.marks != nil && s.h > 0 {
+               packBoolsInto(out[markOffset:markOffset+markBytes], 
s.marks[:s.h])
+       }
+
+       copy(out[markOffset+markBytes:], itemsBytes)

Review Comment:
   Can you add a not-empty check before writing "itemsBytes"? If a user has a
custom "ItemsSerDe", it is easy to produce a corrupted sketch.



##########
sampling/varopt_items_union.go:
##########
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sampling
+
+import (
+       "encoding/binary"
+       "errors"
+       "math"
+       "math/rand"
+
+       "github.com/apache/datasketches-go/internal"
+)
+
+// VarOptItemsUnion provides union operations over VarOpt sketches.
+//
+// This file implements the phase-1 core API used by callers to merge sketches.
+// Full gadget-resolution logic for marked items in H is handled in a 
follow-up step.
+type VarOptItemsUnion[T any] struct {
+       gadget *VarOptItemsSketch[T]
+       maxK   int
+
+       n int64
+
+       // outer tau is the largest tau of any input sketch in estimation mode
+       outerTauNumer float64
+       outerTauDenom int64
+}
+
+func NewVarOptItemsUnion[T any](maxK int) (*VarOptItemsUnion[T], error) {
+       if maxK < 1 || maxK > varOptMaxK {
+               return nil, errors.New("maxK must be at least 1 and less than 
2^31 - 1")
+       }
+
+       gadget, err := newVarOptItemsSketchAsGadget[T](maxK)
+       if err != nil {
+               return nil, err
+       }
+
+       return &VarOptItemsUnion[T]{
+               gadget: gadget,
+               maxK:   maxK,
+       }, nil
+}
+
+func (u *VarOptItemsUnion[T]) MaxK() int {
+       return u.maxK
+}
+
+func (u *VarOptItemsUnion[T]) Reset() error {
+       gadget, err := newVarOptItemsSketchAsGadget[T](u.maxK)

Review Comment:
   Why don't you just use the "Reset" method of the sketch?



##########
sampling/varopt_items_union.go:
##########
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sampling
+
+import (
+       "encoding/binary"
+       "errors"
+       "math"
+       "math/rand"
+
+       "github.com/apache/datasketches-go/internal"
+)
+
+// VarOptItemsUnion provides union operations over VarOpt sketches.
+//
+// This file implements the phase-1 core API used by callers to merge sketches.
+// Full gadget-resolution logic for marked items in H is handled in a 
follow-up step.
+type VarOptItemsUnion[T any] struct {
+       gadget *VarOptItemsSketch[T]
+       maxK   int
+
+       n int64
+
+       // outer tau is the largest tau of any input sketch in estimation mode
+       outerTauNumer float64
+       outerTauDenom int64
+}
+
+func NewVarOptItemsUnion[T any](maxK int) (*VarOptItemsUnion[T], error) {
+       if maxK < 1 || maxK > varOptMaxK {

Review Comment:
   This check is duplicated. The VarOptItemsSketch constructor already covers it at L49.



##########
sampling/varopt_items_union_test.go:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sampling
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestNewVarOptItemsUnion(t *testing.T) {
+       _, err := NewVarOptItemsUnion[int](0)
+       assert.ErrorContains(t, err, "maxK must be at least 1")
+
+       union, err := NewVarOptItemsUnion[int](16)
+       assert.NoError(t, err)
+       assert.Equal(t, 16, union.MaxK())
+}
+
+func TestVarOptItemsUnion_ResultEmpty(t *testing.T) {
+       union, err := NewVarOptItemsUnion[int](8)
+       assert.NoError(t, err)
+
+       result, err := union.Result()
+       assert.NoError(t, err)
+       assert.Equal(t, 8, result.K())
+       assert.Equal(t, int64(0), result.N())
+       assert.True(t, result.IsEmpty())
+}
+
+func TestVarOptItemsUnion_UpdateSketchExactMode(t *testing.T) {

Review Comment:
   We need more test cases for the update method.
   
   1. union sampling sketches where one of them has an extremely heavy item.
   2. union identical sampling sketches.
   3. union with different k and weighted items.



##########
sampling/varopt_items_union.go:
##########
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sampling
+
+import (
+       "encoding/binary"
+       "errors"
+       "math"
+       "math/rand"
+
+       "github.com/apache/datasketches-go/internal"
+)
+
+// VarOptItemsUnion provides union operations over VarOpt sketches.
+//
+// This file implements the phase-1 core API used by callers to merge sketches.
+// Full gadget-resolution logic for marked items in H is handled in a 
follow-up step.
+type VarOptItemsUnion[T any] struct {
+       gadget *VarOptItemsSketch[T]
+       maxK   int
+
+       n int64
+
+       // outer tau is the largest tau of any input sketch in estimation mode
+       outerTauNumer float64
+       outerTauDenom int64
+}
+
+func NewVarOptItemsUnion[T any](maxK int) (*VarOptItemsUnion[T], error) {
+       if maxK < 1 || maxK > varOptMaxK {
+               return nil, errors.New("maxK must be at least 1 and less than 
2^31 - 1")
+       }
+
+       gadget, err := newVarOptItemsSketchAsGadget[T](maxK)
+       if err != nil {
+               return nil, err
+       }
+
+       return &VarOptItemsUnion[T]{
+               gadget: gadget,
+               maxK:   maxK,
+       }, nil
+}
+
+func (u *VarOptItemsUnion[T]) MaxK() int {

Review Comment:
   The user already knows what maxK is. What is the purpose of this method?



##########
sampling/varopt_items_sketch_serde.go:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sampling
+
+import (
+       "encoding/binary"
+       "errors"
+       "math"
+
+       "github.com/apache/datasketches-go/internal"
+)
+
+const (
+       varOptPreambleLongsEmpty  = 1
+       varOptPreambleLongsWarmup = 3
+       varOptPreambleLongsFull   = 4
+
+       varOptSerVer     = 2
+       varOptFlagEmpty  = 0x04
+       varOptFlagGadget = 0x80
+)
+
+// ToSlice serializes the sketch to a byte slice using Java/C++ compatible 
preamble layout.
+func (s *VarOptItemsSketch[T]) ToSlice(serde ItemsSerDe[T]) ([]byte, error) {
+       rfBits, err := resizeFactorBitsFor(s.rf)
+       if err != nil {
+               return nil, err
+       }
+
+       flags := byte(0)
+       if s.marks != nil {
+               flags |= varOptFlagGadget
+       }
+
+       preLongs := varOptPreambleLongsEmpty
+       totalItems := 0
+       if s.IsEmpty() {
+               flags |= varOptFlagEmpty
+       } else {
+               totalItems = s.h + s.r
+               if s.r == 0 {
+                       preLongs = varOptPreambleLongsWarmup
+               } else {
+                       preLongs = varOptPreambleLongsFull
+               }
+       }
+
+       weightsBytes := s.h * 8
+       markBytes := 0
+       if s.marks != nil {
+               markBytes = packedBoolBytes(s.h)
+       }
+
+       var items []T
+       if totalItems > 0 {
+               items = make([]T, 0, totalItems)
+               for i := 0; i < s.h; i++ {
+                       items = append(items, s.data[i])
+               }
+               for i := s.h + 1; i <= s.k && s.r > 0; i++ {
+                       items = append(items, s.data[i])
+               }
+       }
+       itemsBytes, err := serde.SerializeToBytes(items)
+       if err != nil {
+               return nil, err
+       }
+
+       preambleBytes := preLongs * 8
+       out := make([]byte, 
preambleBytes+weightsBytes+markBytes+len(itemsBytes))
+
+       out[0] = rfBits | byte(preLongs)
+       out[1] = varOptSerVer
+       out[2] = byte(internal.FamilyEnum.VarOptItems.Id)
+       out[3] = flags
+       binary.LittleEndian.PutUint32(out[4:], uint32(s.k))
+
+       if !s.IsEmpty() {
+               binary.LittleEndian.PutUint64(out[8:], uint64(s.n))
+               binary.LittleEndian.PutUint32(out[16:], uint32(s.h))
+               binary.LittleEndian.PutUint32(out[20:], uint32(s.r))
+               if s.r > 0 {
+                       binary.LittleEndian.PutUint64(out[24:], 
math.Float64bits(s.totalWeightR))
+               }
+       }
+
+       weightOffset := preambleBytes
+       if !s.IsEmpty() {
+               weightOffset = 24
+               if s.r > 0 {
+                       weightOffset += 8
+               }
+       }
+       for i := 0; i < s.h; i++ {
+               binary.LittleEndian.PutUint64(out[weightOffset+i*8:], 
math.Float64bits(s.weights[i]))
+       }
+
+       markOffset := weightOffset + weightsBytes
+       if s.marks != nil && s.h > 0 {
+               packBoolsInto(out[markOffset:markOffset+markBytes], 
s.marks[:s.h])
+       }
+
+       copy(out[markOffset+markBytes:], itemsBytes)
+       return out, nil
+}
+
+// NewVarOptItemsSketchFromSlice deserializes a sketch from bytes.
+func NewVarOptItemsSketchFromSlice[T any](data []byte, serde ItemsSerDe[T]) 
(*VarOptItemsSketch[T], error) {

Review Comment:
   We need stricter validation of preLongs, h, n, r, and k.



##########
sampling/varopt_items_sketch_serde.go:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sampling
+
+import (
+       "encoding/binary"
+       "errors"
+       "math"
+
+       "github.com/apache/datasketches-go/internal"
+)
+
+const (
+       varOptPreambleLongsEmpty  = 1
+       varOptPreambleLongsWarmup = 3
+       varOptPreambleLongsFull   = 4
+
+       varOptSerVer     = 2
+       varOptFlagEmpty  = 0x04
+       varOptFlagGadget = 0x80
+)
+
+// ToSlice serializes the sketch to a byte slice using Java/C++ compatible 
preamble layout.
+func (s *VarOptItemsSketch[T]) ToSlice(serde ItemsSerDe[T]) ([]byte, error) {
+       rfBits, err := resizeFactorBitsFor(s.rf)
+       if err != nil {
+               return nil, err
+       }
+
+       flags := byte(0)
+       if s.marks != nil {
+               flags |= varOptFlagGadget
+       }
+
+       preLongs := varOptPreambleLongsEmpty
+       totalItems := 0
+       if s.IsEmpty() {
+               flags |= varOptFlagEmpty
+       } else {
+               totalItems = s.h + s.r
+               if s.r == 0 {
+                       preLongs = varOptPreambleLongsWarmup
+               } else {
+                       preLongs = varOptPreambleLongsFull
+               }
+       }
+
+       weightsBytes := s.h * 8
+       markBytes := 0
+       if s.marks != nil {
+               markBytes = packedBoolBytes(s.h)
+       }
+
+       var items []T
+       if totalItems > 0 {
+               items = make([]T, 0, totalItems)
+               for i := 0; i < s.h; i++ {
+                       items = append(items, s.data[i])
+               }
+               for i := s.h + 1; i <= s.k && s.r > 0; i++ {
+                       items = append(items, s.data[i])
+               }
+       }
+       itemsBytes, err := serde.SerializeToBytes(items)
+       if err != nil {
+               return nil, err
+       }
+
+       preambleBytes := preLongs * 8
+       out := make([]byte, 
preambleBytes+weightsBytes+markBytes+len(itemsBytes))
+
+       out[0] = rfBits | byte(preLongs)
+       out[1] = varOptSerVer
+       out[2] = byte(internal.FamilyEnum.VarOptItems.Id)
+       out[3] = flags
+       binary.LittleEndian.PutUint32(out[4:], uint32(s.k))
+
+       if !s.IsEmpty() {
+               binary.LittleEndian.PutUint64(out[8:], uint64(s.n))
+               binary.LittleEndian.PutUint32(out[16:], uint32(s.h))
+               binary.LittleEndian.PutUint32(out[20:], uint32(s.r))
+               if s.r > 0 {
+                       binary.LittleEndian.PutUint64(out[24:], 
math.Float64bits(s.totalWeightR))
+               }
+       }
+
+       weightOffset := preambleBytes
+       if !s.IsEmpty() {
+               weightOffset = 24
+               if s.r > 0 {
+                       weightOffset += 8
+               }
+       }
+       for i := 0; i < s.h; i++ {
+               binary.LittleEndian.PutUint64(out[weightOffset+i*8:], 
math.Float64bits(s.weights[i]))
+       }
+
+       markOffset := weightOffset + weightsBytes
+       if s.marks != nil && s.h > 0 {
+               packBoolsInto(out[markOffset:markOffset+markBytes], 
s.marks[:s.h])
+       }
+
+       copy(out[markOffset+markBytes:], itemsBytes)
+       return out, nil
+}
+
+// NewVarOptItemsSketchFromSlice deserializes a sketch from bytes.
+func NewVarOptItemsSketchFromSlice[T any](data []byte, serde ItemsSerDe[T]) 
(*VarOptItemsSketch[T], error) {
+       if len(data) < 8 {
+               return nil, errors.New("data too short")
+       }
+
+       preLongs := int(data[0] & 0x3F)
+       rf, err := resizeFactorFromHeaderByte(data[0])
+       if err != nil {
+               return nil, err
+       }
+       ver := data[1]
+       family := data[2]
+       flags := data[3]
+       k := int(binary.LittleEndian.Uint32(data[4:]))
+
+       if ver != varOptSerVer {
+               return nil, errors.New("unsupported serialization version")
+       }
+       if family != byte(internal.FamilyEnum.VarOptItems.Id) {
+               return nil, errors.New("wrong sketch family")
+       }
+       if k < 1 || k > varOptMaxK {
+               return nil, errors.New("invalid k in serialized varopt sketch")
+       }
+
+       hasEmptyFlag := (flags & varOptFlagEmpty) != 0
+       if preLongs == varOptPreambleLongsEmpty && !hasEmptyFlag {
+               return nil, errors.New("invalid varopt sketch header: empty 
preLongs without empty flag")
+       }
+       if preLongs != varOptPreambleLongsEmpty && hasEmptyFlag {
+               return nil, errors.New("invalid varopt sketch header: non-empty 
preLongs with empty flag")
+       }
+
+       isEmpty := hasEmptyFlag
+       isGadget := (flags & varOptFlagGadget) != 0
+       if isEmpty {
+               out, err := NewVarOptItemsSketch[T](uint(k), 
WithResizeFactor(rf))
+               if err != nil {
+                       return nil, err
+               }
+               if isGadget {
+                       out.marks = make([]bool, 0, cap(out.data))
+               }
+               return out, nil
+       }
+
+       if preLongs != varOptPreambleLongsWarmup && preLongs != 
varOptPreambleLongsFull {
+               return nil, errors.New("invalid preLongs for non-empty varopt 
sketch")
+       }
+       if len(data) < preLongs*8 {
+               return nil, errors.New("data too short for varopt preamble")
+       }
+
+       n := int64(binary.LittleEndian.Uint64(data[8:]))
+       h := int(binary.LittleEndian.Uint32(data[16:]))
+       r := int(binary.LittleEndian.Uint32(data[20:]))
+       if h < 0 || r < 0 {
+               return nil, errors.New("invalid h/r in serialized varopt 
sketch")
+       }
+       if r > 0 && h+r != k {
+               return nil, errors.New("invalid varopt sketch state: h + r must 
equal k in sampling mode")
+       }
+       if r == 0 && h > k {
+               return nil, errors.New("invalid varopt sketch state: h exceeds 
k in warmup mode")
+       }
+
+       totalWeightR := 0.0
+       if r > 0 {
+               totalWeightR = 
math.Float64frombits(binary.LittleEndian.Uint64(data[24:]))
+       }

Review Comment:
   Also, can you add a check for whether `totalWeightR` is NaN?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to