This is an automated email from the ASF dual-hosted git repository.
placave pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/datasketches-characterization.git
The following commit(s) were added to refs/heads/master by this push:
new 11e6743 [Go] Add FrequencyLong speed profile
11e6743 is described below
commit 11e67434c79bddc721542e58bebf05aa8d149e21
Author: Pierre Lacave <[email protected]>
AuthorDate: Fri May 17 19:20:39 2024 +0200
[Go] Add FrequencyLong speed profile
---
go/frequency_long_speed_profile.go | 164 ++++++++++++++++++++++++++++++++
go/{main_test.go => frequency_utils.go} | 23 ++---
go/main.go | 14 +++
go/main_test.go | 4 +
go/zipf_distribution.go | 100 +++++++++++++++++++
5 files changed, 291 insertions(+), 14 deletions(-)
diff --git a/go/frequency_long_speed_profile.go
b/go/frequency_long_speed_profile.go
new file mode 100644
index 0000000..dea88fb
--- /dev/null
+++ b/go/frequency_long_speed_profile.go
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package main
+
+import (
+ "fmt"
+ "github.com/apache/datasketches-go/frequencies"
+ "math"
+ "runtime"
+ "runtime/debug"
+ "strings"
+ "time"
+)
+
+// FrequencyLongSpeedProfile holds the state of one speed-profiling job for the
+// longs frequency sketch: the job configuration, the Zipf generator that
+// produces the input, the statistics accumulated for the current stream length
+// and the input values of the current trial.
+type FrequencyLongSpeedProfile struct {
+ config frequencyJobConfigType
+ zipf *zipfDistribution
+ startTime int64 // creation time in Unix milliseconds, set by the constructor
+ stats frequencySpeedStats
+ inputValues []int64 // replaced by prepareTrial before every trial
+}
+
+// frequencySpeedStats holds totals summed over every trial run at one stream
+// length. process adds to the fields, resetStats clears them and getStats
+// reports them averaged per trial.
+type frequencySpeedStats struct {
+ buildTimeNs int64 // time spent constructing sketches, in nanoseconds
+ updateTimeNs int64 // time spent feeding the input values, in nanoseconds
+ serializeTimeNs int64 // time spent in ToSlice, in nanoseconds
+ deserializeTimeNs int64 // time spent rebuilding sketches from bytes, in nanoseconds
+ numRetainedItems int64 // sum of the sketches' active item counts
+ serializedSizeBytes int64 // sum of the serialized image lengths, in bytes
+}
+
+// NewFrequencyLongSpeedProfile returns a speed profile for the given job
+// configuration. The Zipf input generator is created from config.zipfRange and
+// config.zipfExponent, the creation time is recorded in Unix milliseconds and
+// the statistics start out zeroed.
+func NewFrequencyLongSpeedProfile(config frequencyJobConfigType) *FrequencyLongSpeedProfile {
+	profile := new(FrequencyLongSpeedProfile)
+	profile.config = config
+	profile.zipf = newZipfDistribution(int64(config.zipfRange), config.zipfExponent)
+	profile.startTime = time.Now().UnixMilli()
+	return profile
+}
+
+// run executes the speed profile and prints the results to standard output.
+//
+// A tab-separated header is printed first, followed by one row per stream
+// length. Stream lengths start at 2^lgMin and advance through
+// pwr2SeriesNext(PPO, current) for as long as they do not exceed 2^lgMax. For
+// every stream length the statistics are reset, getNumTrials decides how many
+// trials to run, and each trial generates fresh input in prepareTrial before
+// process measures it.
+func (d *FrequencyLongSpeedProfile) run() {
+	// Switch the garbage collector off while measuring. Both setters return the
+	// previous value, which is restored on return so that jobs executed later
+	// in the same process are not left with the collector disabled.
+	defer debug.SetGCPercent(debug.SetGCPercent(-1))
+	defer debug.SetMemoryLimit(debug.SetMemoryLimit(math.MaxInt64))
+
+	sb := &strings.Builder{}
+
+	lgMinStreamLen := d.config.lgMin
+	lgMaxStreamLen := d.config.lgMax
+	pointsPerOctave := d.config.PPO
+	lgMinTrials := d.config.lgMinTrials
+	lgMaxTrials := d.config.lgMaxTrials
+
+	minStreamLen := 1 << lgMinStreamLen
+	maxStreamLen := 1 << lgMaxStreamLen
+
+	d.setHeader(sb)
+	fmt.Println(sb.String())
+
+	streamLength := minStreamLen
+	for streamLength <= maxStreamLen {
+		sb.Reset()
+		numTrials := getNumTrials(streamLength, lgMinStreamLen, lgMaxStreamLen, lgMinTrials, lgMaxTrials)
+		d.resetStats()
+		for i := 0; i < numTrials; i++ {
+			d.prepareTrial(streamLength)
+			d.process()
+		}
+		// Automatic collection is disabled above, so reclaim the garbage of this
+		// stream length explicitly, outside of any timed section.
+		runtime.GC()
+		d.getStats(streamLength, numTrials, sb)
+		fmt.Println(sb.String())
+		streamLength = int(pwr2SeriesNext(pointsPerOctave, uint64(streamLength)))
+	}
+}
+
+// process runs one trial over d.inputValues and adds its measurements to
+// d.stats.
+//
+// Four phases are timed separately: constructing a sketch with d.config.k as
+// its maximum map size, updating it with every input value, serializing it
+// with ToSlice and rebuilding a sketch from the serialized bytes. The active
+// item count of the sketch and the length of the serialized image are
+// accumulated as well. Any error returned by the sketch library causes a panic.
+//
+// Durations are taken with time.Since on time.Now values, which uses the
+// monotonic clock and is therefore unaffected by wall-clock adjustments.
+func (d *FrequencyLongSpeedProfile) process() {
+	buildStart := time.Now()
+	sketch, err := frequencies.NewLongsSketchWithMaxMapSize(d.config.k)
+	if err != nil {
+		panic(err)
+	}
+	d.stats.buildTimeNs += time.Since(buildStart).Nanoseconds()
+
+	updateStart := time.Now()
+	for _, value := range d.inputValues {
+		if err := sketch.Update(value); err != nil {
+			panic(err)
+		}
+	}
+	d.stats.updateTimeNs += time.Since(updateStart).Nanoseconds()
+
+	serializeStart := time.Now()
+	serialized := sketch.ToSlice()
+	d.stats.serializeTimeNs += time.Since(serializeStart).Nanoseconds()
+
+	deserializeStart := time.Now()
+	if _, err := frequencies.NewLongsSketchFromSlice(serialized); err != nil {
+		panic(err)
+	}
+	d.stats.deserializeTimeNs += time.Since(deserializeStart).Nanoseconds()
+
+	d.stats.numRetainedItems += int64(sketch.GetNumActiveItems())
+	d.stats.serializedSizeBytes += int64(len(serialized))
+}
+
+func (d *FrequencyLongSpeedProfile) setHeader(sb *strings.Builder) {
+
sb.WriteString("Stream\tTrials\tBuild\tUpdate\tSer\tDeser\tItems\tstatsSize")
+}
+
+// getStats appends one tab-separated result row to sb: the stream length, the
+// number of trials, and the accumulated statistics averaged per trial. The
+// update time is additionally divided by streamLength, giving the time per
+// input value. All averages are printed with one decimal place.
+func (d *FrequencyLongSpeedProfile) getStats(streamLength int, numTrials int, sb *strings.Builder) {
+	trials := float64(numTrials)
+	perTrial := func(total int64) float64 {
+		return float64(total) / trials
+	}
+	fmt.Fprintf(sb, "%d\t%d\t%.1f\t%.1f\t%.1f\t%.1f\t%.1f\t%.1f",
+		streamLength,
+		numTrials,
+		perTrial(d.stats.buildTimeNs),
+		perTrial(d.stats.updateTimeNs)/float64(streamLength),
+		perTrial(d.stats.serializeTimeNs),
+		perTrial(d.stats.deserializeTimeNs),
+		perTrial(d.stats.numRetainedItems),
+		perTrial(d.stats.serializedSizeBytes),
+	)
+}
+
+// resetStats clears every accumulated statistic so that the next stream length
+// starts from zero.
+func (d *FrequencyLongSpeedProfile) resetStats() {
+	d.stats = frequencySpeedStats{}
+}
+
+// prepareTrial replaces d.inputValues with streamLength freshly drawn samples
+// from the Zipf generator, ready to be consumed by process.
+func (d *FrequencyLongSpeedProfile) prepareTrial(streamLength int) {
+	values := make([]int64, streamLength)
+	for i := range values {
+		values[i] = d.zipf.sample()
+	}
+	d.inputValues = values
+}
+
+// getNumTrials returns the number of trials to run for a stream of length x.
+//
+// The base-2 logarithm of the trial count falls linearly with log2(x), from
+// lgMaxTrials at x = 2^lgMinX down to lgMinTrials at x = 2^lgMaxX; the result
+// is 2 raised to that value, truncated to an int.
+//
+// When lgMinX equals lgMaxX the range is a single point and the slope is
+// undefined, so 2^lgMaxTrials is returned.
+func getNumTrials(x, lgMinX, lgMaxX, lgMinTrials, lgMaxTrials int) int {
+	if lgMinX == lgMaxX {
+		// Avoid the division by zero below.
+		return int(math.Pow(2.0, float64(lgMaxTrials)))
+	}
+	slope := float64(lgMaxTrials-lgMinTrials) / float64(lgMinX-lgMaxX)
+	lgX := math.Log(float64(x)) / math.Log(2.0)
+	// Anchor the line at lgMinX so that the first stream length gets exactly
+	// 2^lgMaxTrials trials even when lgMinX is not zero.
+	lgTrials := slope*(lgX-float64(lgMinX)) + float64(lgMaxTrials)
+	return int(math.Pow(2.0, lgTrials))
+}
diff --git a/go/main_test.go b/go/frequency_utils.go
similarity index 66%
copy from go/main_test.go
copy to go/frequency_utils.go
index 7d8034f..0664f9f 100644
--- a/go/main_test.go
+++ b/go/frequency_utils.go
@@ -17,22 +17,17 @@
package main
-import (
- "testing"
-)
+type frequencyJobConfigType struct {
+ k int // sketch size and accuracy parameter
-func TestHllSketchAccuracyRunner(t *testing.T) {
- jobs["distinct_count_accuracy_profile"].run()
-}
+ lgMin int // The starting stream length
+ lgMax int // How high the stream length goes
-func TestHllSketchMergeAccuracyRunner(t *testing.T) {
- jobs["distinct_count_merge_accuracy_profile"].run()
-}
+ PPO int // The horizontal x-resolution of trials points
-func TestHllSketchMergeSpeedRunner(t *testing.T) {
- jobs["distinct_count_merge_speed_profile"].run()
-}
+ lgMinTrials int // Min trials at tail (high counts)
+ lgMaxTrials int // Max trials at start (low counts)
-func TestHllSketchSerdeRunner(t *testing.T) {
- jobs["distinct_count_serde_profile"].run()
+ zipfRange int
+ zipfExponent float64
}
diff --git a/go/main.go b/go/main.go
index c60399b..51eef42 100644
--- a/go/main.go
+++ b/go/main.go
@@ -79,6 +79,20 @@ var (
},
hll.TgtHllTypeHll8,
),
+ "frequency_long_speed_profile": NewFrequencyLongSpeedProfile(
+ frequencyJobConfigType{
+ k: 1024,
+ zipfRange: 8192,
+ zipfExponent: 1.1,
+
+ lgMin: 0,
+ lgMax: 23,
+ PPO: 16,
+
+ lgMaxTrials: 16,
+ lgMinTrials: 8,
+ },
+ ),
}
)
diff --git a/go/main_test.go b/go/main_test.go
index 7d8034f..7649615 100644
--- a/go/main_test.go
+++ b/go/main_test.go
@@ -36,3 +36,7 @@ func TestHllSketchMergeSpeedRunner(t *testing.T) {
func TestHllSketchSerdeRunner(t *testing.T) {
jobs["distinct_count_serde_profile"].run()
}
+
+func TestFrequencyLongSpeedRunner(t *testing.T) {
+ jobs["frequency_long_speed_profile"].run()
+}
diff --git a/go/zipf_distribution.go b/go/zipf_distribution.go
new file mode 100644
index 0000000..acd1bc8
--- /dev/null
+++ b/go/zipf_distribution.go
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "math"
+ "math/rand"
+)
+
+// Constants shared by the Zipf sampler's helper functions.
+const (
+ zipf_taylor_threshold = 1e-8 // helper1 and helper2 use their polynomial form when |x| is not above this
+ zipf_f_1_2 = 0.5 // 1/2
+ zipf_f_1_3 = 1.0 / 3 // 1/3
+ zipf_f_1_4 = 0.25 // 1/4
+)
+
+// zipfDistribution is a sampler that returns integers between 1 and
+// numberOfElements. The last three fields are values precomputed by
+// newZipfDistribution and reused on every call to sample.
+type zipfDistribution struct {
+ numberOfElements int64 // largest value sample can return
+ exponent float64 // exponent passed to h, hIntegral and hIntegralInverse
+
+ hIntegralX1 float64 // hIntegral(1.5, exponent) - 1
+ hIntegralNumberOfElements float64 // hIntegral(numberOfElements + 0.5, exponent)
+ s float64 // threshold of the first acceptance test in sample
+}
+
+// newZipfDistribution creates a sampler over the values 1..numberOfElements
+// with the given exponent and precomputes the three quantities that sample
+// needs on every draw. The arguments are not validated.
+func newZipfDistribution(numberOfElements int64, exponent float64) *zipfDistribution {
+	z := new(zipfDistribution)
+	z.numberOfElements = numberOfElements
+	z.exponent = exponent
+	z.hIntegralX1 = hIntegral(1.5, exponent) - 1.0
+	z.hIntegralNumberOfElements = hIntegral(float64(numberOfElements)+zipf_f_1_2, exponent)
+	z.s = 2 - hIntegralInverse(hIntegral(2.5, exponent)-h(2.0, exponent), exponent)
+	return z
+}
+
+// sample draws one value in the range [1, numberOfElements].
+//
+// Each iteration picks u uniformly between hIntegralNumberOfElements and
+// hIntegralX1, maps it through hIntegralInverse, rounds the result to the
+// nearest integer k and clamps k into the valid range. The candidate is
+// returned if it passes either acceptance test; otherwise a new u is drawn.
+// The loop only ends by returning an accepted value.
+//
+// NOTE(review): this looks like a port of the rejection-inversion Zipf sampler
+// from Apache Commons RNG; confirm against that implementation.
+func (z *zipfDistribution) sample() int64 {
+	for {
+		u := z.hIntegralNumberOfElements + rand.Float64()*(z.hIntegralX1-z.hIntegralNumberOfElements)
+		x := hIntegralInverse(u, z.exponent)
+		k := int64(x + zipf_f_1_2)
+		if k < 1 {
+			k = 1
+		} else if k > z.numberOfElements {
+			k = z.numberOfElements
+		}
+
+		// Cheap test first; the second test evaluates hIntegral and h.
+		if float64(k)-x <= z.s || u >= hIntegral(float64(k)+zipf_f_1_2, z.exponent)-h(float64(k), z.exponent) {
+			return k
+		}
+	}
+}
+
+// h returns x raised to the power -exponent, evaluated as
+// exp(-exponent * ln(x)).
+func h(x float64, exponent float64) float64 {
+	logX := math.Log(x)
+	return math.Exp(-exponent * logX)
+}
+
+// hIntegral returns helper2((1-exponent)*ln(x)) * ln(x). Outside helper2's
+// polynomial branch this equals (x^(1-exponent) - 1) / (1-exponent).
+func hIntegral(x float64, exponent float64) float64 {
+	logX := math.Log(x)
+	return helper2((1-exponent)*logX) * logX
+}
+
+// helper1 returns log1p(x)/x. When |x| is not above zipf_taylor_threshold the
+// quotient is replaced by the polynomial 1 - x/2 + x^2/3 - x^3/4, which also
+// covers x == 0.
+func helper1(x float64) float64 {
+	if math.Abs(x) <= zipf_taylor_threshold {
+		return 1 - (x * (zipf_f_1_2 - (x * (zipf_f_1_3 - (zipf_f_1_4 * x)))))
+	}
+	return math.Log1p(x) / x
+}
+
+// helper2 returns expm1(x)/x. When |x| is not above zipf_taylor_threshold the
+// quotient is replaced by the polynomial 1 + x/2 + x^2/6 + x^3/24, which also
+// covers x == 0.
+func helper2(x float64) float64 {
+	if math.Abs(x) <= zipf_taylor_threshold {
+		return 1 + (x * zipf_f_1_2 * (1 + (x * zipf_f_1_3 * (1 + (zipf_f_1_4 * x)))))
+	}
+	return math.Expm1(x) / x
+}
+
+// hIntegralInverse returns exp(helper1(t) * x), where t = x * (1 - exponent)
+// limited from below to -1.
+func hIntegralInverse(x float64, exponent float64) float64 {
+	// Limit the value to the range [-1, +inf): t could be smaller than -1 in
+	// some rare cases due to numerical errors.
+	t := math.Max(x*(1-exponent), -1)
+	return math.Exp(helper1(t) * x)
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]