shamrickus commented on a change in pull request #5924: URL: https://github.com/apache/trafficcontrol/pull/5924#discussion_r651041593
########## File path: tools/traffic_vault_migrate/riak.go ########## @@ -0,0 +1,746 @@ +package main + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import ( + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/apache/trafficcontrol/lib/go-rfc" + "github.com/apache/trafficcontrol/lib/go-tc" + + "github.com/basho/riak-go-client" +) + +const ( + BUCKET_SSL = "ssl" + BUCKET_DNSSEC = "dnssec" + BUCKET_URL_SIG = "url_sig_keys" + BUCKET_URI_SIG = "cdn_uri_sig_keys" + + INDEX_SSL = "sslkeys" + + SCHEMA_RIAK_KEY = "_yz_rk" + SCHEMA_RIAK_BUCKET = "_yz_rb" +) + +var ( + SCHEMA_SSL_FIELDS = [...]string{SCHEMA_RIAK_KEY, SCHEMA_RIAK_BUCKET} +) + +// RiakConfig represents the configuration options available to the Riak backend +type RiakConfig struct { + Host string `json:"host"` + Port string `json:"port"` + User string `json:"user"` + Password string `json:"password"` + Insecure bool `json:"insecure"` + TLSVersionRaw string `json:"tlsVersion"` + TLSVersion uint16 +} + +// RiakBackend is the Riak implementation of TVBackend +type RiakBackend struct { + sslKeys riakSSLKeyTable + dnssecKeys riakDNSSecKeyTable + uriSignKeys riakURISignKeyTable + urlSigKeys riakURLSigKeyTable + cfg RiakConfig + 
cluster *riak.Cluster +} + +// String returns a high level overview of the backend and its keys +func (rb *RiakBackend) String() string { + data := fmt.Sprintf("Riak server %v@%v:%v\n", rb.cfg.User, rb.cfg.Host, rb.cfg.Port) + data += fmt.Sprintf("\tSSL Keys: %v\n", len(rb.sslKeys.Records)) + data += fmt.Sprintf("\tDNSSec Keys: %v\n", len(rb.dnssecKeys.Records)) + data += fmt.Sprintf("\tURI Keys: %v\n", len(rb.uriSignKeys.Records)) + data += fmt.Sprintf("\tURL Keys: %v\n", len(rb.urlSigKeys.Records)) + return data +} + +// Name returns the name for this backend +func (rb *RiakBackend) Name() string { + return "Riak" +} + +// ReadConfig takes in a filename and will read it into the backends config +func (rb *RiakBackend) ReadConfig(s string) error { + err := UnmarshalConfig(s, &rb.cfg) + if err != nil { + return err + } + + if rb.cfg.TLSVersionRaw == "10" { + rb.cfg.TLSVersion = tls.VersionTLS10 + } else if rb.cfg.TLSVersionRaw == "11" { + rb.cfg.TLSVersion = tls.VersionTLS11 + } else if rb.cfg.TLSVersionRaw == "12" { + rb.cfg.TLSVersion = tls.VersionTLS12 + } else if rb.cfg.TLSVersionRaw == "13" { + rb.cfg.TLSVersion = tls.VersionTLS13 + } + return nil +} + +// Insert takes the current keys and inserts them into the backend DB +func (rb *RiakBackend) Insert() error { + if err := rb.sslKeys.insertKeys(rb.cluster); err != nil { + return err + } + if err := rb.dnssecKeys.insertKeys(rb.cluster); err != nil { + return err + } + if err := rb.urlSigKeys.insertKeys(rb.cluster); err != nil { + return err + } + if err := rb.uriSignKeys.insertKeys(rb.cluster); err != nil { + return err + } + return nil +} + +// ValidateKey validates that the keys are valid (in most cases, certain fields are not null) +func (rb *RiakBackend) ValidateKey() []string { + errors := []string{} + if errs := rb.sslKeys.validate(); errs != nil { + errors = append(errors, errs...) + } + if errs := rb.dnssecKeys.validate(); errs != nil { + errors = append(errors, errs...) 
+ } + if errs := rb.uriSignKeys.validate(); errs != nil { + errors = append(errors, errs...) + } + if errs := rb.urlSigKeys.validate(); errs != nil { + errors = append(errors, errs...) + } + + return errors +} + +// SetSSLKeys takes in keys and converts & encrypts the data into the backends internal format +func (rb *RiakBackend) SetSSLKeys(keys []SSLKey) error { + rb.sslKeys.fromGeneric(keys) + return nil +} + +// SetDNSSecKeys takes in keys and converts & encrypts the data into the backends internal format +func (rb *RiakBackend) SetDNSSecKeys(keys []DNSSecKey) error { + rb.dnssecKeys.fromGeneric(keys) + return nil +} + +// SetURISignKeys takes in keys and converts & encrypts the data into the backends internal format +func (rb *RiakBackend) SetURISignKeys(keys []URISignKey) error { + rb.uriSignKeys.fromGeneric(keys) + return nil +} + +// SetURLSigKeys takes in keys and converts & encrypts the data into the backends internal format +func (rb *RiakBackend) SetURLSigKeys(keys []URLSigKey) error { + rb.urlSigKeys.fromGeneric(keys) + return nil +} + +// Start initiates the connection to the backend DB +func (rb *RiakBackend) Start() error { + tlsConfig := &tls.Config{ + InsecureSkipVerify: !rb.cfg.Insecure, + MaxVersion: rb.cfg.TLSVersion, + } + auth := &riak.AuthOptions{ + User: rb.cfg.User, + Password: rb.cfg.Password, + TlsConfig: tlsConfig, + } + + cluster, err := getRiakCluster(rb.cfg, auth) + if err != nil { + return err + } + if err := cluster.Start(); err != nil { + return fmt.Errorf("unable to start riak cluster: %w", err) + } + + rb.cluster = cluster + rb.sslKeys = riakSSLKeyTable{} + rb.dnssecKeys = riakDNSSecKeyTable{} + rb.urlSigKeys = riakURLSigKeyTable{} + rb.uriSignKeys = riakURISignKeyTable{} + return nil +} + +// Close terminates the connection to the backend DB +func (rb *RiakBackend) Close() error { + if err := rb.cluster.Stop(); err != nil { + return err + } + return nil +} + +// Ping checks the connection to the backend DB +func (rb 
*RiakBackend) Ping() error { + return ping(rb.cluster) +} + +// GetSSLKeys converts the backends internal key representation into the common representation (SSLKey) +func (rb *RiakBackend) GetSSLKeys() ([]SSLKey, error) { + return rb.sslKeys.toGeneric(), nil +} + +// GetDNSSecKeys converts the backends internal key representation into the common representation (DNSSecKey) +func (rb *RiakBackend) GetDNSSecKeys() ([]DNSSecKey, error) { + return rb.dnssecKeys.toGeneric(), nil +} + +// GetURISignKeys converts the pg internal key representation into the common representation (URISignKey) +func (rb *RiakBackend) GetURISignKeys() ([]URISignKey, error) { + return rb.uriSignKeys.toGeneric(), nil +} + +// GetURLSigKeys converts the backends internal key representation into the common representation (URLSigKey) +func (rb *RiakBackend) GetURLSigKeys() ([]URLSigKey, error) { + return rb.urlSigKeys.toGeneric(), nil +} + +// Fetch gets all of the keys from the backend DB +func (rb *RiakBackend) Fetch() error { + if err := rb.sslKeys.gatherKeys(rb.cluster); err != nil { + return err + } + if err := rb.dnssecKeys.gatherKeys(rb.cluster); err != nil { + return err + } + if err := rb.urlSigKeys.gatherKeys(rb.cluster); err != nil { + return err + } + if err := rb.uriSignKeys.gatherKeys(rb.cluster); err != nil { + return err + } + + return nil +} + +type riakSSLKeyRecord struct { + tc.DeliveryServiceSSLKeys +} +type riakSSLKeyTable struct { + Records []riakSSLKeyRecord +} + +func (tbl *riakSSLKeyTable) gatherKeys(cluster *riak.Cluster) error { + searchDocs, err := search(cluster, INDEX_SSL, "cdn:*", fmt.Sprintf("%v:*latest", SCHEMA_RIAK_KEY), 1000, SCHEMA_SSL_FIELDS[:]) + if err != nil { + return fmt.Errorf("RiakSSLKey gatherKeys: %w", err) + } + + tbl.Records = make([]riakSSLKeyRecord, len(searchDocs)) + for i, doc := range searchDocs { + objs, err := getObject(cluster, doc.Bucket, doc.Key) + if err != nil { + return err + } + if len(objs) < 1 { + return fmt.Errorf("RiakSSLKey 
gatherKeys unable to find any objects with key %v and bucket %v, but search results were returned", doc.Key, doc.Bucket) + } + if len(objs) > 1 { + return fmt.Errorf("RiakSSLKey gatherKeys more than 1 ssl key record found %v\n", len(objs)) + } + var obj tc.DeliveryServiceSSLKeys + if err = json.Unmarshal(objs[0].Value, &obj); err != nil { + return fmt.Errorf("RiakSSLKey gatherKeys unable to unmarshal object into tc.DeliveryServiceSSLKeys: %w", err) + } + tbl.Records[i] = riakSSLKeyRecord{ + DeliveryServiceSSLKeys: obj, + } + } + return nil +} +func (tbl *riakSSLKeyTable) toGeneric() []SSLKey { + keys := make([]SSLKey, len(tbl.Records)) + + for i, record := range tbl.Records { + keys[i] = SSLKey{ + DeliveryServiceSSLKeys: record.DeliveryServiceSSLKeys, + } + } + + return keys +} +func (tbl *riakSSLKeyTable) fromGeneric(keys []SSLKey) { + tbl.Records = make([]riakSSLKeyRecord, len(keys)) + + for i, record := range keys { + tbl.Records[i] = riakSSLKeyRecord{ + DeliveryServiceSSLKeys: record.DeliveryServiceSSLKeys, + } + } +} +func (tbl *riakSSLKeyTable) insertKeys(cluster *riak.Cluster) error { + for _, record := range tbl.Records { + objBytes, err := json.Marshal(record.DeliveryServiceSSLKeys) + if err != nil { + return fmt.Errorf("RiakSSLKey insertKeys failed to unmarshal keys: %w", err) + } + err = setObject(cluster, makeRiakObject(objBytes, record.DeliveryService+"-"+record.Version.String()), BUCKET_SSL) + if err != nil { + return fmt.Errorf("RiakSSLKey insertKeys: %w", err) + } + err = setObject(cluster, makeRiakObject(objBytes, record.DeliveryService+"-latest"), BUCKET_SSL) + if err != nil { + return fmt.Errorf("RiakSSLKey insertKeys: %w", err) + } + } + return nil +} +func (tbl *riakSSLKeyTable) validate() []string { + errs := []string{} + for i, record := range tbl.Records { + if record.DeliveryService == "" { + errs = append(errs, fmt.Sprintf("SSL Key #%v: Delivery Service is blank!", i)) + } + if record.CDN == "" { + errs = append(errs, fmt.Sprintf("SSL Key 
#%v: CDN is blank!", i)) + } + if record.Version.String() == "" { + errs = append(errs, fmt.Sprintf("SSL Key #%v: Version is blank!", i)) + } + } + return errs +} + +type riakDNSSecKeyRecord struct { + CDN string + Key tc.DNSSECKeysRiak +} +type riakDNSSecKeyTable struct { + Records []riakDNSSecKeyRecord +} + +func (tbl *riakDNSSecKeyTable) gatherKeys(cluster *riak.Cluster) error { + tbl.Records = []riakDNSSecKeyRecord{} + objs, err := getObjects(cluster, BUCKET_DNSSEC) + if err != nil { + return fmt.Errorf("RiakDNSSecKey gatherKeys: %w", err) + } + for _, obj := range objs { + key := tc.DNSSECKeysRiak{} + if err := json.Unmarshal(obj.Value, &key); err != nil { + return fmt.Errorf("RiakDNSSecKey gatherKeys unable to unmarshal object to tc.DNSSECKeysRiak: %w", err) + } + tbl.Records = append(tbl.Records, riakDNSSecKeyRecord{ + CDN: obj.Key, + Key: key, + }) + } + return nil +} +func (tbl *riakDNSSecKeyTable) toGeneric() []DNSSecKey { + keys := make([]DNSSecKey, len(tbl.Records)) + + for i, record := range tbl.Records { + keys[i] = DNSSecKey{ + CDN: record.CDN, + DNSSECKeysTrafficVault: tc.DNSSECKeysTrafficVault(record.Key), + } + } + + return keys +} +func (tbl *riakDNSSecKeyTable) fromGeneric(keys []DNSSecKey) { + tbl.Records = make([]riakDNSSecKeyRecord, len(keys)) + + for i, record := range keys { + tbl.Records[i] = riakDNSSecKeyRecord{ + CDN: record.CDN, + Key: tc.DNSSECKeysRiak(record.DNSSECKeysTrafficVault), + } + } +} +func (tbl *riakDNSSecKeyTable) insertKeys(cluster *riak.Cluster) error { + for _, record := range tbl.Records { + objBytes, err := json.Marshal(record.Key) + if err != nil { + return fmt.Errorf("RiakDNSSecKey insertKeys error marshalling keys: %w", err) + } + + err = setObject(cluster, makeRiakObject(objBytes, record.CDN), BUCKET_DNSSEC) + if err != nil { + return fmt.Errorf("RiakDNSSecKey insertKeys: %w", err) + } + } + return nil +} +func (tbl *riakDNSSecKeyTable) validate() []string { + errs := []string{} + for i, record := range tbl.Records 
{ + if record.CDN == "" { + errs = append(errs, fmt.Sprintf("DNSSec Key #%v: CDN is blank!", i)) + } + } + return errs +} + +type riakURLSigKeyRecord struct { + Key tc.URLSigKeys + DeliveryService string +} +type riakURLSigKeyTable struct { + Records []riakURLSigKeyRecord +} + +func (tbl *riakURLSigKeyTable) gatherKeys(cluster *riak.Cluster) error { + tbl.Records = []riakURLSigKeyRecord{} + objs, err := getObjects(cluster, BUCKET_URL_SIG) + if err != nil { + return fmt.Errorf("RiakURLSigKey gatherKeys: %w", err) + } + for _, obj := range objs { + key := tc.URLSigKeys{} + if err := json.Unmarshal(obj.Value, &key); err != nil { + return fmt.Errorf("RiakURLSigKey gatherKeys unable to unamrshal object into tc.URLSigKeys: %w", err) + } + tbl.Records = append(tbl.Records, riakURLSigKeyRecord{ + DeliveryService: obj.Key, + Key: key, + }) + } + return nil +} +func (tbl *riakURLSigKeyTable) toGeneric() []URLSigKey { + keys := make([]URLSigKey, len(tbl.Records)) + + for i, record := range tbl.Records { + keys[i] = URLSigKey{ + URLSigKeys: record.Key, + DeliveryService: record.DeliveryService, + } + } + + return keys +} +func (tbl *riakURLSigKeyTable) fromGeneric(keys []URLSigKey) { + tbl.Records = make([]riakURLSigKeyRecord, len(keys)) + for i, key := range keys { + tbl.Records[i] = riakURLSigKeyRecord{ + Key: key.URLSigKeys, + DeliveryService: key.DeliveryService, + } + } +} +func (tbl *riakURLSigKeyTable) insertKeys(cluster *riak.Cluster) error { + for _, record := range tbl.Records { + objBytes, err := json.Marshal(record.Key) + if err != nil { + return fmt.Errorf("RiakURLSigKey insertKeys unable to marshal keys: %w", err) + } + + err = setObject(cluster, makeRiakObject(objBytes, "url_sig_"+record.DeliveryService+".config"), BUCKET_URL_SIG) + if err != nil { + return fmt.Errorf("RiakURLSigKey insertKeys: %w", err) + } + } + return nil +} +func (tbl *riakURLSigKeyTable) validate() []string { + errs := []string{} + for i, record := range tbl.Records { + if 
record.DeliveryService == "" { + errs = append(errs, fmt.Sprintf("URL Key #%v: Delivery Service is blank!", i)) + } + } + return errs +} + +type riakURISignKeyRecord struct { + Key map[string]tc.URISignerKeyset + DeliveryService string +} +type riakURISignKeyTable struct { + Records []riakURISignKeyRecord +} + +func (tbl *riakURISignKeyTable) gatherKeys(cluster *riak.Cluster) error { + tbl.Records = []riakURISignKeyRecord{} + objs, err := getObjects(cluster, BUCKET_URI_SIG) + if err != nil { + return fmt.Errorf("RiakURISignKey gatherKeys: %w", err) + } + for _, obj := range objs { + key := map[string]tc.URISignerKeyset{} + if err := json.Unmarshal(obj.Value, &key); err != nil { + return fmt.Errorf("RiakURISignKey gatherKeys unable to unmarshal object into map[string]tc.URISignerKeySet: %w", err) + } + + tbl.Records = append(tbl.Records, riakURISignKeyRecord{ + DeliveryService: obj.Key, + Key: key, + }) + } + return nil +} +func (tbl *riakURISignKeyTable) toGeneric() []URISignKey { + keys := make([]URISignKey, len(tbl.Records)) + + for i, record := range tbl.Records { + keys[i] = URISignKey{ + DeliveryService: record.DeliveryService, + Keys: record.Key, + } + } + + return keys +} +func (tbl *riakURISignKeyTable) fromGeneric(keys []URISignKey) { + tbl.Records = make([]riakURISignKeyRecord, len(keys)) + + for i, record := range keys { + tbl.Records[i] = riakURISignKeyRecord{ + Key: record.Keys, + DeliveryService: record.DeliveryService, + } + } +} +func (tbl *riakURISignKeyTable) insertKeys(cluster *riak.Cluster) error { + for _, record := range tbl.Records { + objBytes, err := json.Marshal(record.Key) + if err != nil { + return fmt.Errorf("RiakURISignKey insertKeys: unable to marshal key: %w", err) + } + + err = setObject(cluster, makeRiakObject(objBytes, record.DeliveryService), BUCKET_URI_SIG) + if err != nil { + return fmt.Errorf("RiakURISignKey insertKeys: %w", err) + } + } + return nil +} +func (tbl *riakURISignKeyTable) validate() []string { + errs := 
[]string{} + for i, record := range tbl.Records { + if record.DeliveryService == "" { + errs = append(errs, fmt.Sprintf("URI Key #%v: Delivery Service is blank!", i)) + } + } + return errs +} + +// Riak functions +func makeRiakObject(data []byte, key string) *riak.Object { + return &riak.Object{ + ContentType: rfc.ApplicationJSON, + Charset: "utf-8", + ContentEncoding: "utf-8", + Key: key, + Value: data, + } +} +func getObjects(cluster *riak.Cluster, bucket string) ([]*riak.Object, error) { + objs := []*riak.Object{} + keys, err := listKeys(cluster, bucket) + if err != nil { + return nil, err + } + for _, key := range keys { + objects, err := getObject(cluster, bucket, key) + if err != nil { + return nil, err + } + if len(objects) > 1 { + return nil, fmt.Errorf("Unexpected number of objects %v, ignoring\n", len(objects)) Review comment: Good catch, used to be a log with a continue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
