This is an automated email from the ASF dual-hosted git repository.

placave pushed a commit to branch cpc-sketch
in repository https://gitbox.apache.org/repos/asf/datasketches-go.git


The following commit(s) were added to refs/heads/cpc-sketch by this push:
     new 7dc1bea  CPC, start adding Update
7dc1bea is described below

commit 7dc1bea0aba3294a629f9a34a7b9cbda2c6a4a20
Author: Pierre Lacave <[email protected]>
AuthorDate: Sun Jun 23 15:16:56 2024 +0200

    CPC, start adding Update
---
 cpc/cpc_sketch.go      | 101 ++++++++++++++++++++++++++++++++++++++++++++---
 cpc/cpc_sketch_test.go |   2 +
 cpc/pair_table.go      | 104 +++++++++++++++++++++++++++++++++++++++++++++++++
 cpc/utils.go           |  15 +++++--
 4 files changed, 213 insertions(+), 9 deletions(-)

diff --git a/cpc/cpc_sketch.go b/cpc/cpc_sketch.go
index 2b9b7ca..4e5685f 100644
--- a/cpc/cpc_sketch.go
+++ b/cpc/cpc_sketch.go
@@ -17,19 +17,26 @@
 
 package cpc
 
+import (
+       "encoding/binary"
+       "fmt"
+       "github.com/twmb/murmur3"
+       "math/bits"
+)
+
 const (
        minLgK = 4
        maxLgK = 26
 )
 
 type CpcSketch struct {
-       seed int64
+       seed uint64
 
        //common variables
        lgK        int
-       numCoupons int64 // The number of coupons collected so far.
-       mergeFlag  bool  // Is the sketch the result of merging?
-       fiCol      int   // First Interesting Column. This is part of a speed optimization.
+       numCoupons uint64 // The number of coupons collected so far.
+       mergeFlag  bool   // Is the sketch the result of merging?
+       fiCol      int    // First Interesting Column. This is part of a speed optimization.
 
        windowOffset  int
        slidingWindow []byte     //either null or size K bytes
@@ -38,9 +45,11 @@ type CpcSketch struct {
        //The following variables are only valid in HIP varients
        kxp         float64 //used with HIP
        hipEstAccum float64 //used with HIP
+
+       scratch [8]byte
 }
 
-func NewCpcSketch(lgK int, seed int64) (*CpcSketch, error) {
+func NewCpcSketch(lgK int, seed uint64) (*CpcSketch, error) {
        if err := checkLgK(lgK); err != nil {
                return nil, err
        }
@@ -52,6 +61,88 @@ func NewCpcSketch(lgK int, seed int64) (*CpcSketch, error) {
        }, nil
 }
 
+// Update presents a 64-bit integer to the sketch. The value is written to the
+// sketch's scratch buffer in little-endian order, hashed with the sketch seed,
+// and the two resulting 64-bit hash words are handed to hashUpdate, whose
+// error (if any) is returned unchanged.
+func (c *CpcSketch) Update(datum int64) error {
+       buf := c.scratch[:]
+       binary.LittleEndian.PutUint64(buf, uint64(datum))
+       lo, hi := hash(buf, c.seed)
+       return c.hashUpdate(lo, hi)
+}
+
+// hashUpdate folds one hashed item into the sketch.
+//
+// hash0 selects the row (its low lgK bits) and hash1 selects the column. The
+// (row, col) pair is packed as (row << 6) | col and, while the sketch holds
+// fewer than 3K/32 coupons, handed to updateSparse. Beyond that threshold the
+// update is currently dropped (windowed mode is not implemented yet).
+//
+// It returns an error if the sparse table cannot be created or updated.
+func (c *CpcSketch) hashUpdate(hash0, hash1 uint64) error {
+       // The column is the number of leading zeros of hash1, as in the reference
+       // CPC implementation. Using 64 minus that count would send every hash
+       // whose top bit is set (half of all inputs) to the clipped column 63.
+       col := bits.LeadingZeros64(hash1)
+       if col < c.fiCol {
+               return nil // important speed optimization
+       }
+       if col > 63 {
+               col = 63 // clip so that 0 <= col <= 63 (hash1 == 0 yields 64)
+       }
+       if c.numCoupons == 0 {
+               if err := c.promoteEmptyToSparse(); err != nil {
+                       return err
+               }
+       }
+       k := uint64(1) << c.lgK
+       row := int(hash0 & (k - 1))
+       rowCol := (row << 6) | col
+
+       // Avoid the hash table's "empty" value which is (2^26 -1, 63) (all ones) by changing it
+       // to the pair (2^26 - 2, 63), which effectively merges the two cells.
+       // This case is *extremely* unlikely, but we might as well handle it.
+       // It can't happen at all if lgK (or maxLgK) < 26.
+       // The test looks at the low 32 bits only: where Go's int is 64 bits wide
+       // the all-ones pair is 0xFFFFFFFF, not -1, so a plain "== -1" never fires.
+       if int32(rowCol) == -1 {
+               rowCol ^= 1 << 6 //set the LSB of row to 0
+       }
+
+       if (c.numCoupons << 5) >= (uint64(3) * k) {
+               // TODO(pierre)
+               // return c.updateWindowed(rowCol)
+               return nil
+       }
+       return c.updateSparse(rowCol)
+}
+
+// promoteEmptyToSparse gives the sketch its initial pair table: the smallest
+// allowed size (lgSizeInts = 2) with 6+lgK valid bits per item. The table is
+// attached to the sketch only when construction succeeds; otherwise the
+// construction error is returned and the sketch is left untouched.
+func (c *CpcSketch) promoteEmptyToSparse() error {
+       table, err := NewPairTable(2, 6+c.lgK)
+       if err == nil {
+               c.pairTable = table
+       }
+       return err
+}
+
+// updateSparse offers rowCol to the sparse pair table and bumps numCoupons
+// when the pair was not seen before.
+//
+// It is only valid while the coupon count C satisfies C < 3K/32 (32*C < 3*K);
+// otherwise, or when no pair table has been allocated, an error is returned.
+// Errors from the pair table are passed through.
+func (c *CpcSketch) updateSparse(rowCol int) error {
+       limit := 3 * (uint64(1) << c.lgK)
+       switch {
+       case c.numCoupons<<5 >= limit:
+               // C >= 3K/32: the sketch is no longer in the sparse range.
+               return fmt.Errorf("C >= 3K/32")
+       case c.pairTable == nil:
+               return fmt.Errorf("pairTable is nil")
+       }
+       novel, err := c.pairTable.maybeInsert(rowCol)
+       if err != nil {
+               return err
+       }
+       if !novel {
+               return nil
+       }
+       c.numCoupons++
+       // TODO (pierre)
+       //c.updateHIP(rowCol)
+       // TODO (pierre)
+       // once (c.numCoupons << 5) >= limit: c.promoteSparseToWindowed() // C >= 3K/32
+       return nil
+}
+
+// hash returns the two 64-bit words produced by murmur3.SeedSum128 over bs,
+// passing seed for both of its seed arguments.
+func hash(bs []byte, seed uint64) (uint64, uint64) {
+       lo, hi := murmur3.SeedSum128(seed, seed, bs)
+       return lo, hi
+}
+
 func (c *CpcSketch) getFormat() cpcFormat {
        ordinal := 0
        f := c.getFlavor()
diff --git a/cpc/cpc_sketch_test.go b/cpc/cpc_sketch_test.go
index 0049768..a5f7079 100644
--- a/cpc/cpc_sketch_test.go
+++ b/cpc/cpc_sketch_test.go
@@ -26,6 +26,8 @@ func TestCPCCheckUpdatesEstimate(t *testing.T) {
        sk, err := NewCpcSketch(10, 0)
        assert.NoError(t, err)
        assert.Equal(t, sk.getFormat(), format_empty_hip)
+       err = sk.Update(1)
+       assert.NoError(t, err)
 }
 
 /*
diff --git a/cpc/pair_table.go b/cpc/pair_table.go
index b6ca032..eecf11b 100644
--- a/cpc/pair_table.go
+++ b/cpc/pair_table.go
@@ -17,9 +17,113 @@
 
 package cpc
 
+import "fmt"
+
+const (
+       upsizeNumer   = 3 // with upsizeDenom: maybeInsert grows the table once numPairs exceeds 3/4 of the slots
+       upsizeDenom   = 4
+       downsizeNumer = 1 // with downsizeDenom: presumably the 1/4 shrink threshold; unused in the visible code - TODO confirm
+       downsizeDenom = 4
+)
+
 type pairTable struct {
        lgSizeInts int   // log2 of the number of slots in slotsArr
        validBits  int   // bit width used to pick the first probe slot: item >> (validBits - lgSizeInts)
        numPairs   int   // count of stored items; incremented by maybeInsert
        slotsArr   []int // the slots themselves; -1 marks an empty slot
 }
+
+// NewPairTable builds an empty pair table with 1<<lgSizeInts slots, every slot
+// set to the empty marker -1. numValidBits is stored as the table's validBits.
+// It returns the error from checkLgSizeInts when lgSizeInts is out of range.
+func NewPairTable(lgSizeInts, numValidBits int) (*pairTable, error) {
+       if err := checkLgSizeInts(lgSizeInts); err != nil {
+               return nil, err
+       }
+       slots := make([]int, 1<<lgSizeInts)
+       for i := range slots {
+               slots[i] = -1
+       }
+       return &pairTable{
+               lgSizeInts: lgSizeInts,
+               validBits:  numValidBits,
+               numPairs:   0,
+               slotsArr:   slots,
+       }, nil
+}
+
+// maybeInsert adds item to the table unless it is already present.
+//
+// It returns true when item was newly stored and false when it was already in
+// the table. After storing an item the table is rebuilt one size larger for as
+// long as numPairs exceeds upsizeNumer/upsizeDenom of the slot count; an error
+// from rebuild is returned unchanged (with a false result).
+func (p *pairTable) maybeInsert(item int) (bool, error) {
+       //SHARED CODE (implemented as a macro in C and expanded here)
+       mask := (1 << p.lgSizeInts) - 1
+       shift := p.validBits - p.lgSizeInts
+       //rtAssert(shift > 0)
+       probe := item >> shift
+       //rtAssert((probe >= 0) && (probe <= mask))
+       arr := p.slotsArr
+       fetched := arr[probe]
+       // Linear probing: stop on the item itself or on the first empty (-1) slot.
+       for fetched != item && fetched != -1 {
+               probe = (probe + 1) & mask
+               fetched = arr[probe]
+       }
+       //END SHARED CODE
+       if fetched == item {
+               // Already present: not novel, nothing to store.
+               return false, nil
+       }
+       //assert (fetched == -1)
+       arr[probe] = item
+       p.numPairs++
+       for (upsizeDenom * p.numPairs) > (upsizeNumer * (1 << p.lgSizeInts)) {
+               if err := p.rebuild(p.lgSizeInts + 1); err != nil {
+                       return false, err
+               }
+       }
+       return true, nil
+}
+
+// mustInsert stores item in the first empty (-1) slot found by linear probing
+// from item >> (validBits - lgSizeInts). It panics if item is encountered
+// during the probe, i.e. if it is already in the table. numPairs is not
+// touched: counts and resizing must be handled by the caller.
+func (p *pairTable) mustInsert(item int) {
+       slots := p.slotsArr
+       mask := (1 << p.lgSizeInts) - 1
+       idx := item >> (p.validBits - p.lgSizeInts)
+       for {
+               switch slots[idx] {
+               case item:
+                       panic("PairTable mustInsert() failed")
+               case -1:
+                       slots[idx] = item
+                       return
+               }
+               idx = (idx + 1) & mask
+       }
+}
+
+// rebuild re-hashes every stored item into a fresh slot array of size
+// 1<<newLgSizeInts and makes that the table's storage.
+//
+// It returns an error, leaving the table unchanged, when newLgSizeInts is
+// rejected by checkLgSizeInts or when the new size could not hold the current
+// numPairs items with at least one slot left empty.
+func (p *pairTable) rebuild(newLgSizeInts int) error {
+       if err := checkLgSizeInts(newLgSizeInts); err != nil {
+               return err
+       }
+       newSize := 1 << newLgSizeInts
+       if newSize <= p.numPairs {
+               // The error used to be built and discarded, so the guard did nothing.
+               return fmt.Errorf("newSize <= numPairs")
+       }
+       oldSlotsArr := p.slotsArr
+       p.slotsArr = make([]int, newSize)
+       for i := range p.slotsArr {
+               p.slotsArr[i] = -1
+       }
+       // lgSizeInts must be updated before re-inserting: mustInsert derives its
+       // mask and shift from it.
+       p.lgSizeInts = newLgSizeInts
+       for _, item := range oldSlotsArr {
+               if item != -1 {
+                       p.mustInsert(item)
+               }
+       }
+       return nil
+}
diff --git a/cpc/utils.go b/cpc/utils.go
index f59c4b2..6660d60 100644
--- a/cpc/utils.go
+++ b/cpc/utils.go
@@ -48,22 +48,29 @@ func checkLgK(lgK int) error {
        return nil
 }
 
-func determineFlavor(lgK int, numCoupons int64) cpcFlavor {
+// checkLgSizeInts reports whether lgSizeInts lies in the accepted range
+// [2, 26]; it returns nil when it does and a descriptive error otherwise.
+func checkLgSizeInts(lgSizeInts int) error {
+       if lgSizeInts >= 2 && lgSizeInts <= 26 {
+               return nil
+       }
+       return fmt.Errorf("Illegal LgSizeInts: %d", lgSizeInts)
+}
+
+func determineFlavor(lgK int, numCoupons uint64) cpcFlavor {
        c := numCoupons
-       k := int64(1) << lgK
+       k := uint64(1) << lgK
        c2 := c << 1
        c8 := c << 3
        c32 := c << 5
        if c == 0 {
                return flavor_empty //    0  == C <    1
        }
-       if c32 < (int64(3) * k) {
+       if c32 < (3 * k) {
                return flavor_sparse //    1  <= C <   3K/32
        }
        if c2 < k {
                return flavor_hybrid // 3K/32 <= C <   K/2
        }
-       if c8 < (int64(27) * k) {
+       if c8 < (27 * k) {
                return flavor_pinned //   K/2 <= C < 27K/8
        }
        return flavor_sliding // 27K/8 <= C


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to