This is an automated email from the ASF dual-hosted git repository.

placave pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/datasketches-characterization.git


The following commit(s) were added to refs/heads/master by this push:
     new 11e6743  [Go] Add FrequencyLong speed profile
11e6743 is described below

commit 11e67434c79bddc721542e58bebf05aa8d149e21
Author: Pierre Lacave <[email protected]>
AuthorDate: Fri May 17 19:20:39 2024 +0200

    [Go] Add FrequencyLong speed profile
---
 go/frequency_long_speed_profile.go      | 164 ++++++++++++++++++++++++++++++++
 go/{main_test.go => frequency_utils.go} |  23 ++---
 go/main.go                              |  14 +++
 go/main_test.go                         |   4 +
 go/zipf_distribution.go                 | 100 +++++++++++++++++++
 5 files changed, 291 insertions(+), 14 deletions(-)

diff --git a/go/frequency_long_speed_profile.go 
b/go/frequency_long_speed_profile.go
new file mode 100644
index 0000000..dea88fb
--- /dev/null
+++ b/go/frequency_long_speed_profile.go
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package main
+
+import (
+       "fmt"
+       "github.com/apache/datasketches-go/frequencies"
+       "math"
+       "runtime"
+       "runtime/debug"
+       "strings"
+       "time"
+)
+
// FrequencyLongSpeedProfile measures the speed of the frequencies LongsSketch.
// It covers sketch construction, per-item Update, serialization (ToSlice) and
// deserialization (NewLongsSketchFromSlice) over a series of stream lengths,
// with each length repeated for a number of trials.
type FrequencyLongSpeedProfile struct {
	config      frequencyJobConfigType // job parameters: sketch size, stream-length range, trials, Zipf shape
	zipf        *zipfDistribution      // source of the input items
	startTime   int64                  // set to time.Now().UnixMilli() by the constructor; NOTE(review): not read by any code shown here
	stats       frequencySpeedStats    // accumulators for the stream length currently being measured
	inputValues []int64                // items fed to the sketch in the current trial
}
+
// frequencySpeedStats accumulates totals over all trials at one stream length.
// getStats divides each total by the number of trials when reporting; update
// time is additionally divided by the stream length.
type frequencySpeedStats struct {
	buildTimeNs         int64 // total ns spent constructing sketches
	updateTimeNs        int64 // total ns spent calling Update on every input item
	serializeTimeNs     int64 // total ns spent in ToSlice
	deserializeTimeNs   int64 // total ns spent in NewLongsSketchFromSlice
	numRetainedItems    int64 // sum of GetNumActiveItems after each trial
	serializedSizeBytes int64 // sum of serialized byte lengths
}
+
+func NewFrequencyLongSpeedProfile(config frequencyJobConfigType) 
*FrequencyLongSpeedProfile {
+       return &FrequencyLongSpeedProfile{
+               config:    config,
+               zipf:      newZipfDistribution(int64(config.zipfRange), 
config.zipfExponent),
+               startTime: time.Now().UnixMilli(),
+               stats:     frequencySpeedStats{},
+       }
+}
+
+func (d *FrequencyLongSpeedProfile) run() {
+       sb := &strings.Builder{}
+       debug.SetGCPercent(-1)
+       debug.SetMemoryLimit(math.MaxInt64)
+
+       lgMinStreamLen := d.config.lgMin
+       lgMaxStreamLen := d.config.lgMax
+       pointsPerOctave := d.config.PPO
+       lgMinTrials := d.config.lgMinTrials
+       lgMaxTrials := d.config.lgMaxTrials
+
+       minStreamLen := 1 << lgMinStreamLen
+       maxStreamLen := 1 << lgMaxStreamLen
+
+       d.setHeader(sb)
+       fmt.Println(sb.String())
+
+       streamLength := minStreamLen
+       for streamLength <= maxStreamLen {
+               sb.Reset()
+               numTrials := getNumTrials(streamLength, lgMinStreamLen, 
lgMaxStreamLen, lgMinTrials, lgMaxTrials)
+               d.resetStats()
+               for i := 0; i < numTrials; i++ {
+                       d.prepareTrial(streamLength)
+                       d.process()
+               }
+               runtime.GC()
+               d.getStats(streamLength, numTrials, sb)
+               fmt.Println(sb.String())
+               streamLength = int(pwr2SeriesNext(pointsPerOctave, 
uint64(streamLength)))
+       }
+}
+
+func (d *FrequencyLongSpeedProfile) process() {
+       startBuild := time.Now().UnixNano()
+       sketch, err := frequencies.NewLongsSketchWithMaxMapSize(d.config.k)
+       if err != nil {
+               panic(err)
+       }
+       stopBuild := time.Now().UnixNano()
+       d.stats.buildTimeNs += stopBuild - startBuild
+
+       startUpdate := time.Now().UnixNano()
+       for i := 0; i < len(d.inputValues); i++ {
+               err := sketch.Update(d.inputValues[i])
+               if err != nil {
+                       panic(err)
+               }
+       }
+       stopUpdate := time.Now().UnixNano()
+       d.stats.updateTimeNs += stopUpdate - startUpdate
+
+       startSerialize := time.Now().UnixNano()
+       bytes := sketch.ToSlice()
+       stopSerialize := time.Now().UnixNano()
+       d.stats.serializeTimeNs += stopSerialize - startSerialize
+
+       startDeserialize := time.Now().UnixNano()
+       _, err = frequencies.NewLongsSketchFromSlice(bytes)
+       if err != nil {
+               panic(err)
+       }
+       stopDeserialize := time.Now().UnixNano()
+       d.stats.deserializeTimeNs += stopDeserialize - startDeserialize
+
+       d.stats.numRetainedItems += int64(sketch.GetNumActiveItems())
+       d.stats.serializedSizeBytes += int64(len(bytes))
+
+}
+
+func (d *FrequencyLongSpeedProfile) setHeader(sb *strings.Builder) {
+       
sb.WriteString("Stream\tTrials\tBuild\tUpdate\tSer\tDeser\tItems\tstatsSize")
+}
+
+func (d *FrequencyLongSpeedProfile) getStats(streamLength int, numTrials int, 
sb *strings.Builder) {
+       sb.WriteString(fmt.Sprintf("%d\t%d\t%.1f\t%.1f\t%.1f\t%.1f\t%.1f\t%.1f",
+               streamLength,
+               numTrials,
+               float64(d.stats.buildTimeNs)/float64(numTrials),
+               
float64(d.stats.updateTimeNs)/float64(numTrials)/float64(streamLength),
+               float64(d.stats.serializeTimeNs)/float64(numTrials),
+               float64(d.stats.deserializeTimeNs)/float64(numTrials),
+               float64(d.stats.numRetainedItems)/float64(numTrials),
+               float64(d.stats.serializedSizeBytes)/float64(numTrials),
+       ))
+}
+
+func (d *FrequencyLongSpeedProfile) resetStats() {
+       d.stats.buildTimeNs = 0
+       d.stats.updateTimeNs = 0
+       d.stats.serializeTimeNs = 0
+       d.stats.deserializeTimeNs = 0
+       d.stats.numRetainedItems = 0
+       d.stats.serializedSizeBytes = 0
+}
+
+func (d *FrequencyLongSpeedProfile) prepareTrial(streamLength int) {
+       // prepare input data
+       d.inputValues = make([]int64, streamLength)
+       for i := 0; i < streamLength; i++ {
+               d.inputValues[i] = d.zipf.sample()
+       }
+}
+
// getNumTrials returns how many trials to run at stream length x.
//
// log2(trials) is interpolated linearly in log2(x): it equals lgMaxTrials at
// x = 2^lgMinX and lgMinTrials at x = 2^lgMaxX. Short streams therefore get
// more repetitions than long ones. The result is always at least 1.
//
// If lgMaxX <= lgMinX or x <= 0, the interpolation is undefined and
// 2^lgMaxTrials is returned.
func getNumTrials(x, lgMinX, lgMaxX, lgMinTrials, lgMaxTrials int) int {
	lgTrials := float64(lgMaxTrials)
	if lgMaxX > lgMinX && x > 0 {
		// math.Log2 is exact for powers of two. Multiplying before dividing
		// makes both endpoints land exactly on integer exponents.
		lgX := math.Log2(float64(x))
		lgTrials -= (lgX - float64(lgMinX)) * float64(lgMaxTrials-lgMinTrials) / float64(lgMaxX-lgMinX)
	}
	numTrials := int(math.Pow(2, lgTrials))
	if numTrials < 1 {
		// Never report zero trials; callers divide by this count.
		return 1
	}
	return numTrials
}
diff --git a/go/main_test.go b/go/frequency_utils.go
similarity index 66%
copy from go/main_test.go
copy to go/frequency_utils.go
index 7d8034f..0664f9f 100644
--- a/go/main_test.go
+++ b/go/frequency_utils.go
@@ -17,22 +17,17 @@
 
 package main
 
