This is an automated email from the ASF dual-hosted git repository.
placave pushed a commit to branch cpc-sketch
in repository https://gitbox.apache.org/repos/asf/datasketches-go.git
The following commit(s) were added to refs/heads/cpc-sketch by this push:
new 7dc1bea CPC, start adding Update
7dc1bea is described below
commit 7dc1bea0aba3294a629f9a34a7b9cbda2c6a4a20
Author: Pierre Lacave <[email protected]>
AuthorDate: Sun Jun 23 15:16:56 2024 +0200
CPC, start adding Update
---
cpc/cpc_sketch.go | 101 ++++++++++++++++++++++++++++++++++++++++++++---
cpc/cpc_sketch_test.go | 2 +
cpc/pair_table.go | 104 +++++++++++++++++++++++++++++++++++++++++++++++++
cpc/utils.go | 15 +++++--
4 files changed, 213 insertions(+), 9 deletions(-)
diff --git a/cpc/cpc_sketch.go b/cpc/cpc_sketch.go
index 2b9b7ca..4e5685f 100644
--- a/cpc/cpc_sketch.go
+++ b/cpc/cpc_sketch.go
@@ -17,19 +17,26 @@
package cpc
+import (
+ "encoding/binary"
+ "fmt"
+ "github.com/twmb/murmur3"
+ "math/bits"
+)
+
const (
minLgK = 4
maxLgK = 26
)
type CpcSketch struct {
- seed int64
+ seed uint64
//common variables
lgK int
- numCoupons int64 // The number of coupons collected so far.
- mergeFlag bool // Is the sketch the result of merging?
- fiCol int // First Interesting Column. This is part of a speed
optimization.
+ numCoupons uint64 // The number of coupons collected so far.
+ mergeFlag bool // Is the sketch the result of merging?
+ fiCol int // First Interesting Column. This is part of a speed
optimization.
windowOffset int
slidingWindow []byte //either null or size K bytes
@@ -38,9 +45,11 @@ type CpcSketch struct {
//The following variables are only valid in HIP variants
kxp float64 //used with HIP
hipEstAccum float64 //used with HIP
+
+ scratch [8]byte
}
-func NewCpcSketch(lgK int, seed int64) (*CpcSketch, error) {
+func NewCpcSketch(lgK int, seed uint64) (*CpcSketch, error) {
if err := checkLgK(lgK); err != nil {
return nil, err
}
@@ -52,6 +61,88 @@ func NewCpcSketch(lgK int, seed int64) (*CpcSketch, error) {
}, nil
}
+// Update presents a signed 64-bit integer to the sketch. The value is written
+// little-endian into the sketch's scratch buffer, hashed with the sketch seed,
+// and the two resulting hash words are handed to hashUpdate.
+func (c *CpcSketch) Update(datum int64) error {
+	buf := c.scratch[:]
+	binary.LittleEndian.PutUint64(buf, uint64(datum))
+	lo, hi := hash(buf, c.seed)
+	return c.hashUpdate(lo, hi)
+}
+
+// hashUpdate folds one 128-bit hash (given as two 64-bit words) into the sketch.
+//
+// hash0 selects the row (its low lgK bits) and hash1 selects the column (its
+// number of leading zero bits, clipped to 63). The (row, col) pair is packed
+// as row<<6 | col and passed to the update routine for the current flavor.
+// Only the sparse flavor is implemented so far; once the sketch has left the
+// sparse range the coupon is currently dropped and nil is returned.
+func (c *CpcSketch) hashUpdate(hash0, hash1 uint64) error {
+	// The column is the count of leading zeros itself (not 64 minus it), so
+	// that each successive column is half as likely as the previous one.
+	col := bits.LeadingZeros64(hash1)
+	if col < c.fiCol {
+		return nil // important speed optimization
+	}
+	if col > 63 {
+		col = 63 // clip so that 0 <= col <= 63
+	}
+	if c.numCoupons == 0 {
+		if err := c.promoteEmptyToSparse(); err != nil {
+			return err
+		}
+	}
+	k := uint64(1) << c.lgK
+	row := int(hash0 & (k - 1))
+	rowCol := (row << 6) | col
+
+	// Avoid the hash table's "empty" value which is (2^26 -1, 63) (all ones) by changing it
+	// to the pair (2^26 - 2, 63), which effectively merges the two cells.
+	// This case is *extremely* unlikely, but we might as well handle it.
+	// It can't happen at all if lgK (or maxLgK) < 26.
+	// NOTE(review): where int is 64 bits wide rowCol is never negative, so
+	// this comparison can only be true when int is 32 bits wide.
+	if rowCol == -1 {
+		rowCol ^= 1 << 6 // set the LSB of row to 0
+	}
+
+	if (c.numCoupons << 5) < (3 * k) {
+		return c.updateSparse(rowCol)
+	}
+	// TODO(pierre)
+	// return c.updateWindowed(rowCol)
+	return nil
+}
+
+// promoteEmptyToSparse gives the sketch a fresh pair table with 2^2 slots and
+// 6+lgK valid bits per item. If the table cannot be created the sketch is left
+// untouched and the error is returned.
+func (c *CpcSketch) promoteEmptyToSparse() error {
+	table, err := NewPairTable(2, 6+c.lgK)
+	if err == nil {
+		c.pairTable = table
+	}
+	return err
+}
+
+// updateSparse offers a packed (row, col) pair to the pair table and bumps the
+// coupon count when the pair was not seen before.
+//
+// It returns an error if the sketch has already reached 3K/32 coupons (i.e. it
+// is no longer in the sparse range), if the pair table has not been created,
+// or if the table insertion fails.
+func (c *CpcSketch) updateSparse(rowCol int) error {
+	// Threshold 3K, compared against 32*C, i.e. C < 3K/32 means sparse.
+	sparseLimit := 3 * (uint64(1) << c.lgK)
+	if (c.numCoupons << 5) >= sparseLimit {
+		return fmt.Errorf("C >= 3K/32")
+	}
+	if c.pairTable == nil {
+		return fmt.Errorf("pairTable is nil")
+	}
+	isNovel, err := c.pairTable.maybeInsert(rowCol)
+	if err != nil {
+		return err
+	}
+	if !isNovel {
+		return nil
+	}
+	c.numCoupons++
+	// TODO (pierre)
+	//c.updateHIP(rowCol)
+	if (c.numCoupons << 5) >= sparseLimit {
+		// TODO (pierre)
+		// c.promoteSparseToWindowed() // C >= 3K/32
+	}
+	return nil
+}
+
+// hash returns the two 64-bit words produced by murmur3.SeedSum128 over bs,
+// with seed supplied as both of that function's seed arguments.
+func hash(bs []byte, seed uint64) (uint64, uint64) {
+ return murmur3.SeedSum128(seed, seed, bs)
+}
+
func (c *CpcSketch) getFormat() cpcFormat {
ordinal := 0
f := c.getFlavor()
diff --git a/cpc/cpc_sketch_test.go b/cpc/cpc_sketch_test.go
index 0049768..a5f7079 100644
--- a/cpc/cpc_sketch_test.go
+++ b/cpc/cpc_sketch_test.go
@@ -26,6 +26,8 @@ func TestCPCCheckUpdatesEstimate(t *testing.T) {
sk, err := NewCpcSketch(10, 0)
assert.NoError(t, err)
assert.Equal(t, sk.getFormat(), format_empty_hip)
+ err = sk.Update(1)
+ assert.NoError(t, err)
}
/*
diff --git a/cpc/pair_table.go b/cpc/pair_table.go
index b6ca032..eecf11b 100644
--- a/cpc/pair_table.go
+++ b/cpc/pair_table.go
@@ -17,9 +17,113 @@
package cpc
+import "fmt"
+
+// Load-factor thresholds for the pair table, expressed as fractions.
+// maybeInsert grows the table while numPairs/size exceeds
+// upsizeNumer/upsizeDenom (3/4).
+// NOTE(review): the downsize pair (1/4) is not referenced by the code in this
+// change; presumably it is the shrink threshold for deletions — confirm.
+const (
+ upsizeNumer = 3
+ upsizeDenom = 4
+ downsizeNumer = 1
+ downsizeDenom = 4
+)
+
// pairTable is an open-addressing table of int items that uses linear probing
// and the value -1 to mark an empty slot.
type pairTable struct {
// lgSizeInts is the base-2 logarithm of the number of slots in slotsArr.
lgSizeInts int
// validBits is the bit width assumed for an item; the initial probe index is
// item >> (validBits - lgSizeInts).
validBits int
// numPairs counts the items stored through maybeInsert.
numPairs int
// slotsArr holds the items; empty slots contain -1.
slotsArr []int
}
+
+// NewPairTable builds an empty pair table with 2^lgSizeInts slots, every slot
+// initialised to the empty marker -1, and numValidBits recorded as the item
+// bit width. It returns an error when lgSizeInts fails checkLgSizeInts.
+func NewPairTable(lgSizeInts, numValidBits int) (*pairTable, error) {
+	if err := checkLgSizeInts(lgSizeInts); err != nil {
+		return nil, err
+	}
+	slots := make([]int, 1<<lgSizeInts)
+	for i := range slots {
+		slots[i] = -1
+	}
+	return &pairTable{
+		lgSizeInts: lgSizeInts,
+		validBits:  numValidBits,
+		numPairs:   0,
+		slotsArr:   slots,
+	}, nil
+}
+
+// maybeInsert stores item in the table unless it is already present.
+//
+// It returns true when item was novel and has been stored, and false when the
+// table already contained it (in which case nothing changes). After a
+// successful store the table is doubled until its load factor is at most
+// upsizeNumer/upsizeDenom; an error from that rebuild is returned with false.
+func (p *pairTable) maybeInsert(item int) (bool, error) {
+	//SHARED CODE (implemented as a macro in C and expanded here)
+	lgSizeInts := p.lgSizeInts
+	sizeInts := 1 << lgSizeInts
+	mask := sizeInts - 1
+	shift := p.validBits - lgSizeInts
+	//rtAssert(shift > 0)
+	probe := item >> shift
+	//rtAssert((probe >= 0) && (probe <= mask))
+	arr := p.slotsArr
+	fetched := arr[probe]
+	// Linear probing: stop at the item itself or at the first empty slot.
+	for fetched != item && fetched != -1 {
+		probe = (probe + 1) & mask
+		fetched = arr[probe]
+	}
+	//END SHARED CODE
+	if fetched == item {
+		// Already present: not novel.
+		return false, nil
+	}
+	//assert (fetched == -1)
+	arr[probe] = item
+	p.numPairs++
+	for (upsizeDenom * p.numPairs) > (upsizeNumer * (1 << p.lgSizeInts)) {
+		if err := p.rebuild(p.lgSizeInts + 1); err != nil {
+			return false, err
+		}
+	}
+	return true, nil
+}
+
+// mustInsert stores an item that is required to be absent from the table and
+// panics if it is found. It does not touch numPairs and never resizes; counts
+// and resizing must be handled by the caller.
+func (p *pairTable) mustInsert(item int) {
+	mask := (1 << p.lgSizeInts) - 1
+	slots := p.slotsArr
+	// Start at the slot given by the item's high bits, then probe linearly
+	// until the item itself or an empty (-1) slot is reached.
+	idx := item >> (p.validBits - p.lgSizeInts)
+	for slots[idx] != item && slots[idx] != -1 {
+		idx = (idx + 1) & mask
+	}
+	if slots[idx] == item {
+		panic("PairTable mustInsert() failed")
+	}
+	slots[idx] = item
+}
+
+// rebuild replaces the slot array with a new one of 2^newLgSizeInts slots and
+// re-inserts every stored item.
+//
+// It returns an error, leaving the table unchanged, when newLgSizeInts fails
+// checkLgSizeInts or when the new size could not hold the current numPairs
+// items with at least one slot left empty.
+func (p *pairTable) rebuild(newLgSizeInts int) error {
+	if err := checkLgSizeInts(newLgSizeInts); err != nil {
+		return err
+	}
+	newSize := 1 << newLgSizeInts
+	if newSize <= p.numPairs {
+		// The error must be returned: probing needs an empty slot to
+		// terminate, so a table this small cannot hold the items.
+		return fmt.Errorf("newSize <= numPairs")
+	}
+	oldSlotsArr := p.slotsArr
+	p.slotsArr = make([]int, newSize)
+	for i := range p.slotsArr {
+		p.slotsArr[i] = -1
+	}
+	// lgSizeInts must be updated before re-inserting: mustInsert derives the
+	// probe index and mask from it.
+	p.lgSizeInts = newLgSizeInts
+	for _, item := range oldSlotsArr {
+		if item != -1 {
+			p.mustInsert(item)
+		}
+	}
+	return nil
+}
diff --git a/cpc/utils.go b/cpc/utils.go
index f59c4b2..6660d60 100644
--- a/cpc/utils.go
+++ b/cpc/utils.go
@@ -48,22 +48,29 @@ func checkLgK(lgK int) error {
return nil
}
-func determineFlavor(lgK int, numCoupons int64) cpcFlavor {
+// checkLgSizeInts validates the log2 size of a pair table: values from 2 to 26
+// inclusive are accepted, anything else yields an error naming the value.
+func checkLgSizeInts(lgSizeInts int) error {
+	if lgSizeInts >= 2 && lgSizeInts <= 26 {
+		return nil
+	}
+	return fmt.Errorf("Illegal LgSizeInts: %d", lgSizeInts)
+}
+
+func determineFlavor(lgK int, numCoupons uint64) cpcFlavor {
c := numCoupons
- k := int64(1) << lgK
+ k := uint64(1) << lgK
c2 := c << 1
c8 := c << 3
c32 := c << 5
if c == 0 {
return flavor_empty // 0 == C < 1
}
- if c32 < (int64(3) * k) {
+ if c32 < (3 * k) {
return flavor_sparse // 1 <= C < 3K/32
}
if c2 < k {
return flavor_hybrid // 3K/32 <= C < K/2
}
- if c8 < (int64(27) * k) {
+ if c8 < (27 * k) {
return flavor_pinned // K/2 <= C < 27K/8
}
return flavor_sliding // 27K/8 <= C
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]