This is an automated email from the ASF dual-hosted git repository.

placave pushed a commit to branch cpc-sketch
in repository https://gitbox.apache.org/repos/asf/datasketches-go.git


The following commit(s) were added to refs/heads/cpc-sketch by this push:
     new 9de355f  CPCSketch Union init
9de355f is described below

commit 9de355f04fae7b805b436183cd8f7397a73d0f11
Author: Pierre Lacave <[email protected]>
AuthorDate: Fri Aug 9 05:14:44 2024 +0800

    CPCSketch Union init
---
 cpc/cpc_sketch.go      |  67 ++++++--------
 cpc/cpc_sketch_test.go |   6 ++
 cpc/cpc_union.go       | 230 +++++++++++++++++++++++++++++++++++++++++++++++++
 cpc/utils.go           |  57 ++++++++++++
 4 files changed, 317 insertions(+), 43 deletions(-)

diff --git a/cpc/cpc_sketch.go b/cpc/cpc_sketch.go
index 0f631cd..4b294d8 100644
--- a/cpc/cpc_sketch.go
+++ b/cpc/cpc_sketch.go
@@ -52,12 +52,12 @@ type CpcSketch struct {
        scratch [8]byte
 }
 
-func NewCpcSketch(lgK int, seed uint64) (*CpcSketch, error) {
+func NewCpcSketch(lgK int, seed uint64) (CpcSketch, error) {
        if err := checkLgK(lgK); err != nil {
-               return nil, err
+               return CpcSketch{}, err
        }
 
-       return &CpcSketch{
+       return CpcSketch{
                lgK:  lgK,
                seed: seed,
                kxp:  float64(int64(1) << lgK),
@@ -260,46 +260,6 @@ func (c *CpcSketch) promoteSparseToWindowed() {
        c.pairTable = newTable
 }
 
-/*
-  //In terms of flavor, this promotes SPARSE to HYBRID.
-  private static void promoteSparseToWindowed(final CpcSketch sketch) {
-    final int lgK = sketch.lgK;
-    final int k = (1 << lgK);
-    final long c32 = sketch.numCoupons << 5;
-    assert ((c32 == (3 * k)) || ((lgK == 4) && (c32 > (3 * k))));
-
-    final byte[] window = new byte[k];
-
-    final PairTable newTable = new PairTable(2, 6 + lgK);
-    final PairTable oldTable = sketch.pairTable;
-
-    final int[] oldSlots = oldTable.getSlotsArr();
-    final int oldNumSlots = (1 << oldTable.getLgSizeInts());
-
-    assert (sketch.windowOffset == 0);
-
-    for (int i = 0; i < oldNumSlots; i++) {
-      final int rowCol = oldSlots[i];
-      if (rowCol != -1) {
-        final int col = rowCol & 63;
-        if (col < 8) {
-          final int  row = rowCol >>> 6;
-          window[row] |= (1 << col);
-        }
-        else {
-          // cannot use Table.mustInsert(), because it doesn't provide for 
growth
-          final boolean isNovel = PairTable.maybeInsert(newTable, rowCol);
-          assert (isNovel == true);
-        }
-      }
-    }
-
-    assert (sketch.slidingWindow == null);
-    sketch.slidingWindow = window;
-    sketch.pairTable = newTable;
-  }
-*/
-
 func (c *CpcSketch) reset() {
        c.numCoupons = 0
        c.mergeFlag = false
@@ -310,3 +270,24 @@ func (c *CpcSketch) reset() {
        c.kxp = float64(int64(1) << c.lgK)
        c.hipEstAccum = 0
 }
+
+// rowColUpdate feeds a single packed coupon into the sketch; the column is
+// held in the low six bits of rowCol.
+//
+// Coupons whose column is below c.fiCol are ignored. An empty sketch is
+// promoted to sparse mode first. While 32*numCoupons < 3*k (k = 1 << lgK) the
+// coupon is handed to updateSparse and its error is returned.
+//
+// NOTE: past that threshold the windowed update path is not wired in yet, so
+// the coupon is dropped and nil is returned.
+func (c *CpcSketch) rowColUpdate(rowCol int) error {
+       if rowCol&63 < c.fiCol {
+               return nil
+       }
+       if c.numCoupons == 0 {
+               if err := c.promoteEmptyToSparse(); err != nil {
+                       return err
+               }
+       }
+       sparseLimit := 3 * (uint64(1) << c.lgK)
+       if (c.numCoupons << 5) >= sparseLimit {
+               // TODO(pierre)
+               // return c.updateWindowed(rowCol)
+               return nil
+       }
+       return c.updateSparse(rowCol)
+}
diff --git a/cpc/cpc_sketch_test.go b/cpc/cpc_sketch_test.go
index cb0661f..791022b 100644
--- a/cpc/cpc_sketch_test.go
+++ b/cpc/cpc_sketch_test.go
@@ -67,6 +67,12 @@ func TestCPCCheckEstimatesWithMerge(t *testing.T) {
                err = sk2.UpdateUint64(uint64(i + n))
                assert.NoError(t, err)
        }
+       union, err := NewCpcUnionSketchWithDefault(lgk)
+       assert.NoError(t, err)
+       err = union.Update(sk1)
+       assert.NoError(t, err)
+       err = union.Update(sk2)
+       assert.NoError(t, err)
 }
 
 /*
diff --git a/cpc/cpc_union.go b/cpc/cpc_union.go
new file mode 100644
index 0000000..4575e78
--- /dev/null
+++ b/cpc/cpc_union.go
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cpc
+
+import (
+       "fmt"
+       "github.com/apache/datasketches-go/internal"
+)
+
+// CpcUnion holds the state used to merge CPC sketches built with the same seed.
+type CpcUnion struct {
+       seed uint64
+       lgK  int
+
+       // Note: at most one of bitMatrix and accumulator is valid at any given moment.
+       // accumulator is a sketch object that is employed until it graduates out of Sparse mode.
+       // At that point, it is converted into a full-sized bitMatrix, which is mathematically a sketch,
+       // but doesn't maintain any of the "extra" fields of our sketch objects, so some additional work
+       // is required when getResult is called at the end.
+       //
+       // accumulator is stored by value and therefore can never be nil; checkUnionState
+       // treats accumulator.lgK != 0 as "accumulator is set" and bitMatrix != nil as
+       // "bitMatrix is set".
+       bitMatrix   []uint64
+       accumulator CpcSketch
+}
+
+// NewCpcUnionSketch creates an empty union of size lgK for sketches built with
+// the given hash seed.
+//
+// It returns an error when NewCpcSketch rejects lgK.
+func NewCpcUnionSketch(lgK int, seed uint64) (CpcUnion, error) {
+       // The accumulator must carry the union's own seed, not the default one:
+       // reduceUnionK rebuilds the accumulator from accumulator.seed, so a
+       // mismatch here would silently propagate to every later state.
+       acc, err := NewCpcSketch(lgK, seed)
+       if err != nil {
+               return CpcUnion{}, err
+       }
+       return CpcUnion{
+               seed: seed,
+               lgK:  lgK,
+               // We begin with the accumulator holding an EMPTY_MERGED sketch object.
+               // As an optimization the accumulator could start as NULL, but that would require changes elsewhere.
+               accumulator: acc,
+       }, nil
+}
+
+// NewCpcUnionSketchWithDefault creates an empty union of size lgK that uses
+// internal.DEFAULT_UPDATE_SEED as its seed. See NewCpcUnionSketch for errors.
+func NewCpcUnionSketchWithDefault(lgK int) (CpcUnion, error) {
+       union, err := NewCpcUnionSketch(lgK, internal.DEFAULT_UPDATE_SEED)
+       return union, err
+}
+
+// Update is intended to merge source into the union.
+//
+// It returns an error when the seeds differ (checkSeeds), when the union state
+// is inconsistent (checkUnionState), or when reduceUnionK fails. An empty
+// source is a no-op. If source.lgK is smaller than the union's lgK, the union
+// is first reduced to source.lgK.
+//
+// NOTE: the merge itself is not ported yet. After the checks and the optional
+// reduction the function returns nil without folding source into the union;
+// the Java reference for the remaining steps is kept in the block comment below.
+func (u *CpcUnion) Update(source CpcSketch) error {
+       if err := checkSeeds(u.seed, source.seed); err != nil {
+               return err
+       }
+
+       sourceFlavorOrd := source.GetFlavor()
+       if sourceFlavorOrd == CpcFlavorEmpty {
+               return nil
+       }
+
+       // Accumulator and bitMatrix must be mutually exclusive,
+       // so bitMatrix != nil => accumulator == nil and vice versa
+       // if (Accumulator != nil) union must be EMPTY or SPARSE,
+       if err := u.checkUnionState(); err != nil {
+               return err
+       }
+
+       if source.lgK < u.lgK {
+               if err := u.reduceUnionK(source.lgK); err != nil {
+                       return err
+               }
+       }
+
+       // Java reference implementation; everything after the reduceUnionK call
+       // still has to be ported.
+       /*
+                   if (source == null) { return; }
+                 checkSeeds(union.seed, source.seed);
+
+                 final int sourceFlavorOrd = source.getFlavor().ordinal();
+                 if (sourceFlavorOrd == 0) { return; } //EMPTY
+
+                 //Accumulator and bitMatrix must be mutually exclusive,
+                 //so bitMatrix != null => accumulator == null and vice versa
+                 //if (Accumulator != null) union must be EMPTY or SPARSE,
+                 checkUnionState(union);
+
+                 if (source.lgK < union.lgK) { reduceUnionK(union, source.lgK); }
+
+                 // if source is past SPARSE mode, make sure that union is a bitMatrix.
+                 if ((sourceFlavorOrd > 1) && (union.accumulator != null)) {
+                   union.bitMatrix = CpcUtil.bitMatrixOfSketch(union.accumulator);
+                   union.accumulator = null;
+                 }
+
+                 final int state = ((sourceFlavorOrd - 1) << 1) | ((union.bitMatrix != null) ? 1 : 0);
+                 switch (state) {
+                   case 0 : { //A: Sparse, bitMatrix == null, accumulator valid
+                     if (union.accumulator == null) {
+                       //CodeQL could not figure this out so I have to insert this.
+                       throw new SketchesStateException("union.accumulator can never be null here.");
+                     }
+                     if ((union.accumulator.getFlavor() == EMPTY)
+                         && (union.lgK == source.lgK)) {
+                       union.accumulator = source.copy();
+                       break;
+                     }
+                     walkTableUpdatingSketch(union.accumulator, source.pairTable);
+                     // if the accumulator has graduated beyond sparse, switch union to a bitMatrix
+                     if (union.accumulator.getFlavor().ordinal() > 1) {
+                       union.bitMatrix = CpcUtil.bitMatrixOfSketch(union.accumulator);
+                       union.accumulator = null;
+                     }
+                     break;
+                   }
+                   case 1 : { //B: Sparse, bitMatrix valid, accumulator == null
+                     orTableIntoMatrix(union.bitMatrix, union.lgK, source.pairTable);
+                     break;
+                   }
+                   case 3 :   //C: Hybrid, bitMatrix valid, accumulator == null
+                   case 5 : { //C: Pinned, bitMatrix valid, accumulator == null
+                     orWindowIntoMatrix(union.bitMatrix, union.lgK, source.slidingWindow,
+                         source.windowOffset, source.lgK);
+                     orTableIntoMatrix(union.bitMatrix, union.lgK, source.pairTable);
+                     break;
+                   }
+                   case 7 : { //D: Sliding, bitMatrix valid, accumulator == null
+                     // SLIDING mode involves inverted logic, so we can't just walk the source sketch.
+                     // Instead, we convert it to a bitMatrix that can be OR'ed into the destination.
+                     final long[] sourceMatrix = CpcUtil.bitMatrixOfSketch(source);
+                     orMatrixIntoMatrix(union.bitMatrix, union.lgK, sourceMatrix, source.lgK);
+                     break;
+                   }
+                   default: throw new SketchesStateException("Illegal Union state: " + state);
+                 }
+       */
+       return nil
+}
+
+func (u *CpcUnion) checkUnionState() error {
+       if u == nil {
+               return fmt.Errorf("union cannot be nil")
+       }
+
+       if u.accumulator.lgK != 0 && u.bitMatrix != nil {
+               return fmt.Errorf("accumulator and bitMatrix cannot be both 
valid or both nil")
+       }
+       if u.accumulator.lgK != 0 { // not nil
+               if u.accumulator.numCoupons > 0 {
+                       if u.accumulator.slidingWindow != nil || 
u.accumulator.pairTable == nil {
+                               return fmt.Errorf("Non-empty union accumulator 
must be SPARSE")
+                       }
+               }
+               if u.lgK != u.accumulator.lgK {
+                       return fmt.Errorf("union LgK must equal accumulator 
LgK")
+               }
+       }
+       return nil
+}
+
+// reduceUnionK downsamples the union to newLgK. It is a no-op unless
+// newLgK < u.lgK.
+//
+// When the union holds a bitMatrix, the matrix is folded into a new one of
+// 1 << newLgK rows. Otherwise a fresh sketch of size newLgK is created with the
+// accumulator's seed and, if the accumulator is non-empty, its pair table is
+// replayed into it. The fresh sketch becomes the new accumulator when the old
+// one was empty or the result is still sparse; if it has graduated beyond
+// sparse it is converted to a bitMatrix and the accumulator is cleared.
+//
+// It returns any error from NewCpcSketch or walkTableUpdatingSketch; on error
+// the union is left unchanged.
+func (u *CpcUnion) reduceUnionK(newLgK int) error {
+       if newLgK >= u.lgK {
+               return nil
+       }
+
+       if u.bitMatrix != nil {
+               // downsample the union's bit matrix
+               newMatrix := make([]uint64, 1<<newLgK)
+               orMatrixIntoMatrix(newMatrix, newLgK, u.bitMatrix, u.lgK)
+               u.bitMatrix = newMatrix
+               u.lgK = newLgK
+               return nil
+       }
+
+       // downsample the union's accumulator
+       oldSketch := u.accumulator
+       newSketch, err := NewCpcSketch(newLgK, oldSketch.seed)
+       if err != nil {
+               return err
+       }
+       if oldSketch.numCoupons == 0 {
+               u.accumulator = newSketch
+               u.lgK = newLgK
+               return nil
+       }
+       if err := walkTableUpdatingSketch(&newSketch, oldSketch.pairTable); err != nil {
+               return err
+       }
+       if newSketch.GetFlavor() == CpcFlavorSparse {
+               u.accumulator = newSketch
+               u.lgK = newLgK
+               return nil
+       }
+
+       // The new sketch has graduated beyond sparse, so convert to a bitMatrix.
+       // The accumulator is a value and cannot be nil; reset it to the zero value
+       // (lgK == 0) so that accumulator and bitMatrix stay mutually exclusive and
+       // checkUnionState accepts the union on the next Update.
+       u.bitMatrix = bitMatrixOfSketch(newSketch)
+       u.accumulator = CpcSketch{}
+       u.lgK = newLgK
+       return nil
+}
+
+// walkTableUpdatingSketch replays every occupied slot of table into dest via
+// dest.rowColUpdate, stopping at and returning the first error.
+//
+// Slots equal to -1 are skipped. Each coupon's row is masked to dest.lgK bits
+// before the update. The slots are visited in a strided order: the stride is
+// InverseGolden * numSlots, bumped to the next odd number when even.
+func walkTableUpdatingSketch(dest *CpcSketch, table *pairTable) error {
+       slots := table.slotsArr
+       numSlots := 1 << table.lgSizeInts
+       slotMask := numSlots - 1
+       // Keeps the six column bits and the low dest.lgK row bits
+       // (downsamples when dest.lgK < srcLgK).
+       destMask := ((1<<dest.lgK)-1)<<6 | 63
+
+       stride := int(internal.InverseGolden * float64(numSlots))
+       if stride&1 == 0 {
+               stride++
+       }
+
+       pos := 0
+       for visited := 0; visited < numSlots; visited++ {
+               pos &= slotMask
+               if rowCol := slots[pos]; rowCol != -1 {
+                       if err := dest.rowColUpdate(rowCol & destMask); err != nil {
+                               return err
+                       }
+               }
+               pos += stride
+       }
+
+       return nil
+}
diff --git a/cpc/utils.go b/cpc/utils.go
index 5897888..0759c5a 100644
--- a/cpc/utils.go
+++ b/cpc/utils.go
@@ -59,6 +59,13 @@ func checkLgSizeInts(lgSizeInts int) error {
        return nil
 }
 
+// checkSeeds returns nil when both seeds are equal and an error naming the two
+// seeds otherwise.
+func checkSeeds(seedA uint64, seedB uint64) error {
+       if seedA == seedB {
+               return nil
+       }
+       return fmt.Errorf("Incompatible seeds: %d %d", seedA, seedB)
+}
+
 func determineFlavor(lgK int, numCoupons uint64) CpcFlavor {
        c := numCoupons
        k := uint64(1) << lgK
@@ -79,3 +86,53 @@ func determineFlavor(lgK int, numCoupons uint64) CpcFlavor {
        }
        return CpcFlavorSliding // 27K/8 <= C
 }
+
+// orMatrixIntoMatrix ORs the first 1 << srcLgK rows of srcMatrix into
+// destMatrix in place. Source row r lands in destination row
+// r & ((1 << destLgK) - 1), so several source rows fold onto one destination
+// row when destLgK < srcLgK.
+func orMatrixIntoMatrix(destMatrix []uint64, destLgK int, srcMatrix []uint64, srcLgK int) {
+       //assert(destLgK <= srcLgK)
+       rowMask := (1 << destLgK) - 1
+       numSrcRows := 1 << srcLgK
+       for r := 0; r < numSrcRows; r++ {
+               destRow := r & rowMask
+               destMatrix[destRow] |= srcMatrix[r]
+       }
+}
+
+func bitMatrixOfSketch(sketch CpcSketch) []uint64 {
+       k := uint64(1) << sketch.lgK
+       offset := sketch.windowOffset
+       if offset < 0 || offset > 56 {
+               panic("offset < 0 || offset > 56")
+       }
+       matrix := make([]uint64, k)
+       if sketch.numCoupons == 0 {
+               return matrix // Returning a matrix of zeros rather than NULL.
+       }
+       //Fill the matrix with default rows in which the "early zone" is filled 
with ones.
+       //This is essential for the routine's O(k) time cost (as opposed to 
O(C)).
+       defaultRow := (1 << offset) - 1
+       for i := range matrix {
+               matrix[i] = uint64(defaultRow)
+       }
+       if sketch.slidingWindow != nil { // In other words, we are in window 
mode, not sparse mode.
+               for i, v := range sketch.slidingWindow { // set the window 
bits, trusting the sketch's current offset.
+                       matrix[i] |= (uint64(v) << offset)
+               }
+       }
+       table := sketch.pairTable
+       if table == nil {
+               panic("table == nil")
+       }
+       slots := table.slotsArr
+       numSlots := 1 << table.lgSizeInts
+       for i := 0; i < numSlots; i++ {
+               rowCol := slots[i]
+               if rowCol != -1 {
+                       col := rowCol & 63
+                       row := rowCol >> 6
+                       // Flip the specified matrix bit from its default value.
+                       // In the "early" zone the bit changes from 1 to 0.
+                       // In the "late" zone the bit changes from 0 to 1.
+                       matrix[row] ^= (1 << col)
+               }
+       }
+       return matrix
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to