mhoppa commented on a change in pull request #3744: Rewrote jobs endpoints to go
URL: https://github.com/apache/trafficcontrol/pull/3744#discussion_r329237209
 
 

 ##########
 File path: traffic_ops/traffic_ops_golang/invalidationjobs/invalidationjobs.go
 ##########
 @@ -0,0 +1,928 @@
+package invalidationjobs
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import "database/sql"
+import "encoding/json"
+import "errors"
+import "fmt"
+import "net/http"
+import "strconv"
+import "strings"
+import "time"
+
+import "github.com/apache/trafficcontrol/lib/go-tc"
+import "github.com/apache/trafficcontrol/lib/go-log"
+import "github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/api"
+import 
"github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/dbhelpers"
+import "github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/tenant"
+
+type InvalidationJob struct {
+       api.APIInfoImpl `json:"-"`
+       tc.InvalidationJob
+}
+
+const readQuery = `
+SELECT job.id,
+       keyword,
+       parameters,
+       asset_url,
+       start_time,
+       u.username AS createdBy,
+       ds.xml_id AS dsId
+FROM job
+JOIN tm_user u ON job.job_user = u.id
+JOIN deliveryservice ds  ON job.job_deliveryservice = ds.id
+`
+
+const insertQuery = `
+INSERT INTO job (
+       agent,
+       asset_type,
+       asset_url,
+       entered_time,
+       job_deliveryservice,
+       job_user,
+       keyword,
+       parameters,
+       start_time,
+       status)
+VALUES (
+       1::bigint,
+       'file',
+       (
+               SELECT o.protocol::text || '://' || o.fqdn || rtrim(concat(':', 
o.port::text), ':')
+               FROM origin o
+               WHERE o.deliveryservice = $1
+               AND o.is_primary
+       ) || $2,
+       $3,
+       $4,
+       $5,
+       'PURGE',
+       $6,
+       $7,
+       1::bigint
+)
+RETURNING
+       asset_url,
+       (SELECT deliveryservice.xml_id
+        FROM deliveryservice
+        WHERE deliveryservice.id=job_deliveryservice) AS deliveryservice,
+       id,
+       (SELECT tm_user.username
+        FROM tm_user
+        WHERE tm_user.id=job_user) AS createdBy,
+       keyword,
+       parameters,
+       start_time
+`
+
+const revalQuery = `
+UPDATE server SET %s=TRUE
+WHERE server.status NOT IN (
+                             SELECT status.id
+                             FROM status
+                             WHERE name IN ('OFFLINE', 'PRE_PROD')
+                           )
+     AND server.profile IN (
+                             SELECT profile_parameter.profile
+                             FROM profile_parameter
+                             WHERE profile_parameter.parameter IN (
+                                                                    SELECT 
parameter.id
+                                                                    FROM 
parameter
+                                                                    WHERE 
parameter.name='location'
+                                                                     AND 
parameter.config_file='regex_revalidate.config'
+                                                                  )
+                           )
+     AND server.cdn_id  =  (
+                             SELECT deliveryservice.cdn_id
+                             FROM deliveryservice
+                             WHERE deliveryservice.%s=$1
+                           )
+`
+
+const updateQuery = `
+UPDATE job
+SET asset_url=$1,
+    keyword=$2,
+    parameters=$3,
+    start_time=$4
+WHERE job.id=$5
+RETURNING job.asset_url,
+          (
+           SELECT tm_user.username
+           FROM tm_user
+           WHERE tm_user.id=job.job_user
+          ) AS created_by,
+          (
+           SELECT deliveryservice.xml_id
+           FROM deliveryservice
+           WHERE deliveryservice.id=job.job_deliveryservice
+          ) AS delivery_service,
+          job.id,
+          job.keyword,
+          job.parameters,
+          job.start_time
+`
+
+const patchUpdateQuery = `
+UPDATE job
+SET asset_url=$1,
+    parameters=$2,
+    start_time=$3
+WHERE id=$4
+RETURNING job.asset_url,
+          (SELECT tm_user.username
+           FROM tm_user
+           WHERE tm_user.id=job.job_user) AS created_by,
+           (SELECT deliveryservice.xml_id
+            FROM deliveryservice
+            WHERE deliveryservice.id=job.job_deliveryservice) AS 
deliveryservice,
+           job.id,
+           job.keyword,
+           job.parameters,
+           job.start_time
+`
+
+// See the commented-out 'Patch' function for why this is commented out
+// const patchInfoQuery = `
+// SELECT job.id AS id,
+//        tm_user.username AS createdBy,
+//        job.job_user AS createdByID,
+//        job.job_deliveryservice AS dsid,
+//        deliveryservice.xml_id AS dsxmlid,
+//        job.asset_url AS assetURL,
+//        ltrim(rtrim(job.parameters, 'h'), 'TTL:')::bigint AS ttl,
+//        job.parameters,
+//        job.start_time AS start_time,
+//        origin.protocol || '://' || origin.fqdn || rtrim(concat(':', 
origin.port), ':') AS OFQDN
+// FROM job
+// INNER JOIN origin ON origin.deliveryservice=job.job_deliveryservice
+// INNER JOIN tm_user ON tm_user.id=job.job_user
+// INNER JOIN deliveryservice ON deliveryservice.id=job.job_deliveryservice
+// WHERE job.id=$1
+// `
+
+const putInfoQuery = `
+SELECT job.id AS id,
+       tm_user.username AS createdBy,
+       job.job_user AS createdByID,
+       job.job_deliveryservice AS dsid,
+       deliveryservice.xml_id AS dsxmlid,
+       job.asset_url AS assetURL,
+       job.parameters,
+       job.start_time AS start_time,
+       origin.protocol || '://' || origin.fqdn || rtrim(concat(':', 
origin.port), ':') AS OFQDN
+FROM job
+INNER JOIN origin ON origin.deliveryservice=job.job_deliveryservice
+INNER JOIN tm_user ON tm_user.id=job.job_user
+INNER JOIN deliveryservice ON deliveryservice.id=job.job_deliveryservice
+WHERE job.id=$1
+`
+
+const deleteQuery = `
+DELETE
+FROM job
+WHERE job.id=$1
+RETURNING job.asset_url,
+          (
+           SELECT tm_user.username
+           FROM tm_user
+           WHERE tm_user.id=job.job_user
+          ) AS created_by,
+          (
+           SELECT deliveryservice.xml_id
+           FROM deliveryservice
+           WHERE deliveryservice.id=job.job_deliveryservice
+          ) AS deliveryservice,
+          job.id,
+          job.keyword,
+          job.parameters,
+          job.start_time
+`
+
+type apiResponse struct {
+       Alerts   []tc.Alert         `json:"alerts,omitempty"`
+       Response tc.InvalidationJob `json:"response,omitempty"`
+}
+
+// Used by GET requests to `/jobs`, simply returns a filtered list of
+// content invalidation jobs according to the provided query parameters.
+func (job *InvalidationJob) Read() ([]interface{}, error, error, int) {
+       queryParamsToSQLCols := map[string]dbhelpers.WhereColumnInfo{
+               "id":              dbhelpers.WhereColumnInfo{"job.id", 
api.IsInt},
+               "keyword":         dbhelpers.WhereColumnInfo{"job.keyword", 
nil},
+               "assetUrl":        dbhelpers.WhereColumnInfo{"job.asset_url", 
nil},
+               "userId":          dbhelpers.WhereColumnInfo{"job.job_user", 
api.IsInt},
+               "createdBy":       dbhelpers.WhereColumnInfo{`(SELECT 
tm_user.username FROM tm_user WHERE tm_user.id=job.job_user)`, nil},
+               "deliveryService": dbhelpers.WhereColumnInfo{`(SELECT 
deliveryservice.xml_id FROM deliveryservice WHERE 
deliveryservice.id=job.job_deliveryservice)`, nil},
+               "dsId":            
dbhelpers.WhereColumnInfo{"job.job_deliveryservice", api.IsInt},
+       }
+
+       where, orderBy, queryValues, errs := 
dbhelpers.BuildWhereAndOrderBy(job.APIInfo().Params, queryParamsToSQLCols)
+       if len(errs) > 0 {
+               var b strings.Builder
+               b.WriteString("Reading jobs:")
+               for _, err := range errs {
+                       b.WriteString("\n\t")
+                       b.WriteString(err.Error())
+               }
+
+               return nil, nil, errors.New(b.String()), 
http.StatusInternalServerError
+       }
+
+       // TODO: check tenancy here
+       query := readQuery + where + orderBy
+
+       log.Debugln("generated job query: " + query)
+       log.Debugf("executing with values: %++v\n", queryValues)
+
+       returnable := []tc.InvalidationJob{}
+       rows, err := job.APIInfo().Tx.NamedQuery(query, queryValues)
+       if err != nil {
+               return nil, nil, fmt.Errorf("querying: %v", err), 
http.StatusInternalServerError
+       }
+       defer rows.Close()
+
+       for rows.Next() {
+               j := tc.InvalidationJob{}
+               err := rows.Scan(&j.ID,
+                       &j.Keyword,
+                       &j.Parameters,
+                       &j.AssetURL,
+                       &j.StartTime,
+                       &j.CreatedBy,
+                       &j.DeliveryService)
+               if err != nil {
+                       return nil, nil, fmt.Errorf("parsing db response: %v", 
err), http.StatusInternalServerError
+               }
+
+               returnable = append(returnable, j)
+       }
+
+       if err := rows.Err(); err != nil {
+               return nil, nil, fmt.Errorf("Parsing db responses: %v", err), 
http.StatusInternalServerError
+       }
+
+       // This cannot be done in the scanning loop, because pq will throw an 
error if you try to make
+       // another query before exhausting the rows returned by an earlier query
+       filtered := []interface{}{}
+       for _, r := range returnable {
+               userErr, sysErr, errCode := 
IsUserAuthorizedToModifyDSXMLID(job.APIInfo(), *r.DeliveryService)
+               if sysErr != nil {
+                       return nil, userErr, sysErr, errCode
+               } else if userErr == nil {
+                       filtered = append(filtered, r)
+               }
+       }
+
+       return filtered, nil, nil, http.StatusOK
+}
+
+// Used by POST requests to `/jobs`, creates a new content invalidation job
+// from the provided request body.
+func Create(w http.ResponseWriter, r *http.Request) {
+       inf, userErr, sysErr, errCode := api.NewInfo(r, nil, nil)
+       if userErr != nil || sysErr != nil {
+               api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
+               return
+       }
+       defer inf.Close()
+
+       job := tc.InvalidationJobInput{}
+       cType := r.Header.Get(http.CanonicalHeaderKey("content-type"))
+       if cType == tc.ApplicationJson || cType == "" {
+               decoder := json.NewDecoder(r.Body)
+               if err := decoder.Decode(&job); err != nil {
+                       api.HandleErr(w, r, inf.Tx.Tx, http.StatusBadRequest, 
errors.New("Unable to parse Invalidation Job"), fmt.Errorf("parsing jobs/ POST: 
%v", err))
+                       return
+               }
+       } else {
+               w.Header().Set(http.CanonicalHeaderKey("accept"), 
strings.Join([]string{tc.ApplicationJson, "application/x-www-form-urlencoded", 
"multipart/form-data"}, ";"))
+               err := fmt.Errorf("unsupported content-type: %s", cType)
+               api.HandleErr(w, r, inf.Tx.Tx, http.StatusUnsupportedMediaType, 
err, err)
+               return
+       }
+
+       w.Header().Set(http.CanonicalHeaderKey("content-type"), 
tc.ApplicationJson)
+       if errs := job.Validate(inf.Tx.Tx); len(errs) > 0 {
+               response := []tc.Alert{}
+               for _, e := range errs {
+                       response = append(response, tc.Alert{e, 
tc.ErrorLevel.String()})
+               }
+
+               resp, err := json.Marshal(struct {
+                       Alerts []tc.Alert `json:"alerts"`
+               }{response})
+               if err != nil {
+                       api.HandleErr(w, r, inf.Tx.Tx, 
http.StatusInternalServerError, nil, fmt.Errorf("Encoding bad request response: 
%v", err))
+               } else {
+                       w.WriteHeader(http.StatusBadRequest)
+                       w.Write(resp)
+                       w.Write([]byte{'\n'})
+               }
+               return
+       }
+
+       // Validate() would have already checked for deliveryservice existence 
and
+       // parsed the ttl, so if either of these throws an error now, something
+       // weird has happened
+       dsid, err := job.DSID(nil)
+       if err != nil {
+               sysErr = fmt.Errorf("retrieving parsed DSID: %v", err)
+               errCode = http.StatusInternalServerError
+               api.HandleErr(w, r, inf.Tx.Tx, errCode, nil, sysErr)
+               return
+       }
+       var ttl uint
+       if ttl, err = job.TTLHours(); err != nil {
+               sysErr = fmt.Errorf("retrieving parsed TTL: %v", err)
+               errCode = http.StatusInternalServerError
+               api.HandleErr(w, r, inf.Tx.Tx, errCode, nil, sysErr)
+               return
+       }
+
+       if userErr, sysErr, errCode = IsUserAuthorizedToModifyDSID(inf, dsid); 
userErr != nil || sysErr != nil {
+               api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
+               return
+       }
+
+       row := inf.Tx.Tx.QueryRow(insertQuery,
+               dsid,
+               *job.Regex,
+               time.Now(),
+               dsid,
+               inf.User.ID,
+               fmt.Sprintf("TTL:%dh", ttl),
+               (*job.StartTime).Time)
+
+       result := tc.InvalidationJob{}
+       err = row.Scan(&result.AssetURL,
+               &result.DeliveryService,
+               &result.ID,
+               &result.CreatedBy,
+               &result.Keyword,
+               &result.Parameters,
+               &result.StartTime)
+       if err != nil {
+               userErr, sysErr, errCode = api.ParseDBError(err)
+               api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
+               return
+       }
+
+       if err := setRevalFlags(dsid, inf.Tx.Tx); err != nil {
+               api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, 
nil, fmt.Errorf("setting reval flags: %v", err))
+       } else {
+               resp, err := json.Marshal(apiResponse{[]tc.Alert{{"Invalidation 
Job creation was successful", tc.SuccessLevel.String()}}, result})
+               if err != nil {
+                       api.HandleErr(w, r, inf.Tx.Tx, 
http.StatusInternalServerError, nil, fmt.Errorf("Marshaling JSON: %v", err))
+               } else {
+                       w.Header().Set(http.CanonicalHeaderKey("location"), 
inf.Config.URL.Scheme+"://"+r.Host+"/api/1.4/jobs?id="+strconv.FormatUint(uint64(*result.ID),
 10))
+                       w.WriteHeader(http.StatusCreated)
+                       w.Write(resp)
+                       w.Write([]byte{'\n'})
+               }
+       }
+
+       api.CreateChangeLogRawTx(api.ApiChange, api.Created+"content 
invalidation job: #"+strconv.FormatUint(*result.ID, 10), inf.User, inf.Tx.Tx)
+}
+
+// Used by PUT requests to `/jobs`, replaces an existing content invalidation 
job
+// with the provided request body.
+func Update(w http.ResponseWriter, r *http.Request) {
+       inf, userErr, sysErr, errCode := api.NewInfo(r, nil, nil)
+       if userErr != nil || sysErr != nil {
+               api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
+               return
+       }
+       defer inf.Close()
+
+       var oFQDN string
+       var dsid uint
+       var uid uint
+       job := tc.InvalidationJob{}
+       row := inf.Tx.Tx.QueryRow(putInfoQuery, inf.Params["id"])
+       err := row.Scan(&job.ID,
+               &job.CreatedBy,
+               &uid,
+               &dsid,
+               &job.DeliveryService,
+               &job.AssetURL,
+               &job.Parameters,
+               &job.StartTime,
+               &oFQDN)
+       if err != nil {
+               if err == sql.ErrNoRows {
+                       userErr = fmt.Errorf("No job found by id %s", 
inf.Params["id"])
+                       errCode = http.StatusNotFound
+               } else {
+                       sysErr = fmt.Errorf("fetching job update info: %v", err)
+                       errCode = http.StatusInternalServerError
+               }
+               api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
+               return
+       }
+
+       if userErr, sysErr, errCode = IsUserAuthorizedToModifyDSID(inf, dsid); 
userErr != nil || sysErr != nil {
+               api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
+               return
+       }
+
+       if userErr, sysErr, errCode = 
IsUserAuthorizedToModifyJobsMadeByUsername(inf, *job.CreatedBy); userErr != nil 
|| sysErr != nil {
+               api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
+               return
+       }
+
+       input := tc.InvalidationJob{}
+       cType := r.Header.Get(http.CanonicalHeaderKey("content-type"))
+       if cType == tc.ApplicationJson || cType == "" {
+               decoder := json.NewDecoder(r.Body)
+               if err := decoder.Decode(&input); err != nil {
+                       userErr = fmt.Errorf("Unable to parse input: %v", err)
+                       sysErr = fmt.Errorf("parsing input to PUT jobs?id=%s: 
%v", inf.Params["id"], err)
+                       errCode = http.StatusBadRequest
+                       api.HandleErr(w, r, inf.Tx.Tx, errCode, userErr, sysErr)
+                       return
+               }
+       } else {
+               w.Header().Set(http.CanonicalHeaderKey("accept"), 
strings.Join([]string{tc.ApplicationJson, "application/x-www-form-urlencoded", 
"multipart/form-data"}, ";"))
 
 Review comment:
   same question here as above 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to