-import (
-       "testing"
-)
+type frequencyJobConfigType struct {
+       k int // sketch size and accuracy parameter
 
-func TestHllSketchAccuracyRunner(t *testing.T) {
-       jobs["distinct_count_accuracy_profile"].run()
-}
+       lgMin int // The starting stream length
+       lgMax int // How high the stream length goes
 
-func TestHllSketchMergeAccuracyRunner(t *testing.T) {
-       jobs["distinct_count_merge_accuracy_profile"].run()
-}
+       PPO int // The horizontal x-resolution of trials points
 
-func TestHllSketchMergeSpeedRunner(t *testing.T) {
-       jobs["distinct_count_merge_speed_profile"].run()
-}
+       lgMinTrials int // Min trials at tail (high counts)
+       lgMaxTrials int // Max trials at start (low counts)
 
-func TestHllSketchSerdeRunner(t *testing.T) {
-       jobs["distinct_count_serde_profile"].run()
+       zipfRange    int
+       zipfExponent float64
 }
diff --git a/go/main.go b/go/main.go
index c60399b..51eef42 100644
--- a/go/main.go
+++ b/go/main.go
@@ -79,6 +79,20 @@ var (
                        },
                        hll.TgtHllTypeHll8,
                ),
+               "frequency_long_speed_profile": NewFrequencyLongSpeedProfile(
+                       frequencyJobConfigType{
+                               k:            1024,
+                               zipfRange:    8192,
+                               zipfExponent: 1.1,
+
+                               lgMin: 0,
+                               lgMax: 23,
+                               PPO:   16,
+
+                               lgMaxTrials: 16,
+                               lgMinTrials: 8,
+                       },
+               ),
        }
 )
 
