proost commented on code in PR #102:
URL: https://github.com/apache/datasketches-go/pull/102#discussion_r2696742470


##########
sampling/varopt_items_sketch.go:
##########
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sampling
+
+import (
+       "errors"
+       "iter"
+       "math"
+       "math/rand"
+)
+
+// VarOptItemsSketch implements variance-optimal weighted sampling.
+//
+// This sketch samples weighted items from a stream with optimal variance
+// for subset sum estimation. The algorithm maintains two regions:
+//   - H region (heavy): Items with weight >= tau (stored in a min-heap)
+//   - R region (reservoir): Items with weight < tau (sampled proportionally)
+//
+// The array layout is: [H region: 0..h) [gap/M region: h..h+m) [R region: 
h+m..h+m+r)
+// In steady state, m=0 and h+r=k. The gap slot at position h is used during 
updates.
+//
+// When all weights are equal (e.g., 1.0), this reduces to standard reservoir 
sampling.
+//
+// Reference: Cohen et al., "Efficient Stream Sampling for Variance-Optimal
+// Estimation of Subset Sums", SIAM J. Comput. 40(5): 1402-1431, 2011.
+type VarOptItemsSketch[T any] struct {
+       k            int       // maximum sample size (user-configured)
+       n            int64     // total number of items processed
+       h            int       // number of items in H (heavy/heap) region
+       m            int       // number of items in middle region (during 
candidate set operations)
+       r            int       // number of items in R (reservoir) region
+       totalWeightR float64   // total weight of items in R region
+       data         []T       // stored items
+       weights      []float64 // corresponding weights for each item (-1.0 
indicates R region)
+
+       // resize factor for array growth
+       rf ResizeFactor
+
+       // current allocated capacity
+       allocatedSize int
+}
+
+const (
+       // VarOpt specific constants
+       varOptDefaultResizeFactor = ResizeX8
+       varOptMinLgK              = 3 // minimum log2(k) = 3, so minimum k = 8
+       varOptMinK                = 1 << varOptMinLgK
+       varOptMaxK                = (1 << 31) - 2 // maximum k value
+)
+
+// NewVarOptItemsSketch creates a new VarOpt sketch with the specified maximum 
sample size k.
+func NewVarOptItemsSketch[T any](k int) (*VarOptItemsSketch[T], error) {
+       return NewVarOptItemsSketchWithResizeFactor[T](k, 
varOptDefaultResizeFactor)
+}
+
+// NewVarOptItemsSketchWithResizeFactor creates a new VarOpt sketch with 
specified k and resize factor.
+func NewVarOptItemsSketchWithResizeFactor[T any](k int, rf ResizeFactor) 
(*VarOptItemsSketch[T], error) {
+       if k < varOptMinK {
+               return nil, errors.New("k must be at least 8")
+       }
+       if k > varOptMaxK {
+               return nil, errors.New("k is too large")
+       }
+
+       initialSize := int(rf)
+       if initialSize > k {
+               initialSize = k
+       }
+
+       return &VarOptItemsSketch[T]{
+               k:             k,
+               n:             0,
+               h:             0,
+               m:             0,
+               r:             0,
+               totalWeightR:  0.0,
+               data:          make([]T, 0, initialSize),
+               weights:       make([]float64, 0, initialSize),
+               rf:            rf,
+               allocatedSize: initialSize,
+       }, nil
+}
+
+// K returns the configured maximum sample size.
+func (s *VarOptItemsSketch[T]) K() int { return s.k }
+
+// N returns the total number of items processed by the sketch.
+func (s *VarOptItemsSketch[T]) N() int64 { return s.n }
+
+// NumSamples returns the number of items currently retained in the sketch.
+func (s *VarOptItemsSketch[T]) NumSamples() int { return s.h + s.r }
+
+// IsEmpty returns true if the sketch has not processed any items.
+func (s *VarOptItemsSketch[T]) IsEmpty() bool { return s.n == 0 }
+
+// Reset clears the sketch to its initial empty state while preserving k.
+func (s *VarOptItemsSketch[T]) Reset() {
+       s.n = 0
+       s.h = 0
+       s.m = 0
+       s.r = 0
+       s.totalWeightR = 0.0
+       s.data = s.data[:0]
+       s.weights = s.weights[:0]
+}
+
+// H returns the number of items in the H (heavy) region.
+func (s *VarOptItemsSketch[T]) H() int { return s.h }
+
+// R returns the number of items in the R (reservoir) region.
+func (s *VarOptItemsSketch[T]) R() int { return s.r }
+
+// TotalWeightR returns the total weight of items in the R region.
+func (s *VarOptItemsSketch[T]) TotalWeightR() float64 { return s.totalWeightR }
+
+// Samples returns an iterator over the samples and their adjusted weights.
+// For items in H region, the weight is the original weight.
+// For items in R region, the weight is tau (totalWeightR / r).
+//
+// Usage:
+//
+//     for item, weight := range sketch.Samples() {
+//         // process item and weight
+//     }
+func (s *VarOptItemsSketch[T]) Samples() iter.Seq2[T, float64] {
+       return func(yield func(T, float64) bool) {
+               // H region: items with their actual weights
+               for i := 0; i < s.h; i++ {
+                       if !yield(s.data[i], s.weights[i]) {
+                               return
+                       }
+               }
+
+               // R region: items with weight = tau
+               if s.r > 0 {
+                       tau := s.totalWeightR / float64(s.r)
+                       rStart := s.h + s.m
+                       for i := 0; i < s.r; i++ {
+                               if !yield(s.data[rStart+i], tau) {
+                                       return
+                               }
+                       }
+               }
+       }
+}
+
+// inWarmup returns true if the sketch is still in warmup phase (exact mode).
+// During warmup, r=0 and we store all items directly in H.
+func (s *VarOptItemsSketch[T]) inWarmup() bool {
+       return s.r == 0
+}
+
+// tau returns the current threshold value tau.
+// Items with weight > tau are "heavy" and go into H region.
+func (s *VarOptItemsSketch[T]) tau() float64 {
+       if s.r == 0 {
+               return math.NaN()
+       }
+       return s.totalWeightR / float64(s.r)
+}
+
+// peekMin returns the minimum weight in the H region (heap root).
+func (s *VarOptItemsSketch[T]) peekMin() float64 {
+       if s.h == 0 {
+               return math.Inf(1)
+       }
+       return s.weights[0]
+}
+
+// Update adds an item with the given weight to the sketch.
+// Weight must be positive and finite.
+func (s *VarOptItemsSketch[T]) Update(item T, weight float64) error {
+       if weight < 0 || math.IsNaN(weight) || math.IsInf(weight, 0) {
+               return errors.New("weight must be nonnegative and finite")
+       }
+       if weight == 0 {
+               return nil // ignore zero weight items
+       }
+
+       s.n++
+
+       if s.r == 0 {
+               // exact mode (warmup)
+               s.updateWarmupPhase(item, weight)
+       } else {
+               // estimation mode
+               // what tau would be if deletion candidates = R + new item
+               hypotheticalTau := (weight + s.totalWeightR) / float64(s.r) // 
r+1-1 = r
+
+               // is new item's turn to be considered for reservoir?
+               condition1 := s.h == 0 || weight <= s.peekMin()
+               // is new item light enough for reservoir?
+               condition2 := weight < hypotheticalTau
+
+               if condition1 && condition2 {
+                       s.updateLight(item, weight)
+               } else if s.r == 1 {
+                       s.updateHeavyREq1(item, weight)
+               } else {
+                       s.updateHeavyGeneral(item, weight)
+               }
+       }
+       return nil
+}
+
+// updateWarmupPhase handles the warmup phase when r=0.
+func (s *VarOptItemsSketch[T]) updateWarmupPhase(item T, weight float64) {
+       if s.h >= len(s.data) {
+               s.growDataArrays()
+       }
+
+       // store items as they come in
+       if s.h < len(s.data) {
+               s.data[s.h] = item
+               s.weights[s.h] = weight
+       } else {
+               s.data = append(s.data, item)
+               s.weights = append(s.weights, weight)
+       }
+       s.h++
+
+       // check if need to transition to estimation mode
+       if s.h > s.k {
+               s.transitionFromWarmup()
+       }
+}
+
+// transitionFromWarmup converts from warmup (exact) mode to estimation mode.
+func (s *VarOptItemsSketch[T]) transitionFromWarmup() {
+       // Convert to heap and move 2 lightest items to M region
+       s.heapify()
+       s.popMinToMRegion()
+       s.popMinToMRegion()
+
+       // The lighter of the two really belongs in R
+       s.m--
+       s.r++
+
+       // h should be k-1, m should be 1, r should be 1
+
+       // Update total weight in R (the item at position k)
+       s.totalWeightR = s.weights[s.k]
+       s.weights[s.k] = -1.0 // mark as R region item
+
+       // The two lightest items are a valid initial candidate set
+       // weights[k-1] is in M, weights[k] (now -1) represents R
+       s.growCandidateSet(s.weights[s.k-1]+s.totalWeightR, 2)
+}
+
+// updateLight handles a light item (weight <= old_tau) in estimation mode.
+func (s *VarOptItemsSketch[T]) updateLight(item T, weight float64) {
+       // The M slot is at index h (the gap)
+       mSlot := s.h
+       s.data[mSlot] = item
+       s.weights[mSlot] = weight
+       s.m++
+
+       s.growCandidateSet(s.totalWeightR+weight, s.r+1)
+}
+
+// updateHeavyGeneral handles a heavy item when r >= 2.
+func (s *VarOptItemsSketch[T]) updateHeavyGeneral(item T, weight float64) {
+       // Put into H (may come back out momentarily)
+       s.push(item, weight)
+
+       s.growCandidateSet(s.totalWeightR, s.r)
+}
+
+// updateHeavyREq1 handles a heavy item when r == 1.
+func (s *VarOptItemsSketch[T]) updateHeavyREq1(item T, weight float64) {
+       s.push(item, weight) // new item into H
+       s.popMinToMRegion()  // pop lightest back into M
+
+       // The M slot is at k-1 (array is k+1, 1 in R)
+       mSlot := s.k - 1
+       s.growCandidateSet(s.weights[mSlot]+s.totalWeightR, 2)
+}
+
+// push adds an item to the H region heap.
+func (s *VarOptItemsSketch[T]) push(item T, weight float64) {
+       // Insert at position h (the gap)
+       s.data[s.h] = item
+       s.weights[s.h] = weight
+       s.h++
+
+       s.siftUp(s.h - 1)
+}
+
+// popMinToMRegion moves the minimum item from H to M region.
+func (s *VarOptItemsSketch[T]) popMinToMRegion() {
+       if s.h == 0 {
+               return
+       }
+
+       if s.h == 1 {
+               s.m++
+               s.h--
+       } else {
+               tgt := s.h - 1
+               s.swap(0, tgt)
+               s.m++
+               s.h--
+               s.siftDown(0)
+       }
+}
+
+// growCandidateSet grows the candidate set by pulling light items from H to M.
+func (s *VarOptItemsSketch[T]) growCandidateSet(wtCands float64, numCands int) 
{
+       for s.h > 0 {
+               nextWt := s.peekMin()
+               nextTotWt := wtCands + nextWt
+
+               // test for strict lightness: nextWt * numCands < nextTotWt
+               if nextWt*float64(numCands) < nextTotWt {
+                       wtCands = nextTotWt
+                       numCands++
+                       s.popMinToMRegion()
+               } else {
+                       break
+               }
+       }
+
+       s.downsampleCandidateSet(wtCands, numCands)
+}
+
+// downsampleCandidateSet downsamples the candidate set to produce final R.
+func (s *VarOptItemsSketch[T]) downsampleCandidateSet(wtCands float64, 
numCands int) {
+       if numCands < 2 {
+               return
+       }
+
+       // Choose which slot to delete
+       deleteSlot := s.chooseDeleteSlot(wtCands, numCands)
+
+       leftmostCandSlot := s.h
+
+       // Mark weights for items moving from M to R as -1
+       stopIdx := leftmostCandSlot + s.m
+       for j := leftmostCandSlot; j < stopIdx; j++ {
+               s.weights[j] = -1.0
+       }
+
+       // Move the delete slot's content to leftmost candidate position
+       // This works even when deleteSlot == leftmostCandSlot
+       if deleteSlot != leftmostCandSlot {
+               s.data[deleteSlot] = s.data[leftmostCandSlot]
+       }
+
+       s.m = 0
+       s.r = numCands - 1
+       s.totalWeightR = wtCands
+}
+
+// chooseDeleteSlot randomly selects which item to delete from candidates.
+func (s *VarOptItemsSketch[T]) chooseDeleteSlot(wtCands float64, numCands int) 
int {
+       if s.r == 0 {
+               panic("choosing delete slot while in exact mode")

Review Comment:
   It should not panic. can you returning error?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to