proost commented on code in PR #102: URL: https://github.com/apache/datasketches-go/pull/102#discussion_r2700696096
########## sampling/varopt_items_sketch.go: ########## @@ -0,0 +1,541 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package sampling + +import ( + "errors" + "iter" + "math" + "math/rand" +) + +// VarOptItemsSketch implements variance-optimal weighted sampling. +// +// This sketch samples weighted items from a stream with optimal variance +// for subset sum estimation. The algorithm maintains two regions: +// - H region (heavy): Items with weight >= tau (stored in a min-heap) +// - R region (reservoir): Items with weight < tau (sampled proportionally) +// +// The array layout is: [H region: 0..h) [gap/M region: h..h+m) [R region: h+m..h+m+r) +// In steady state, m=0 and h+r=k. The gap slot at position h is used during updates. +// +// When all weights are equal (e.g., 1.0), this reduces to standard reservoir sampling. +// +// Reference: Cohen et al., "Efficient Stream Sampling for Variance-Optimal +// Estimation of Subset Sums", SIAM J. Comput. 40(5): 1402-1431, 2011. +type VarOptItemsSketch[T any] struct { + k int // maximum sample size (user-configured) + n int64 // total number of items processed + h int // number of items in H (heavy/heap) region + m int // number of items in middle region (during candidate set operations) + r int // number of items in R (reservoir) region + totalWeightR float64 // total weight of items in R region + data []T // stored items + weights []float64 // corresponding weights for each item (-1.0 indicates R region) + + // resize factor for array growth + rf ResizeFactor + + // current allocated capacity + allocatedSize int +} + +const ( + // VarOpt specific constants + varOptDefaultResizeFactor = ResizeX8 + varOptMinLgK = 3 // minimum log2(k) = 3, so minimum k = 8 + varOptMinK = 1 << varOptMinLgK + varOptMaxK = (1 << 31) - 2 // maximum k value +) + +type VarOptOption func(*varOptConfig) + +type varOptConfig struct { + resizeFactor ResizeFactor +} + +func WithResizeFactor(rf ResizeFactor) VarOptOption { + return func(c *varOptConfig) { + c.resizeFactor = rf + } +} + +func NewVarOptItemsSketch[T any](k int, opts ...VarOptOption) (*VarOptItemsSketch[T], error) { + if k < varOptMinK { + return nil, errors.New("k must be at least 8") + } + if k > varOptMaxK { + return nil, errors.New("k is too large") + } + + cfg := &varOptConfig{ + resizeFactor: varOptDefaultResizeFactor, + } + + for _, opt := range opts { + opt(cfg) + } + + initialSize := int(cfg.resizeFactor) + if initialSize > k { + initialSize = k + } + + return &VarOptItemsSketch[T]{ + k: k, + n: 0, + h: 0, + m: 0, + r: 0, + totalWeightR: 0.0, + data: make([]T, 0, initialSize), + weights: make([]float64, 0, initialSize), + rf: cfg.resizeFactor, + allocatedSize: initialSize, + }, nil +} + +// K returns the configured maximum sample size. +func (s *VarOptItemsSketch[T]) K() int { return s.k } + +// N returns the total number of items processed by the sketch. +func (s *VarOptItemsSketch[T]) N() int64 { return s.n } + +// NumSamples returns the number of items currently retained in the sketch. +func (s *VarOptItemsSketch[T]) NumSamples() int { return s.h + s.r } + +// IsEmpty returns true if the sketch has not processed any items. +func (s *VarOptItemsSketch[T]) IsEmpty() bool { return s.n == 0 } + +// Reset clears the sketch to its initial empty state while preserving k. +func (s *VarOptItemsSketch[T]) Reset() { + s.n = 0 + s.h = 0 + s.m = 0 + s.r = 0 + s.totalWeightR = 0.0 + s.data = s.data[:0] + s.weights = s.weights[:0] +} + +// H returns the number of items in the H (heavy) region. +func (s *VarOptItemsSketch[T]) H() int { return s.h } + +// R returns the number of items in the R (reservoir) region. +func (s *VarOptItemsSketch[T]) R() int { return s.r } + +// TotalWeightR returns the total weight of items in the R region. +func (s *VarOptItemsSketch[T]) TotalWeightR() float64 { return s.totalWeightR } + +// Sample represents a weighted sample item. +type Sample[T any] struct { + Item T + Weight float64 +} + +// All returns an iterator over all samples with their adjusted weights. +// For items in H region, the weight is the original weight. +// For items in R region, the weight is tau (totalWeightR / r). +// +// Usage: +// +// for sample := range sketch.All() { +// fmt.Printf("Item: %v, Weight: %f\n", sample.Item, sample.Weight) +// } +func (s *VarOptItemsSketch[T]) All() iter.Seq[Sample[T]] { + return func(yield func(Sample[T]) bool) { + // H region: items with their actual weights + for i := 0; i < s.h; i++ { + if !yield(Sample[T]{Item: s.data[i], Weight: s.weights[i]}) { + return + } + } + + // R region: items with weight = tau + if s.r > 0 { + tau := s.totalWeightR / float64(s.r) + rStart := s.h + s.m + for i := 0; i < s.r; i++ { + if !yield(Sample[T]{Item: s.data[rStart+i], Weight: tau}) { + return + } + } + } + } +} + +// inWarmup returns true if the sketch is still in warmup phase (exact mode). +// During warmup, r=0 and we store all items directly in H. +func (s *VarOptItemsSketch[T]) inWarmup() bool { + return s.r == 0 +} + +// tau returns the current threshold value tau. +// Items with weight > tau are "heavy" and go into H region. +func (s *VarOptItemsSketch[T]) tau() float64 { + if s.r == 0 { + return math.NaN() + } + return s.totalWeightR / float64(s.r) +} + +// peekMin returns the minimum weight in the H region (heap root). +func (s *VarOptItemsSketch[T]) peekMin() float64 { + if s.h == 0 { + return math.Inf(1) + } + return s.weights[0] +} + +// Update adds an item with the given weight to the sketch. +// Weight must be positive and finite. +func (s *VarOptItemsSketch[T]) Update(item T, weight float64) error { + if weight < 0 || math.IsNaN(weight) || math.IsInf(weight, 0) { + return errors.New("weight must be nonnegative and finite") + } + if weight == 0 { + return nil // ignore zero weight items + } + + s.n++ + + if s.r == 0 { + // exact mode (warmup) + return s.updateWarmupPhase(item, weight) + } + // estimation mode + // what tau would be if deletion candidates = R + new item + hypotheticalTau := (weight + s.totalWeightR) / float64(s.r) // r+1-1 = r + + // is new item's turn to be considered for reservoir? + condition1 := s.h == 0 || weight <= s.peekMin() + // is new item light enough for reservoir? + condition2 := weight < hypotheticalTau + + if condition1 && condition2 { + return s.updateLight(item, weight) + } else if s.r == 1 { + return s.updateHeavyREq1(item, weight) + } + return s.updateHeavyGeneral(item, weight) +} + +// updateWarmupPhase handles the warmup phase when r=0. +func (s *VarOptItemsSketch[T]) updateWarmupPhase(item T, weight float64) error { + if s.h >= len(s.data) { + s.growDataArrays() + } + + // store items as they come in + if s.h < len(s.data) { Review Comment: is correct using `len`, not `cap` too ? ########## sampling/varopt_items_sketch_test.go: ########## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package sampling + +import ( + "math" + "math/rand" + "testing" +) + +func TestVarOptItemsSketch_NewSketch(t *testing.T) { Review Comment: Can you add more test cases returning error in constructor? ########## sampling/varopt_items_sketch.go: ########## @@ -0,0 +1,541 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package sampling + +import ( + "errors" + "iter" + "math" + "math/rand" +) + +// VarOptItemsSketch implements variance-optimal weighted sampling. +// +// This sketch samples weighted items from a stream with optimal variance +// for subset sum estimation. The algorithm maintains two regions: +// - H region (heavy): Items with weight >= tau (stored in a min-heap) +// - R region (reservoir): Items with weight < tau (sampled proportionally) +// +// The array layout is: [H region: 0..h) [gap/M region: h..h+m) [R region: h+m..h+m+r) +// In steady state, m=0 and h+r=k. The gap slot at position h is used during updates. +// +// When all weights are equal (e.g., 1.0), this reduces to standard reservoir sampling. +// +// Reference: Cohen et al., "Efficient Stream Sampling for Variance-Optimal +// Estimation of Subset Sums", SIAM J. Comput. 40(5): 1402-1431, 2011. +type VarOptItemsSketch[T any] struct { + k int // maximum sample size (user-configured) + n int64 // total number of items processed + h int // number of items in H (heavy/heap) region + m int // number of items in middle region (during candidate set operations) + r int // number of items in R (reservoir) region + totalWeightR float64 // total weight of items in R region + data []T // stored items + weights []float64 // corresponding weights for each item (-1.0 indicates R region) + + // resize factor for array growth + rf ResizeFactor + + // current allocated capacity + allocatedSize int +} + +const ( + // VarOpt specific constants + varOptDefaultResizeFactor = ResizeX8 + varOptMinLgK = 3 // minimum log2(k) = 3, so minimum k = 8 + varOptMinK = 1 << varOptMinLgK + varOptMaxK = (1 << 31) - 2 // maximum k value +) + +type VarOptOption func(*varOptConfig) + +type varOptConfig struct { + resizeFactor ResizeFactor +} + +func WithResizeFactor(rf ResizeFactor) VarOptOption { + return func(c *varOptConfig) { + c.resizeFactor = rf + } +} + +func NewVarOptItemsSketch[T any](k int, opts ...VarOptOption) (*VarOptItemsSketch[T], error) { + if k < varOptMinK { + return nil, errors.New("k must be at least 8") + } + if k > varOptMaxK { + return nil, errors.New("k is too large") + } + + cfg := &varOptConfig{ + resizeFactor: varOptDefaultResizeFactor, + } + + for _, opt := range opts { + opt(cfg) + } + + initialSize := int(cfg.resizeFactor) + if initialSize > k { + initialSize = k + } + + return &VarOptItemsSketch[T]{ + k: k, + n: 0, + h: 0, + m: 0, + r: 0, + totalWeightR: 0.0, + data: make([]T, 0, initialSize), + weights: make([]float64, 0, initialSize), + rf: cfg.resizeFactor, + allocatedSize: initialSize, + }, nil +} + +// K returns the configured maximum sample size. +func (s *VarOptItemsSketch[T]) K() int { return s.k } + +// N returns the total number of items processed by the sketch. +func (s *VarOptItemsSketch[T]) N() int64 { return s.n } + +// NumSamples returns the number of items currently retained in the sketch. +func (s *VarOptItemsSketch[T]) NumSamples() int { return s.h + s.r } + +// IsEmpty returns true if the sketch has not processed any items. +func (s *VarOptItemsSketch[T]) IsEmpty() bool { return s.n == 0 } + +// Reset clears the sketch to its initial empty state while preserving k. +func (s *VarOptItemsSketch[T]) Reset() { + s.n = 0 + s.h = 0 + s.m = 0 + s.r = 0 + s.totalWeightR = 0.0 + s.data = s.data[:0] + s.weights = s.weights[:0] +} + +// H returns the number of items in the H (heavy) region. +func (s *VarOptItemsSketch[T]) H() int { return s.h } + +// R returns the number of items in the R (reservoir) region. +func (s *VarOptItemsSketch[T]) R() int { return s.r } + +// TotalWeightR returns the total weight of items in the R region. +func (s *VarOptItemsSketch[T]) TotalWeightR() float64 { return s.totalWeightR } + +// Sample represents a weighted sample item. +type Sample[T any] struct { + Item T + Weight float64 +} + +// All returns an iterator over all samples with their adjusted weights. +// For items in H region, the weight is the original weight. +// For items in R region, the weight is tau (totalWeightR / r). +// +// Usage: Review Comment: I think `Usage` comment is unnecessary. We can assume that our user is familiar with iterator pattern. ########## sampling/varopt_items_sketch.go: ########## @@ -0,0 +1,541 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package sampling + +import ( + "errors" + "iter" + "math" + "math/rand" +) + +// VarOptItemsSketch implements variance-optimal weighted sampling. +// +// This sketch samples weighted items from a stream with optimal variance +// for subset sum estimation. The algorithm maintains two regions: +// - H region (heavy): Items with weight >= tau (stored in a min-heap) +// - R region (reservoir): Items with weight < tau (sampled proportionally) +// +// The array layout is: [H region: 0..h) [gap/M region: h..h+m) [R region: h+m..h+m+r) +// In steady state, m=0 and h+r=k. The gap slot at position h is used during updates. +// +// When all weights are equal (e.g., 1.0), this reduces to standard reservoir sampling. +// +// Reference: Cohen et al., "Efficient Stream Sampling for Variance-Optimal +// Estimation of Subset Sums", SIAM J. Comput. 40(5): 1402-1431, 2011. +type VarOptItemsSketch[T any] struct { + k int // maximum sample size (user-configured) + n int64 // total number of items processed + h int // number of items in H (heavy/heap) region + m int // number of items in middle region (during candidate set operations) + r int // number of items in R (reservoir) region + totalWeightR float64 // total weight of items in R region + data []T // stored items + weights []float64 // corresponding weights for each item (-1.0 indicates R region) + + // resize factor for array growth + rf ResizeFactor + + // current allocated capacity + allocatedSize int +} + +const ( + // VarOpt specific constants + varOptDefaultResizeFactor = ResizeX8 + varOptMinLgK = 3 // minimum log2(k) = 3, so minimum k = 8 + varOptMinK = 1 << varOptMinLgK + varOptMaxK = (1 << 31) - 2 // maximum k value +) + +type VarOptOption func(*varOptConfig) + +type varOptConfig struct { + resizeFactor ResizeFactor +} + +func WithResizeFactor(rf ResizeFactor) VarOptOption { + return func(c *varOptConfig) { + c.resizeFactor = rf + } +} + +func NewVarOptItemsSketch[T any](k int, opts ...VarOptOption) (*VarOptItemsSketch[T], error) { + if k < varOptMinK { + return nil, errors.New("k must be at least 8") + } + if k > varOptMaxK { + return nil, errors.New("k is too large") + } + + cfg := &varOptConfig{ + resizeFactor: varOptDefaultResizeFactor, + } + + for _, opt := range opts { + opt(cfg) + } + + initialSize := int(cfg.resizeFactor) + if initialSize > k { + initialSize = k + } + + return &VarOptItemsSketch[T]{ + k: k, + n: 0, + h: 0, + m: 0, + r: 0, + totalWeightR: 0.0, + data: make([]T, 0, initialSize), + weights: make([]float64, 0, initialSize), + rf: cfg.resizeFactor, + allocatedSize: initialSize, + }, nil +} + +// K returns the configured maximum sample size. +func (s *VarOptItemsSketch[T]) K() int { return s.k } + +// N returns the total number of items processed by the sketch. +func (s *VarOptItemsSketch[T]) N() int64 { return s.n } + +// NumSamples returns the number of items currently retained in the sketch. +func (s *VarOptItemsSketch[T]) NumSamples() int { return s.h + s.r } + +// IsEmpty returns true if the sketch has not processed any items. +func (s *VarOptItemsSketch[T]) IsEmpty() bool { return s.n == 0 } + +// Reset clears the sketch to its initial empty state while preserving k. +func (s *VarOptItemsSketch[T]) Reset() { + s.n = 0 + s.h = 0 + s.m = 0 + s.r = 0 + s.totalWeightR = 0.0 + s.data = s.data[:0] + s.weights = s.weights[:0] +} + +// H returns the number of items in the H (heavy) region. +func (s *VarOptItemsSketch[T]) H() int { return s.h } + +// R returns the number of items in the R (reservoir) region. +func (s *VarOptItemsSketch[T]) R() int { return s.r } + +// TotalWeightR returns the total weight of items in the R region. +func (s *VarOptItemsSketch[T]) TotalWeightR() float64 { return s.totalWeightR } + +// Sample represents a weighted sample item. +type Sample[T any] struct { + Item T + Weight float64 +} + +// All returns an iterator over all samples with their adjusted weights. +// For items in H region, the weight is the original weight. +// For items in R region, the weight is tau (totalWeightR / r). +// +// Usage: +// +// for sample := range sketch.All() { +// fmt.Printf("Item: %v, Weight: %f\n", sample.Item, sample.Weight) +// } +func (s *VarOptItemsSketch[T]) All() iter.Seq[Sample[T]] { + return func(yield func(Sample[T]) bool) { + // H region: items with their actual weights + for i := 0; i < s.h; i++ { + if !yield(Sample[T]{Item: s.data[i], Weight: s.weights[i]}) { + return + } + } + + // R region: items with weight = tau + if s.r > 0 { + tau := s.totalWeightR / float64(s.r) + rStart := s.h + s.m + for i := 0; i < s.r; i++ { + if !yield(Sample[T]{Item: s.data[rStart+i], Weight: tau}) { + return + } + } + } + } +} + +// inWarmup returns true if the sketch is still in warmup phase (exact mode). +// During warmup, r=0 and we store all items directly in H. +func (s *VarOptItemsSketch[T]) inWarmup() bool { + return s.r == 0 +} + +// tau returns the current threshold value tau. +// Items with weight > tau are "heavy" and go into H region. +func (s *VarOptItemsSketch[T]) tau() float64 { + if s.r == 0 { + return math.NaN() + } + return s.totalWeightR / float64(s.r) +} + +// peekMin returns the minimum weight in the H region (heap root). +func (s *VarOptItemsSketch[T]) peekMin() float64 { + if s.h == 0 { + return math.Inf(1) + } + return s.weights[0] +} + +// Update adds an item with the given weight to the sketch. +// Weight must be positive and finite. +func (s *VarOptItemsSketch[T]) Update(item T, weight float64) error { + if weight < 0 || math.IsNaN(weight) || math.IsInf(weight, 0) { + return errors.New("weight must be nonnegative and finite") + } + if weight == 0 { + return nil // ignore zero weight items + } + + s.n++ + + if s.r == 0 { + // exact mode (warmup) + return s.updateWarmupPhase(item, weight) + } + // estimation mode + // what tau would be if deletion candidates = R + new item + hypotheticalTau := (weight + s.totalWeightR) / float64(s.r) // r+1-1 = r + + // is new item's turn to be considered for reservoir? + condition1 := s.h == 0 || weight <= s.peekMin() + // is new item light enough for reservoir? + condition2 := weight < hypotheticalTau + + if condition1 && condition2 { + return s.updateLight(item, weight) + } else if s.r == 1 { + return s.updateHeavyREq1(item, weight) + } + return s.updateHeavyGeneral(item, weight) +} + +// updateWarmupPhase handles the warmup phase when r=0. +func (s *VarOptItemsSketch[T]) updateWarmupPhase(item T, weight float64) error { + if s.h >= len(s.data) { Review Comment: is correct using `len`, not `cap`? when exact mode always growing called, not when it needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