diff --git a/go/main_test.go b/go/main_test.go
index 7d8034f..7649615 100644
--- a/go/main_test.go
+++ b/go/main_test.go
@@ -36,3 +36,7 @@ func TestHllSketchMergeSpeedRunner(t *testing.T) {
// TestHllSketchSerdeRunner runs the job registered under
// "distinct_count_serde_profile" in the jobs map.
func TestHllSketchSerdeRunner(t *testing.T) {
	jobs["distinct_count_serde_profile"].run()
}
+
+func TestFrequencyLongSpeedRunner(t *testing.T) {
+       jobs["frequency_long_speed_profile"].run()
+}
diff --git a/go/zipf_distribution.go b/go/zipf_distribution.go
new file mode 100644
index 0000000..acd1bc8
--- /dev/null
+++ b/go/zipf_distribution.go
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+       "math"
+       "math/rand"
+)
+
// Constants shared by the Zipf sampler helpers.
//
// zipf_taylor_threshold is the |x| at or below which helper1 and helper2 switch
// from the Log1p/Expm1 ratio to a short Taylor series. zipf_f_1_2, zipf_f_1_3
// and zipf_f_1_4 are the fractions 1/2, 1/3 and 1/4 used in those series;
// zipf_f_1_2 is also the 0.5 offset applied around integers in the sampler.
//
// NOTE(review): Go naming convention is MixedCaps (e.g. zipfTaylorThreshold);
// renaming requires updating every use in this file.
const (
	zipf_taylor_threshold = 1e-8
	zipf_f_1_2            = 0.5
	zipf_f_1_3            = 1.0 / 3
	zipf_f_1_4            = 0.25
)
+
// zipfDistribution draws integers in [1, numberOfElements] with weight
// h(k) = k^-exponent. It uses a rejection-inversion scheme that appears to be
// a port of Apache Commons Math's RejectionInversionZipfSampler.
// NOTE(review): confirm the provenance.
type zipfDistribution struct {
	numberOfElements int64   // largest value sample can return
	exponent         float64 // Zipf exponent used by h, hIntegral and hIntegralInverse

	hIntegralX1               float64 // hIntegral(1.5) - 1: one end of the uniform range in sample
	hIntegralNumberOfElements float64 // hIntegral(numberOfElements + 0.5): the other end of that range
	s                         float64 // a candidate k with k - x <= s is accepted without the full test
}
+
+func newZipfDistribution(numberOfElements int64, exponent float64) 
*zipfDistribution {
+       d := &zipfDistribution{
+               numberOfElements:          numberOfElements,
+               exponent:                  exponent,
+               hIntegralX1:               hIntegral(1.5, exponent) - 1.0,
+               hIntegralNumberOfElements: 
hIntegral(float64(numberOfElements)+zipf_f_1_2, exponent),
+               s:                         2 - hIntegralInverse(hIntegral(2.5, 
exponent)-h(2.0, exponent), exponent),
+       }
+       return d
+}
+
+func (z *zipfDistribution) sample() int64 {
+       for true {
+               u := z.hIntegralNumberOfElements + 
rand.Float64()*(z.hIntegralX1-z.hIntegralNumberOfElements)
+               x := hIntegralInverse(u, z.exponent)
+               k := int64(x + zipf_f_1_2)
+               if k < 1 {
+                       k = 1
+               } else if k > z.numberOfElements {
+                       k = z.numberOfElements
+               }
+
+               if ((float64(k) - x) <= z.s) || (u >= 
(hIntegral(float64(k)+zipf_f_1_2, z.exponent) - h(float64(k), z.exponent))) {
+                       return k
+               }
+       }
+       panic("this cannot happen")
+}
+
// h evaluates the unnormalized Zipf weight x^-exponent, computed as
// exp(-exponent * ln x).
func h(x float64, exponent float64) float64 {
	logX := math.Log(x)
	return math.Exp(-exponent * logX)
}
+
+func hIntegral(x float64, exponent float64) float64 {
+       return helper2((1-exponent)*math.Log(x)) * math.Log(x)
+}
+
+func helper1(x float64) float64 {
+       if math.Abs(x) > zipf_taylor_threshold {
+               return math.Log1p(x) / x
+       }
+       return 1 - (x * (zipf_f_1_2 - (x * (zipf_f_1_3 - (zipf_f_1_4 * x)))))
+}
+
+func helper2(x float64) float64 {
+       if math.Abs(x) > zipf_taylor_threshold {
+               return math.Expm1(x) / x
+       }
+       return 1 + (x * zipf_f_1_2 * (1 + (x * zipf_f_1_3 * (1 + (zipf_f_1_4 * 
x)))))
+}
+
+func hIntegralInverse(x float64, exponent float64) float64 {
+       t := x * (1 - exponent)
+       if t < -1 {
+               // Limit value to the reange [-1, +inf).
+               // t could be smaller than -1 in some rare cases due to 
numerical errors.
+               t = -1
+       }
+       return math.Exp(helper1(t) * x)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to