This is an automated email from the ASF dual-hosted git repository.

wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new 97c24fa7 [YUNIKORN-1875] Compressed queue config (#675)
97c24fa7 is described below

commit 97c24fa747f411cc799a4107357db7536d8098e5
Author: skrishna <[email protected]>
AuthorDate: Tue Sep 19 18:58:52 2023 +1000

    [YUNIKORN-1875] Compressed queue config (#675)
    
    A config map on K8s is limited to 1MB. A larger queue configuration can
    still be provided by gzip compressing it, base64 encoding the result, and
    storing it in configMap.BinaryData.
    
    Add support for gzip compressed, base64 encoded BinaryData only.
    
    Closes: #675
    
    Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
 pkg/common/constants/constants.go |  3 +++
 pkg/common/utils/utils_test.go    | 23 +++++++++++++++++++++
 pkg/conf/schedulerconf.go         | 42 +++++++++++++++++++++++++++++++++++++++
 pkg/conf/schedulerconf_test.go    | 38 +++++++++++++++++++++++++++++++++++
 4 files changed, 106 insertions(+)

diff --git a/pkg/common/constants/constants.go 
b/pkg/common/constants/constants.go
index b9783b7b..af2b57ae 100644
--- a/pkg/common/constants/constants.go
+++ b/pkg/common/constants/constants.go
@@ -112,3 +112,6 @@ const AnnotationEnableYuniKorn = 
"yunikorn.apache.org/namespace.enableYuniKorn"
 // Admission Controller pod label update constants
 const AutoGenAppPrefix = "yunikorn"
 const AutoGenAppSuffix = "autogen"
+
+// Compression Algorithms for schedulerConfig
+const GzipSuffix = "gzip"
diff --git a/pkg/common/utils/utils_test.go b/pkg/common/utils/utils_test.go
index a386a50a..e921f380 100644
--- a/pkg/common/utils/utils_test.go
+++ b/pkg/common/utils/utils_test.go
@@ -19,6 +19,9 @@
 package utils
 
 import (
+       "bytes"
+       "compress/gzip"
+       "encoding/base64"
        "fmt"
        "testing"
        "time"
@@ -29,6 +32,7 @@ import (
        "k8s.io/apimachinery/pkg/api/resource"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
+       "github.com/apache/yunikorn-core/pkg/common/configs"
        "github.com/apache/yunikorn-k8shim/pkg/common"
        "github.com/apache/yunikorn-k8shim/pkg/common/constants"
        "github.com/apache/yunikorn-k8shim/pkg/conf"
@@ -1137,6 +1141,25 @@ func TestGetCoreSchedulerConfigFromConfigMap(t 
*testing.T) {
        assert.Equal(t, "test", GetCoreSchedulerConfigFromConfigMap(cm))
 }
 
+func TestGzipCompressedConfigMap(t *testing.T) {
+       var b bytes.Buffer
+       gzWriter := gzip.NewWriter(&b)
+       if _, err := gzWriter.Write([]byte(configs.DefaultSchedulerConfig)); 
err != nil {
+               t.Fatal("expected nil, got error while compressing test 
schedulerConfig")
+       }
+       if err := gzWriter.Close(); err != nil {
+               t.Fatal("expected nil, got error")
+       }
+       encodedConfigString := make([]byte, 
base64.StdEncoding.EncodedLen(len(b.Bytes())))
+       base64.StdEncoding.Encode(encodedConfigString, b.Bytes())
+       confMap := conf.FlattenConfigMaps([]*v1.ConfigMap{
+               {Data: map[string]string{}},
+               {Data: map[string]string{conf.CMSvcClusterID: "new"}, 
BinaryData: map[string][]byte{"queues.yaml.gzip": encodedConfigString}},
+       })
+       config := GetCoreSchedulerConfigFromConfigMap(confMap)
+       assert.DeepEqual(t, configs.DefaultSchedulerConfig, config)
+}
+
 func TestGetExtraConfigFromConfigMapNil(t *testing.T) {
        res := GetExtraConfigFromConfigMap(nil)
        assert.Equal(t, 0, len(res))
diff --git a/pkg/conf/schedulerconf.go b/pkg/conf/schedulerconf.go
index 7b92cefd..f156ed16 100644
--- a/pkg/conf/schedulerconf.go
+++ b/pkg/conf/schedulerconf.go
@@ -19,10 +19,14 @@
 package conf
 
 import (
+       "bytes"
+       "compress/gzip"
+       "encoding/base64"
        "encoding/json"
        "errors"
        "flag"
        "fmt"
+       "io"
        "os"
        "strconv"
        "strings"
@@ -456,6 +460,40 @@ func DumpConfiguration() {
        }
 }
 
+func Decompress(key string, value []byte) (string, string) {
+       var uncompressedData string
+       decodedValue := make([]byte, base64.StdEncoding.DecodedLen(len(value)))
+       n, err := base64.StdEncoding.Decode(decodedValue, value)
+       if err != nil {
+               log.Log(log.ShimConfig).Error("failed to decode schedulerConfig 
entry", zap.Error(err))
+               return "", ""
+       }
+       decodedValue = decodedValue[:n]
+       splitKey := strings.Split(key, ".")
+       compressionAlgo := splitKey[len(splitKey)-1]
+       if strings.EqualFold(compressionAlgo, constants.GzipSuffix) {
+               reader := bytes.NewReader(decodedValue)
+               gzReader, err := gzip.NewReader(reader)
+               if err != nil {
+                       log.Log(log.ShimConfig).Error("failed to decompress 
decoded schedulerConfig entry", zap.Error(err))
+                       return "", ""
+               }
+               defer func() {
+                       if err = gzReader.Close(); err != nil {
+                               log.Log(log.ShimConfig).Debug("gzip Reader 
could not be closed ", zap.Error(err))
+                       }
+               }()
+               decompressedBytes, err := io.ReadAll(gzReader)
+               if err != nil {
+                       log.Log(log.ShimConfig).Error("failed to decompress 
decoded schedulerConfig entry", zap.Error(err))
+                       return "", ""
+               }
+               uncompressedData = string(decompressedBytes)
+       }
+       strippedKey, _ := strings.CutSuffix(key, "."+compressionAlgo)
+       return strippedKey, uncompressedData
+}
+
 func FlattenConfigMaps(configMaps []*v1.ConfigMap) map[string]string {
        result := make(map[string]string)
        for _, configMap := range configMaps {
@@ -463,6 +501,10 @@ func FlattenConfigMaps(configMaps []*v1.ConfigMap) 
map[string]string {
                        for k, v := range configMap.Data {
                                result[k] = v
                        }
+                       for k, v := range configMap.BinaryData {
+                               strippedKey, uncompressedData := Decompress(k, 
v)
+                               result[strippedKey] = uncompressedData
+                       }
                }
        }
        return result
diff --git a/pkg/conf/schedulerconf_test.go b/pkg/conf/schedulerconf_test.go
index 096e3796..f79c108a 100644
--- a/pkg/conf/schedulerconf_test.go
+++ b/pkg/conf/schedulerconf_test.go
@@ -18,14 +18,19 @@ limitations under the License.
 package conf
 
 import (
+       "bytes"
+       "compress/gzip"
+       "encoding/base64"
        "fmt"
        "reflect"
+       "strings"
        "testing"
        "time"
 
        "gotest.tools/v3/assert"
        v1 "k8s.io/api/core/v1"
 
+       "github.com/apache/yunikorn-core/pkg/common/configs"
        "github.com/apache/yunikorn-k8shim/pkg/common/constants"
 )
 
@@ -72,6 +77,39 @@ func assertDefaults(t *testing.T, conf *SchedulerConf) {
        assert.Equal(t, conf.UserLabelKey, constants.DefaultUserLabel)
 }
 
+func TestDecompress(t *testing.T) {
+       var b bytes.Buffer
+       gzWriter := gzip.NewWriter(&b)
+       if _, err := gzWriter.Write([]byte(configs.DefaultSchedulerConfig)); 
err != nil {
+               assert.NilError(t, err, "expected nil, got error while 
compressing test schedulerConfig")
+       }
+       if err := gzWriter.Close(); err != nil {
+               assert.NilError(t, err, "expected nil, got error")
+               t.Fatal("expected nil, got error")
+       }
+       encodedConfigString := make([]byte, 
base64.StdEncoding.EncodedLen(len(b.Bytes())))
+       base64.StdEncoding.Encode(encodedConfigString, b.Bytes())
+       key, decodedConfigString := 
Decompress("queues.yaml."+constants.GzipSuffix, encodedConfigString)
+       assert.Equal(t, "queues.yaml", key)
+       assert.Equal(t, configs.DefaultSchedulerConfig, decodedConfigString)
+}
+
+func TestDecompressUnkownKey(t *testing.T) {
+       encodedConfigString := make([]byte, 
base64.StdEncoding.EncodedLen(len([]byte(configs.DefaultSchedulerConfig))))
+       base64.StdEncoding.Encode(encodedConfigString, 
[]byte(configs.DefaultSchedulerConfig))
+       key, decodedConfigString := Decompress("queues.yaml.bin", 
encodedConfigString)
+       assert.Equal(t, "queues.yaml", key)
+       assert.Assert(t, len(decodedConfigString) == 0, "expected 
decodedConfigString to be nil")
+}
+
+func TestDecompressBadCompression(t *testing.T) {
+       encodedConfigString := make([]byte, 
base64.StdEncoding.EncodedLen(len([]byte(configs.DefaultSchedulerConfig))))
+       base64.StdEncoding.Encode(encodedConfigString, 
[]byte(configs.DefaultSchedulerConfig))
+       key, decodedConfigString := 
Decompress("queues.yaml."+constants.GzipSuffix, encodedConfigString)
+       assert.Equal(t, "", key)
+       assert.Assert(t, !strings.EqualFold(configs.DefaultSchedulerConfig, 
decodedConfigString), "expected decodedConfigString to be nil")
+}
+
 func TestParseConfigMap(t *testing.T) {
        testCases := []struct {
                name  string


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to