This is an automated email from the ASF dual-hosted git repository.
wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 97c24fa7 [YUNIKORN-1875] Compressed queue config (#675)
97c24fa7 is described below
commit 97c24fa747f411cc799a4107357db7536d8098e5
Author: skrishna <[email protected]>
AuthorDate: Tue Sep 19 18:58:52 2023 +1000
[YUNIKORN-1875] Compressed queue config (#675)
A config map on K8s is limited to 1MB in size. A larger queue configuration
can still be supplied by compressing it, encoding it, and storing the result
in configMap.BinaryData.
Add support for gzip compressed, base64 encoded BinaryData only.
Closes: #675
Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
pkg/common/constants/constants.go | 3 +++
pkg/common/utils/utils_test.go | 23 +++++++++++++++++++++
pkg/conf/schedulerconf.go | 42 +++++++++++++++++++++++++++++++++++++++
pkg/conf/schedulerconf_test.go | 38 +++++++++++++++++++++++++++++++++++
4 files changed, 106 insertions(+)
diff --git a/pkg/common/constants/constants.go b/pkg/common/constants/constants.go
index b9783b7b..af2b57ae 100644
--- a/pkg/common/constants/constants.go
+++ b/pkg/common/constants/constants.go
@@ -112,3 +112,6 @@ const AnnotationEnableYuniKorn = "yunikorn.apache.org/namespace.enableYuniKorn"
// Admission Controller pod label update constants
const AutoGenAppPrefix = "yunikorn"
const AutoGenAppSuffix = "autogen"
+
+// Compression Algorithms for schedulerConfig
+const GzipSuffix = "gzip"
diff --git a/pkg/common/utils/utils_test.go b/pkg/common/utils/utils_test.go
index a386a50a..e921f380 100644
--- a/pkg/common/utils/utils_test.go
+++ b/pkg/common/utils/utils_test.go
@@ -19,6 +19,9 @@
package utils
import (
+ "bytes"
+ "compress/gzip"
+ "encoding/base64"
"fmt"
"testing"
"time"
@@ -29,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-k8shim/pkg/common"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/conf"
@@ -1137,6 +1141,25 @@ func TestGetCoreSchedulerConfigFromConfigMap(t *testing.T) {
assert.Equal(t, "test", GetCoreSchedulerConfigFromConfigMap(cm))
}
+func TestGzipCompressedConfigMap(t *testing.T) {
+ var b bytes.Buffer
+ gzWriter := gzip.NewWriter(&b)
+ if _, err := gzWriter.Write([]byte(configs.DefaultSchedulerConfig));
err != nil {
+ t.Fatal("expected nil, got error while compressing test
schedulerConfig")
+ }
+ if err := gzWriter.Close(); err != nil {
+ t.Fatal("expected nil, got error")
+ }
+ encodedConfigString := make([]byte,
base64.StdEncoding.EncodedLen(len(b.Bytes())))
+ base64.StdEncoding.Encode(encodedConfigString, b.Bytes())
+ confMap := conf.FlattenConfigMaps([]*v1.ConfigMap{
+ {Data: map[string]string{}},
+ {Data: map[string]string{conf.CMSvcClusterID: "new"},
BinaryData: map[string][]byte{"queues.yaml.gzip": encodedConfigString}},
+ })
+ config := GetCoreSchedulerConfigFromConfigMap(confMap)
+ assert.DeepEqual(t, configs.DefaultSchedulerConfig, config)
+}
+
func TestGetExtraConfigFromConfigMapNil(t *testing.T) {
res := GetExtraConfigFromConfigMap(nil)
assert.Equal(t, 0, len(res))
diff --git a/pkg/conf/schedulerconf.go b/pkg/conf/schedulerconf.go
index 7b92cefd..f156ed16 100644
--- a/pkg/conf/schedulerconf.go
+++ b/pkg/conf/schedulerconf.go
@@ -19,10 +19,14 @@
package conf
import (
+ "bytes"
+ "compress/gzip"
+ "encoding/base64"
"encoding/json"
"errors"
"flag"
"fmt"
+ "io"
"os"
"strconv"
"strings"
@@ -456,6 +460,40 @@ func DumpConfiguration() {
}
}
+func Decompress(key string, value []byte) (string, string) {
+ var uncompressedData string
+ decodedValue := make([]byte, base64.StdEncoding.DecodedLen(len(value)))
+ n, err := base64.StdEncoding.Decode(decodedValue, value)
+ if err != nil {
+ log.Log(log.ShimConfig).Error("failed to decode schedulerConfig
entry", zap.Error(err))
+ return "", ""
+ }
+ decodedValue = decodedValue[:n]
+ splitKey := strings.Split(key, ".")
+ compressionAlgo := splitKey[len(splitKey)-1]
+ if strings.EqualFold(compressionAlgo, constants.GzipSuffix) {
+ reader := bytes.NewReader(decodedValue)
+ gzReader, err := gzip.NewReader(reader)
+ if err != nil {
+ log.Log(log.ShimConfig).Error("failed to decompress
decoded schedulerConfig entry", zap.Error(err))
+ return "", ""
+ }
+ defer func() {
+ if err = gzReader.Close(); err != nil {
+ log.Log(log.ShimConfig).Debug("gzip Reader
could not be closed ", zap.Error(err))
+ }
+ }()
+ decompressedBytes, err := io.ReadAll(gzReader)
+ if err != nil {
+ log.Log(log.ShimConfig).Error("failed to decompress
decoded schedulerConfig entry", zap.Error(err))
+ return "", ""
+ }
+ uncompressedData = string(decompressedBytes)
+ }
+ strippedKey, _ := strings.CutSuffix(key, "."+compressionAlgo)
+ return strippedKey, uncompressedData
+}
+
func FlattenConfigMaps(configMaps []*v1.ConfigMap) map[string]string {
result := make(map[string]string)
for _, configMap := range configMaps {
@@ -463,6 +501,10 @@ func FlattenConfigMaps(configMaps []*v1.ConfigMap) map[string]string {
for k, v := range configMap.Data {
result[k] = v
}
+ for k, v := range configMap.BinaryData {
+ strippedKey, uncompressedData := Decompress(k,
v)
+ result[strippedKey] = uncompressedData
+ }
}
}
return result
diff --git a/pkg/conf/schedulerconf_test.go b/pkg/conf/schedulerconf_test.go
index 096e3796..f79c108a 100644
--- a/pkg/conf/schedulerconf_test.go
+++ b/pkg/conf/schedulerconf_test.go
@@ -18,14 +18,19 @@ limitations under the License.
package conf
import (
+ "bytes"
+ "compress/gzip"
+ "encoding/base64"
"fmt"
"reflect"
+ "strings"
"testing"
"time"
"gotest.tools/v3/assert"
v1 "k8s.io/api/core/v1"
+ "github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
)
@@ -72,6 +77,39 @@ func assertDefaults(t *testing.T, conf *SchedulerConf) {
assert.Equal(t, conf.UserLabelKey, constants.DefaultUserLabel)
}
+func TestDecompress(t *testing.T) {
+ var b bytes.Buffer
+ gzWriter := gzip.NewWriter(&b)
+ if _, err := gzWriter.Write([]byte(configs.DefaultSchedulerConfig));
err != nil {
+ assert.NilError(t, err, "expected nil, got error while
compressing test schedulerConfig")
+ }
+ if err := gzWriter.Close(); err != nil {
+ assert.NilError(t, err, "expected nil, got error")
+ t.Fatal("expected nil, got error")
+ }
+ encodedConfigString := make([]byte,
base64.StdEncoding.EncodedLen(len(b.Bytes())))
+ base64.StdEncoding.Encode(encodedConfigString, b.Bytes())
+ key, decodedConfigString :=
Decompress("queues.yaml."+constants.GzipSuffix, encodedConfigString)
+ assert.Equal(t, "queues.yaml", key)
+ assert.Equal(t, configs.DefaultSchedulerConfig, decodedConfigString)
+}
+
+func TestDecompressUnkownKey(t *testing.T) {
+ encodedConfigString := make([]byte,
base64.StdEncoding.EncodedLen(len([]byte(configs.DefaultSchedulerConfig))))
+ base64.StdEncoding.Encode(encodedConfigString,
[]byte(configs.DefaultSchedulerConfig))
+ key, decodedConfigString := Decompress("queues.yaml.bin",
encodedConfigString)
+ assert.Equal(t, "queues.yaml", key)
+ assert.Assert(t, len(decodedConfigString) == 0, "expected
decodedConfigString to be nil")
+}
+
+func TestDecompressBadCompression(t *testing.T) {
+ encodedConfigString := make([]byte,
base64.StdEncoding.EncodedLen(len([]byte(configs.DefaultSchedulerConfig))))
+ base64.StdEncoding.Encode(encodedConfigString,
[]byte(configs.DefaultSchedulerConfig))
+ key, decodedConfigString :=
Decompress("queues.yaml."+constants.GzipSuffix, encodedConfigString)
+ assert.Equal(t, "", key)
+ assert.Assert(t, !strings.EqualFold(configs.DefaultSchedulerConfig,
decodedConfigString), "expected decodedConfigString to be nil")
+}
+
func TestParseConfigMap(t *testing.T) {
testCases := []struct {
name string
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]