http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/cluster.cc ---------------------------------------------------------------------- diff --git a/src/utils/cluster.cc b/src/utils/cluster.cc deleted file mode 100644 index a9928eb..0000000 --- a/src/utils/cluster.cc +++ /dev/null @@ -1,131 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. 
-* -*************************************************************/ - -#include "singa/utils/cluster.h" - -#include <sys/stat.h> -#include <sys/types.h> -#include <unistd.h> -#include <fstream> - -namespace singa { -using std::vector; - -Cluster* Cluster::Setup(int job, const SingaProto& singaConf, - const ClusterProto& clusterConf) { - Singleton<Cluster>::Instance()->Init(job, singaConf, clusterConf); - return Singleton<Cluster>::Instance(); -} - -Cluster* Cluster::Get() { - if (!Singleton<Cluster>::Instance()->nprocs_) { - LOG(ERROR) << "The first call to Get should " - << "provide the job conf path"; - } - return Singleton<Cluster>::Instance(); -} - -void Cluster::Register(int pid, const std::string& endpoint) { - procs_id_ = cluster_rt_->RegistProc(endpoint, pid); - CHECK_GE(procs_id_, 0); - CHECK_LT(procs_id_, nprocs()); - LOG(ERROR) << "proc #" << procs_id_ << " -> " << endpoint - << " (pid = " << pid << ")"; -} - -void Cluster::Init(int job, const SingaProto& singaConf, - const ClusterProto& clusterConf) { - cluster_ = clusterConf; - singa_ = singaConf; - SetupFolders(clusterConf); - if (server_worker_separate()) - nprocs_ = nworker_procs() + nserver_procs(); - else - nprocs_ = std::max(nworker_procs(), nserver_procs()); - - // locate the process id of every worker/server - int ngrps = cluster_.nworker_groups(); - int grp_size = cluster_.nworkers_per_group(); - int procs = 0; - for (int i = 0; i < ngrps; ++i) { - for (int j = 0; j < grp_size; ++j) { - procs = (i * grp_size + j) / cluster_.nworkers_per_procs(); - procs_ids_[Hash(i, j, kWorkerLayer)] = procs; - procs_ids_[Hash(i, j, kWorkerParam)] = procs; - } - } - int offset = cluster_.server_worker_separate() ? 
procs + 1 : 0; - ngrps = cluster_.nserver_groups(); - grp_size = cluster_.nservers_per_group(); - for (int i = 0; i < ngrps; ++i) { - for (int j = 0; j < grp_size; ++j) { - procs_ids_[Hash(i, j, kServer)] = - (i * grp_size + j) / cluster_.nservers_per_procs() + offset; - } - } - // cluster_rt_ = new ZKClusterRT(singa_.zookeeper_host(), job); - // cluster_rt_ = new SPClusterRT(); - cluster_rt_ = ClusterRuntime::Create(singa_.zookeeper_host(), job); - cluster_rt_->Init(); -} - -void Cluster::SetupFolders(const ClusterProto &cluster) { - // create visulization folder - mkdir(vis_folder().c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); - // create checkpoint folder - mkdir(checkpoint_folder().c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); -} - -const vector<int> Cluster::ExecutorRng(int pid, int grp_size, int procs_size) { - int gstart, gend, start, end; - if (grp_size >= procs_size) { - // all workers in this procs are from the same group - gstart = pid * procs_size / grp_size; - gend = gstart + 1; - start = pid * procs_size % grp_size; - end = start + procs_size; - } else { - // there are multiple (complete) groups in this procs. - CHECK_EQ(procs_size % grp_size, 0); - int groups_per_procs = procs_size / grp_size; - gstart = pid * groups_per_procs; - gend = (pid+1) * groups_per_procs; - start = 0; - end = grp_size; - } - return vector<int>{gstart, gend, start, end}; -} - -int Cluster::Hash(int gid, int id, int flag) { - int ret = -1; - if (flag == kServer) { - ret = kServer * cluster_.nworker_groups() - * cluster_.nworkers_per_group() - + (cluster_.nserver_groups() + gid) - * cluster_.nservers_per_group() + id; - } else { - ret = (flag * cluster_.nworker_groups() + gid) - * cluster_.nworkers_per_group() + id; - } - return ret; -} - -} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/cluster_rt.cc ---------------------------------------------------------------------- diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc deleted file mode 100644 index 9a7b8bd..0000000 --- a/src/utils/cluster_rt.cc +++ /dev/null @@ -1,110 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. 
-* -*************************************************************/ - -#include "singa/utils/cluster_rt.h" - -#include <glog/logging.h> -#include <google/protobuf/text_format.h> -#include <stdlib.h> -#include <algorithm> -#include <fstream> -#include <iostream> -#include "singa/proto/job.pb.h" - -#ifdef USE_ZOOKEEPER -#include "singa/utils/zk_service.h" -#endif - -using std::string; -using std::to_string; -using std::vector; - -namespace singa { - -ClusterRuntime* ClusterRuntime::Create(const std::string&host, int job_id) { -#ifdef USE_ZOOKEEPER - return new ZKClusterRT(host, job_id); -#else - return new SPClusterRT(); -#endif -} - -SPClusterRT::~SPClusterRT() { - // release callback vector - for (auto list : grp_callbacks_) - for (RTCallback* p : list.second) { - delete p; - } -} - -bool SPClusterRT::Init() { - return true; -} - -int SPClusterRT::RegistProc(const string& host_addr, int pid) { - int ret; - lock_.lock(); - proc_list_.push_back(host_addr + std::to_string(pid)); - ret = proc_list_.size()-1; - lock_.unlock(); - return ret; -} - -string SPClusterRT::GetProcHost(int proc_id) { - if (proc_list_.size() < (unsigned)proc_id + 1) return ""; - return proc_list_[proc_id]; -} - -bool SPClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void* ctx) { - // store the callback function and context for later usage - RTCallback *cb = new RTCallback; - cb->fn = fn; - cb->ctx = ctx; - lock_.lock(); - if (grp_callbacks_.count(gid) == 0) - grp_callbacks_[gid] = vector<RTCallback*>{}; - grp_callbacks_[gid].push_back(cb); - lock_.unlock(); - return true; -} - -bool SPClusterRT::JoinSGroup(int gid, int wid, int s_group) { - lock_.lock(); - if (grp_count_.count(gid) == 0) - grp_count_[gid] = 0; - grp_count_[gid]++; - lock_.unlock(); - return true; -} - -bool SPClusterRT::LeaveSGroup(int gid, int wid, int s_group) { - lock_.lock(); - if (--grp_count_[gid] == 0) { - for (RTCallback* cb : grp_callbacks_[gid]) { - (*cb->fn)(cb->ctx); - cb->fn = nullptr; - } - } - 
lock_.unlock(); - return true; -} - -} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/common.cc ---------------------------------------------------------------------- diff --git a/src/utils/common.cc b/src/utils/common.cc index bd0fee5..d1a7d2c 100644 --- a/src/utils/common.cc +++ b/src/utils/common.cc @@ -1,574 +1,27 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - /** - * The code is adapted from Caffe under BSD 2 Clause license. - * All contributions by the University of California: - * Copyright (c) 2014, The Regents of the University of California (Regents) - * All rights reserved. - * All other contributions: - * Copyright (c) 2014, the respective contributors - * All rights reserved. - * Caffe uses a shared copyright model: each contributor holds copyright over - * their contributions to Caffe. The project versioning records all such - * contribution and copyright details. 
If a contributor wants to further mark - * their specific copyright on a particular contribution, they should indicate - * their copyright solely in the commit message of the change when it is - * committed. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ -#include "singa/utils/common.h" - -#include <sys/ioctl.h> -#include <sys/socket.h> -#include <sys/stat.h> -#include <sys/types.h> - -#include <netinet/in.h> -#include <net/if.h> -#include <arpa/inet.h> - -#include <stdarg.h> -#include <stdio.h> -#include <time.h> -#include <unistd.h> -#include <fcntl.h> -#include <cfloat> - -#include <fstream> - -#include <glog/logging.h> -#include <google/protobuf/io/coded_stream.h> -#include <google/protobuf/io/zero_copy_stream_impl.h> -#include <google/protobuf/text_format.h> namespace singa { -const int kBufLen = 1024; - -string IntVecToString(const vector<int>& vec) { - string disp = "("; - for (int x : vec) - disp += std::to_string(x) + ", "; - return disp + ")"; -} - -/** - * * Formatted string. - * */ -string VStringPrintf(string fmt, va_list l) { - char buffer[4096]; - vsnprintf(buffer, sizeof(buffer), fmt.c_str(), l); - return string(buffer); -} - -/** - * * Formatted string. - * */ -string StringPrintf(string fmt, ...) 
{ - va_list l; - va_start(l, fmt); // fmt.AsString().c_str()); - string result = VStringPrintf(fmt, l); - va_end(l); - return result; -} - -int ArgPos(int argc, char** arglist, const char* arg) { - for (int i = 0; i < argc; i++) { - if (strcmp(arglist[i], arg) == 0) { - return i; - } - } - return -1; -} - -void CreateFolder(const string name) { - struct stat buffer; - if (stat(name.c_str(), &buffer) != 0) { - mkdir(name.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); - CHECK_EQ(stat(name.c_str(), &buffer), 0); - } -} - -const vector<vector<int>> Slice(int num, const vector<int>& sizes) { - vector<vector<int>> slices; - if (num == 0) - return slices; - int avg = 0; - for (int x : sizes) - avg += x; - avg = avg / num + avg % num; - int diff = avg / 10; - // DLOG(INFO) << "Slicer, param avg = " << avg << ", diff = " << diff; - - int capacity = avg, nbox = 0; - for (int x : sizes) { - vector<int> slice; - string slicestr = ""; - while (x > 0) { - int size = 0; - if (capacity >= x) { - capacity -= x; - size = x; - x = 0; - } else if (capacity + diff >= x) { - size = x; - x = 0; - capacity = 0; - } else if (capacity >= diff) { - x -= capacity; - size = capacity; - capacity = avg; - nbox++; - } else { - capacity = avg; - nbox++; - } - if (size) { - slice.push_back(size); - slicestr += ", " + std::to_string(size); - } - } - // DLOG(INFO) << slicestr; - slices.push_back(slice); - } - CHECK_LE(nbox, num); - return slices; -} - -const vector<int> PartitionSlices(int num, const vector<int>& slices) { - vector<int> slice2box; - if (num == 0) - return slice2box; - int avg = 0; - for (int x : slices) - avg += x; - avg = avg / num + avg % num; - int box = avg, boxid = 0, diff = avg / 10; - for (auto it = slices.begin(); it != slices.end();) { - int x = *it; - if (box >= x) { - box -= x; - slice2box.push_back(boxid); - it++; - } else if (box + diff >= x) { - slice2box.push_back(boxid); - it++; - box = 0; - } else { - box = avg; - boxid++; - } - } - CHECK_EQ(slice2box.size(), 
slices.size()); - int previd = -1; - string disp; - for (size_t i = 0; i < slice2box.size(); i++) { - if (previd != slice2box[i]) { - previd = slice2box[i]; - disp += " box = " +std::to_string(previd) + ":"; - } - disp += " " + std::to_string(slices[i]); - } - return slice2box; -} - -int gcd(int a, int b) { - for (;;) { - if (a == 0) return b; - b %= a; - if (b == 0) return a; - a %= b; - } -} - -int LeastCommonMultiple(int a, int b) { - int temp = gcd(a, b); - return temp ? (a / temp * b) : 0; -} - -string GetHostIP() { - int fd; - struct ifreq ifr; - fd = socket(AF_INET, SOCK_DGRAM, 0); - /* I want to get an IPv4 IP address */ - ifr.ifr_addr.sa_family = AF_INET; - /* I want IP address attached to "eth0" */ - strncpy(ifr.ifr_name, "eth0", IFNAMSIZ-1); - ioctl(fd, SIOCGIFADDR, &ifr); - close(fd); - string ip(inet_ntoa(((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr)); - /* display result */ - LOG(INFO) << "Host IP = " << ip; - return ip; -} - -void SetupLog(const string& log_dir, const string& model) { - // TODO(wangwei) check if NFS, then create folder using script, otherwise - // may have problems due to multiple processes create the same folder. 
- CreateFolder(log_dir); - string warn = log_dir + "/" + model + "-warn-"; - string info = log_dir + "/" + model + "-info-"; - string error = log_dir + "/" + model + "-error-"; - string fatal = log_dir + "/" + model + "-fatal-"; - google::SetLogDestination(google::WARNING, warn.c_str()); - google::SetLogDestination(google::INFO, info.c_str()); - google::SetLogDestination(google::ERROR, error.c_str()); - google::SetLogDestination(google::FATAL, fatal.c_str()); -} - -Metric::Metric(const string& str) { - ParseFrom(str); -} - -void Metric::Add(const string& name, float value) { - Add(name, value, 1); -} -void Metric::Add(const string& name, float value, int count) { - if (entry_.find(name) == entry_.end()) { - entry_[name] = std::make_pair(1, value); - } else { - auto& e = entry_.at(name); - e.first += count; - e.second += value; - } -} - -void Metric::Reset() { - for (auto& e : entry_) { - e.second.first = 0; - e.second.second = 0; - } -} - -string Metric::ToLogString() const { - string ret; - size_t k = 0; - for (auto e : entry_) { - ret += e.first + " = "; - ret += std::to_string(e.second.second / e.second.first); - if (++k < entry_.size()) - ret += ", "; - } - return ret; -} - -string Metric::ToString() const { - MetricProto proto; - for (auto e : entry_) { - proto.add_name(e.first); - proto.add_count(e.second.first); - proto.add_val(e.second.second); - } - string ret; - proto.SerializeToString(&ret); - return ret; -} - -void Metric::ParseFrom(const string& msg) { - MetricProto proto; - proto.ParseFromString(msg); - Reset(); - for (int i = 0; i < proto.name_size(); i++) { - entry_[proto.name(i)] = std::make_pair(proto.count(i), proto.val(i)); - } -} - - -/*************Below functions are adapted from Caffe ************/ -using google::protobuf::io::CodedInputStream; -using google::protobuf::io::FileInputStream; -using google::protobuf::io::FileOutputStream; -using google::protobuf::io::ZeroCopyInputStream; - - -void Im2col(const float* data_im, const int channels, 
- const int height, const int width, const int kernel_h, const int kernel_w, - const int pad_h, const int pad_w, const int stride_h, const int stride_w, - float* data_col) { - int height_col = (height + 2 * pad_h - kernel_h) / stride_h + 1; - int width_col = (width + 2 * pad_w - kernel_w) / stride_w + 1; - int channels_col = channels * kernel_h * kernel_w; - for (int c = 0; c < channels_col; ++c) { - int w_offset = c % kernel_w; - int h_offset = (c / kernel_w) % kernel_h; - int c_im = c / kernel_h / kernel_w; - for (int h = 0; h < height_col; ++h) { - for (int w = 0; w < width_col; ++w) { - int h_pad = h * stride_h - pad_h + h_offset; - int w_pad = w * stride_w - pad_w + w_offset; - if (h_pad >= 0 && h_pad < height && w_pad >= 0 && w_pad < width) - data_col[(c * height_col + h) * width_col + w] = - data_im[(c_im * height + h_pad) * width + w_pad]; - else - data_col[(c * height_col + h) * width_col + w] = 0; - } - } - } -} - -void Col2im(const float* data_col, const int channels, - const int height, const int width, const int patch_h, const int patch_w, - const int pad_h, const int pad_w, const int stride_h, const int stride_w, - float* data_im) { - memset(data_im, 0, height * width * channels * sizeof(float)); - int height_col = (height + 2 * pad_h - patch_h) / stride_h + 1; - int width_col = (width + 2 * pad_w - patch_w) / stride_w + 1; - int channels_col = channels * patch_h * patch_w; - for (int c = 0; c < channels_col; ++c) { - int w_offset = c % patch_w; - int h_offset = (c / patch_w) % patch_h; - int c_im = c / patch_h / patch_w; - for (int h = 0; h < height_col; ++h) { - for (int w = 0; w < width_col; ++w) { - int h_pad = h * stride_h - pad_h + h_offset; - int w_pad = w * stride_w - pad_w + w_offset; - if (h_pad >= 0 && h_pad < height && w_pad >= 0 && w_pad < width) - data_im[(c_im * height + h_pad) * width + w_pad] += - data_col[(c * height_col + h) * width_col + w]; - } - } - } -} - -void ForwardMaxPooling(const float* bottom, const int num, const int 
channels, - const int height, const int width, const int kernel_h, const int kernel_w, - const int pad_h, const int pad_w, const int stride_h, const int stride_w, - float* top, float* mask) { - int top_height = (height + pad_h * 2 -kernel_h) / stride_h + 1; - int top_width = (width + pad_w * 2 -kernel_w) / stride_w + 1; - int top_count = num * top_height * top_width * channels; - for (int i = 0; i < top_count; i++) { - mask[i] = -1; - top[i] = -FLT_MAX; - } - const int bottom_offset = height * width; - const int top_offset = top_height * top_width; - // The main loop - for (int n = 0; n < num; ++n) { - for (int c = 0; c < channels; ++c) { - for (int ph = 0; ph < top_height; ++ph) { - for (int pw = 0; pw < top_width; ++pw) { - int hstart = ph * stride_h - pad_h; - int wstart = pw * stride_w - pad_w; - int hend = std::min(hstart + kernel_h, height); - int wend = std::min(wstart + kernel_w, width); - hstart = std::max(hstart, 0); - wstart = std::max(wstart, 0); - const int top_index = ph * top_width + pw; - for (int h = hstart; h < hend; ++h) { - for (int w = wstart; w < wend; ++w) { - const int index = h * width + w; - if (bottom[index] > top[top_index]) { - top[top_index] = bottom[index]; - mask[top_index] = index; - } - } - } - } - } - // compute offset - bottom += bottom_offset; - top += top_offset; - mask += top_offset; - } - } -} - -void BackwardMaxPooling(const float* top, const float* mask, const int num, - const int channels, const int height, const int width, - const int kernel_h, const int kernel_w, const int pad_h, const int pad_w, - const int stride_h, const int stride_w, - float* bottom) { - int top_height = (height + pad_h * 2 -kernel_h) / stride_h + 1; - int top_width = (width + pad_w * 2 -kernel_w) / stride_w + 1; - const int top_offset = top_height * top_width; - const int bottom_offset = height * width; - memset(bottom, 0, sizeof(float) * num * channels * bottom_offset); - for (int n = 0; n < num; ++n) { - for (int c = 0; c < channels; ++c) { - for 
(int ph = 0; ph < top_height; ++ph) { - for (int pw = 0; pw < top_width; ++pw) { - const int top_idx = ph * top_width + pw; - const int bottom_idx = static_cast<int>(mask[top_idx]); - bottom[bottom_idx] += top[top_idx]; - } - } - top += top_offset; - mask += top_offset; - bottom += bottom_offset; - } - } -} - -void ForwardAvgPooling(const float* bottom, const int num, const int channels, - const int height, const int width, const int kernel_h, const int kernel_w, - const int pad_h, const int pad_w, const int stride_h, const int stride_w, - float* top) { - int top_height = (height + pad_h * 2 -kernel_h) / stride_h + 1; - int top_width = (width + pad_w * 2 -kernel_w) / stride_w + 1; - int top_count = num * top_height * top_width * channels; - for (int i = 0; i < top_count; i++) { - top[i] = 0; - } - const int bottom_offset = height * width; - const int top_offset = top_height * top_width; - // The main loop - for (int n = 0; n < num; ++n) { - for (int c = 0; c < channels; ++c) { - for (int ph = 0; ph < top_height; ++ph) { - for (int pw = 0; pw < top_width; ++pw) { - int hstart = ph * stride_h - pad_h; - int wstart = pw * stride_w - pad_w; - int hend = std::min(hstart + kernel_h, height+pad_h); - int wend = std::min(wstart + kernel_w, width+pad_w); - int pool_size = (hend-hstart) * (wend-wstart); - hstart = std::max(hstart, 0); - wstart = std::max(wstart, 0); - hend = std::min(hend, height); - wend = std::min(wend, width); - const int top_index = ph * top_width + pw; - for (int h = hstart; h < hend; ++h) { - for (int w = wstart; w < wend; ++w) { - const int index = h * width + w; - top[top_index] += bottom[index]; - } - } - top[top_index] /= pool_size; - } - } - // compute offset - bottom += bottom_offset; - top += top_offset; - } - } -} - -void BackwardAvgPooling(const float* top, const int num, const int channels, - const int height, const int width, const int kernel_h, const int kernel_w, - const int pad_h, const int pad_w, const int stride_h, const int stride_w, - 
float* bottom) { - int top_height = (height + pad_h * 2 -kernel_h) / stride_h + 1; - int top_width = (width + pad_w * 2 -kernel_w) / stride_w + 1; - const int top_offset = top_height * top_width; - const int bottom_offset = height * width; - memset(bottom, 0, sizeof(float) * num * channels * bottom_offset); - for (int n = 0; n < num; ++n) { - for (int c = 0; c < channels; ++c) { - for (int ph = 0; ph < top_height; ++ph) { - for (int pw = 0; pw < top_width; ++pw) { - int hstart = ph * stride_h - pad_h; - int wstart = pw * stride_w - pad_w; - int hend = std::min(hstart + kernel_h, height+pad_h); - int wend = std::min(wstart + kernel_w, width+pad_w); - int pool_size = (hend-hstart) * (wend-wstart); - hstart = std::max(hstart, 0); - wstart = std::max(wstart, 0); - hend = std::min(hend, height); - wend = std::min(wend, width); - const int top_index = ph * top_width + pw; - for (int h = hstart; h < hend; ++h) { - for (int w = wstart; w < wend; ++w) { - const int index = h * width + w; - bottom[index] += top[top_index] / pool_size; - } - } - } - } - top += top_offset; - bottom += bottom_offset; - } - } -} - -void ReadProtoFromTextFile(const char* filename, Message* proto) { - int fd = open(filename, O_RDONLY); - CHECK_NE(fd, -1) << "File not found: " << filename; - FileInputStream* input = new FileInputStream(fd); - CHECK(google::protobuf::TextFormat::Parse(input, proto)); - delete input; - close(fd); -} - -void WriteProtoToTextFile(const Message& proto, const char* filename) { - int fd = open(filename, O_WRONLY | O_CREAT, 0644); - FileOutputStream* output = new FileOutputStream(fd); - CHECK(google::protobuf::TextFormat::Print(proto, output)); - delete output; - close(fd); -} - -void ReadProtoFromBinaryFile(const char* filename, Message* proto) { - int fd = open(filename, O_RDONLY); - CHECK_NE(fd, -1) << "File not found: " << filename; - ZeroCopyInputStream* raw_input = new FileInputStream(fd); - CodedInputStream* coded_input = new CodedInputStream(raw_input); - // upper 
limit 512MB, warning threshold 256MB - coded_input->SetTotalBytesLimit(536870912, 268435456); - CHECK(proto->ParseFromCodedStream(coded_input)); - delete coded_input; - delete raw_input; - close(fd); -} - -void WriteProtoToBinaryFile(const Message& proto, const char* filename) { - int fd = open(filename, O_CREAT|O_WRONLY|O_TRUNC, 0644); - CHECK_NE(fd, -1) << "File cannot open: " << filename; - CHECK(proto.SerializeToFileDescriptor(fd)); -} -void WriteStringToTextFile(const string& filename, const string& context) { - std::ofstream ofs; - ofs.open(filename); - CHECK(ofs.is_open()) << "Can't write to file: " << filename; - ofs << context; - ofs.flush(); - ofs.close(); -} -const vector<std::pair<string, float>> GetMetricFromString(const string& disp) { - size_t pos = 0; - vector<string> terms; - while (pos != string::npos) { - auto next = disp.find_first_of(" ,", pos); // delimiter: space or comma - if (next != string::npos) { - terms.push_back(disp.substr(pos, next - pos)); - pos = disp.find_first_not_of(" ,", next + 1); - } else { - break; - } - } - if (pos != string::npos) - terms.push_back(disp.substr(pos)); - vector<std::pair<string, float>> ret; - for (unsigned i = 0; i < terms.size(); i++) { - if (terms[i] == "=") { - CHECK_GE(i, 1); - CHECK_LT(i, terms.size() - 1) << "terms[i] = " << terms[i]; - ret.push_back(std::make_pair(terms[i-1], std::stof(terms[i + 1]))); - } - } - return ret; -} -} // namespace singa +} /* singa */ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/graph.cc ---------------------------------------------------------------------- diff --git a/src/utils/graph.cc b/src/utils/graph.cc deleted file mode 100644 index 4f59635..0000000 --- a/src/utils/graph.cc +++ /dev/null @@ -1,273 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. 
See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#include "singa/utils/graph.h" - -#include <glog/logging.h> -#include <algorithm> -#include <queue> -#include <unordered_set> -#include "singa/utils/common.h" - -namespace singa { - -using std::map; -using std::string; -using std::vector; - -/************************************************************************** - * Implementation for Node class - *************************************************************************/ -Node::Node(string name) { - this->name = name; -} -Node::Node(string name, const std::map<string, string>& attrs) { - this->name = name; - this->attrs = attrs; -} - -Node::Node(const string& name, const string& origin, int id, void* proto) { - this->name = name; - this->origin = origin; - this->proto = proto; - this->partition_id = id; -} - -void Node::AddDstNode(Node* dstnode) { - dstnodes.push_back(dstnode); -} - -void Node::AddSrcNode(Node* srcnode) { - srcnodes.push_back(srcnode); -} - -void Node::RemoveDstNode(Node* dst) { - auto iter = dstnodes.begin(); - while ((*iter)->name != dst->name && iter != dstnodes.end()) - iter++; - CHECK_STREQ((*iter)->name.c_str(), dst->name.c_str()); - dstnodes.erase(iter); -} - -void Node::RemoveSrcNode(Node* src) { - auto iter = srcnodes.begin(); - while 
((*iter)->name != src->name && iter != srcnodes.end()) - iter++; - CHECK_STREQ((*iter)->name.c_str(), src->name.c_str()); - srcnodes.erase(iter); -} - -/**************************************************************************** - * Implementation for Graph class - ****************************************************************************/ - -Graph::~Graph() { - for (Node* node : nodes_) - delete node; -} - -Node* Graph::AddNode(const string& name, const string& origin, int id, - void* proto) { - Node* node = new Node(name, origin, id, proto); - nodes_.push_back(node); - CHECK(name2node_.find(node->name) == name2node_.end()) - << "node " << node->name << " already exists"; - name2node_[node->name] = node; - return node; -} - -Node* Graph::AddNode(const string& name, - const std::map<string, string>& attrs) { - Node* node = new Node(name, attrs); - nodes_.push_back(node); - CHECK(name2node_.find(node->name) == name2node_.end()) - << "node " << node->name << " already exists"; - name2node_[node->name] = node; - return node; -} - -void Graph::AddEdge(Node* srcnode, Node* dstnode) { - srcnode->AddDstNode(dstnode); - dstnode->AddSrcNode(srcnode); -} - -void Graph::AddEdge(const string& src, const string& dst) { - auto srcnode = name2node_.find(src); - CHECK(srcnode != name2node_.end()) << "can't find src node " << src; - auto dstnode = name2node_.find(dst); - CHECK(dstnode != name2node_.end()) << "can't find dst node " << dst; - AddEdge(srcnode->second, dstnode->second); -} -void Graph::AddEdge(Node* srcnode, Node* dstnode, - const std::map<string, string>& attrs) { - AddEdge(srcnode, dstnode); - edge_attrs_[GetEdgeName(srcnode->name, dstnode->name)] = attrs; -} -void Graph::AddEdge(const string& src, const std::string& dst, - const std::map<string, string>& attrs) { - AddEdge(src, dst); - edge_attrs_[GetEdgeName(src, dst)] = attrs; -} - -void Graph::RemoveEdge(Node* src, Node* dst) { - src->RemoveDstNode(dst); - dst->RemoveSrcNode(src); -} - -void 
Graph::RemoveEdge(const string &src, const string& dst) { - auto srcnode = name2node_.find(src); - CHECK(srcnode != name2node_.end()) << "can't find src node " << src; - auto dstnode = name2node_.find(dst); - CHECK(dstnode != name2node_.end()) << "can't find dst node " << dst; - RemoveEdge(srcnode->second, dstnode->second); -} - -// sort to make `bottom' nodes be placed in the front positions -void Graph::Sort() { - // nodes to be visited - std::queue<Node*> visiting_nodes; - // visited node set - std::unordered_set<Node*> visited_set; - // visiting_nodes + visted_set - std::unordered_set<Node*> visit_set;; - for (auto node : nodes_) { - // visit nodes without source nodes firstly - if (node->srcnodes.size() == 0) { - visiting_nodes.push(node); - visit_set.insert(node); - } - } - int n = nodes_.size(); - nodes_.clear(); - while (!visiting_nodes.empty()) { - auto node = visiting_nodes.front(); - visiting_nodes.pop(); - bool visit = true; - bool bi_direction = false; - // check if a node has a bi-direction edge with its neighbour - for (auto src : node->srcnodes) - for (auto src_of_src : src->srcnodes) - if (strcmp((src_of_src->name).c_str(), (node->name).c_str()) == 0) { - bi_direction = true; - break; - } - // check whether its src nodes number greater than 1 - if (bi_direction && (node->srcnodes).size() > 1) { - auto src = node->srcnodes.at(0); - if (visited_set.find(src) == visited_set.end()) { - visit = false; - } - } else { - for (auto src : node->srcnodes) - if (visited_set.find(src) == visited_set.end()) { - visit = false; - break; - } - } - if (visit) { - nodes_.push_back(node); - visited_set.insert(node); - for (auto dst : node->dstnodes) { - // queueing the dst node if it is not queued before - if (visit_set.find(dst) == visit_set.end()) { - visiting_nodes.push(dst); - visit_set.insert(dst); - } - } - } else { - visiting_nodes.push(node); - } - } - CHECK_EQ(nodes_.size(), n); -} - -const Graph Graph::Reverse() const { - Graph g; - for (Node* n : nodes_) - 
g.AddNode(n->name, n->attrs);
-  for (Node* src : nodes_)
-    for (Node* dst : src->dstnodes) {
-      map<string, string> attrs;
-      const string edge = GetEdgeName(src->name, dst->name);
-      if (edge_attrs_.find(edge) != edge_attrs_.end())
-        attrs = edge_attrs_.at(edge);
-      g.AddEdge(dst->name, src->name, attrs);
-    }
-  return g;
-}
-// Serialize the graph to JSON without any additional labels.
-string Graph::ToJson() const {
-  return ToJson(map<string, string>());
-}
-
-
-// Serialize the graph to a JSON string with a "nodes" array and a "links"
-// array.
-//
-// label maps a node name or an edge name (as built by GetEdgeName) to text
-// that is added to the "label" field of that node or edge. Every node
-// attribute other than "label" and every edge attribute is written as an
-// additional string field. Links refer to nodes by their position in nodes_.
-string Graph::ToJson(const map<string, string>& label) const {
-  string disp = "{\"directed\":1,\n";
-
-  // add nodes
-  disp += "\"nodes\":[\n";
-
-  // node name -> position of the node in nodes_
-  map<string, int> node_id;
-  int next_id = 0;
-  bool first = true;
-  for (auto node : nodes_) {
-    const string name = node->name;
-    string lbl = name + " -- ";
-    const auto extra = label.find(name);
-    if (extra != label.end())
-      lbl += extra->second;
-    const auto own = node->attrs.find("label");
-    if (own != node->attrs.end())
-      lbl += own->second;
-    // entries after the first one are separated by a comma
-    const char sep = first ? ' ' : ',';
-    disp += StringPrintf("%c{\"id\":\"%s\", \"label\":\"%s\"",
-        sep, name.c_str(), lbl.c_str());
-    for (const auto& attr : node->attrs) {
-      if (attr.first == "label")
-        continue;  // already merged into lbl above
-      disp += StringPrintf(", \"%s\":\"%s\"",
-          attr.first.c_str(), attr.second.c_str());
-    }
-    disp += "}\n";
-    first = false;
-    node_id[name] = next_id++;
-  }
-  disp += "]\n,\n";
-
-  // add edges
-  disp += "\"links\":[\n";
-  first = true;
-  for (auto src : nodes_) {
-    for (auto dst : src->dstnodes) {
-      const string edge_name = GetEdgeName(src->name, dst->name);
-      string lbl = "";
-      const auto extra = label.find(edge_name);
-      if (extra != label.end())
-        lbl = extra->second;
-      const char sep = first ? ' ' : ',';
-      disp += StringPrintf("%c{\"source\":%d, \"target\":%d, \"label\": \"%s\"",
-          sep, node_id[src->name], node_id[dst->name],
-          lbl.c_str());
-      const auto attrs = edge_attrs_.find(edge_name);
-      if (attrs != edge_attrs_.end()) {
-        for (const auto& attr : attrs->second)
-          disp += StringPrintf(", \"%s\":\"%s\"",
-              attr.first.c_str(), attr.second.c_str());
-      }
-      disp += "}\n";
-      first = false;
-    }
-  }
-  return disp + "]}";
-}
-}  // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/image_transform.cc
----------------------------------------------------------------------
diff --git a/src/utils/image_transform.cc b/src/utils/image_transform.cc
deleted file mode 100644
index 28d5f4c..0000000
--- a/src/utils/image_transform.cc
+++ /dev/null
@@ -1,57 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-#include "singa/utils/image_transform.h"
-
-namespace singa {
-
-// Crop, optionally mirror, mean-subtract and scale an image.
-//
-// in and mean are indexed as [channel][height][width]; out is indexed as
-// [channel][h_crop][w_crop]. The crop window starts at (h_offset, w_offset).
-// A crop size of 0 selects the full extent of that axis and requires the
-// matching offset to be 0. With mirror set, each output row is written in
-// reversed column order. mean may be nullptr, in which case nothing is
-// subtracted. scale must not be 0.
-void ImageTransform(const float* in, const float* mean, bool mirror, int h_crop,
-    int w_crop, int h_offset, int w_offset, int channel, int height, int width,
-    float scale, float* out) {
-  if (h_crop == 0) {
-    CHECK_EQ(h_offset, 0);
-    h_crop = height;
-  }
-  if (w_crop == 0) {
-    CHECK_EQ(w_offset, 0);
-    w_crop = width;
-  }
-  CHECK_NE(scale, 0);
-
-  for (int c = 0; c < channel; ++c) {
-    for (int h = 0; h < h_crop; ++h) {
-      // index of the first source / destination element of this row
-      const int src_row = (c * height + h_offset + h) * width + w_offset;
-      const int dst_row = (c * h_crop + h) * w_crop;
-      for (int w = 0; w < w_crop; ++w) {
-        const int in_idx = src_row + w;
-        const int out_idx = dst_row + (mirror ? (w_crop - 1 - w) : w);
-        out[out_idx] = in[in_idx];
-        if (mean != nullptr)
-          out[out_idx] -= mean[in_idx];
-        out[out_idx] *= scale;
-      }
-    }
-  }
-}
-
-}  // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/job_manager.cc
----------------------------------------------------------------------
diff --git a/src/utils/job_manager.cc b/src/utils/job_manager.cc
deleted file mode 100644
index 2ea5b1b..0000000
--- a/src/utils/job_manager.cc
+++ /dev/null
@@ -1,271 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.
You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-#include "singa/utils/job_manager.h"
-
-#include <glog/logging.h>
-#include <google/protobuf/text_format.h>
-#include <stdlib.h>
-#include <algorithm>
-#include <fstream>
-#include <iostream>
-#include "singa/proto/job.pb.h"
-
-using std::string;
-using std::vector;
-
-namespace singa {
-
-// Remember the zookeeper host string; no connection is made here.
-JobManager::JobManager(const string& host) {
-  host_ = host;
-}
-
-// Connect to zookeeper and create the nodes used by singa.
-// Returns false as soon as one step fails. Without USE_ZOOKEEPER this is a
-// no-op that returns true.
-bool JobManager::Init() {
-#ifdef USE_ZOOKEEPER
-  if (!zk_.Init(host_, timeout_)) return false;
-  if (!zk_.CreateNode(kZKPathSinga.c_str(), nullptr, 0, nullptr))
-    return false;
-  if (!zk_.CreateNode(kZKPathSys.c_str(), nullptr, 0, nullptr))
-    return false;
-  if (!zk_.CreateNode(kZKPathJLock.c_str(), nullptr, 0, nullptr))
-    return false;
-  // the host index node starts from "0"
-  if (!zk_.CreateNode(kZKPathHostIdx.c_str(), "0", 0, nullptr))
-    return false;
-  if (!zk_.CreateNode(kZKPathApp.c_str(), nullptr, 0, nullptr))
-    return false;
-#endif
-  return true;
-}
-
-// Generate a job id and store it in *id.
-// With zookeeper the id is parsed from the last 10 characters of the name of
-// a newly created ephemeral sequential node; otherwise it is always 0.
-// Returns false if the node cannot be created or its name is too short to
-// carry the 10-character suffix.
-bool JobManager::GenerateJobID(int* id) {
-#ifdef USE_ZOOKEEPER
-  char buf[kZKBufSize];
-  string lock = kZKPathJLock + "/lock-";
-  if (!zk_.CreateNode(lock.c_str(), nullptr,
-                      ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) {
-    return false;
-  }
-  // never step in front of buf when the returned name is unexpectedly short
-  const size_t len = strlen(buf);
-  if (len < 10) {
-    LOG(ERROR) << "unexpected sequential node name: " << buf;
-    return false;
-  }
-  *id = atoi(buf + len - 10);
-#else
-  *id = 0;
-#endif
-  return true;
-}
-
-// Fill list with one host name per process of the job.
-//
-// The number of processes is computed from the cluster part of job_file
-// (1 if job_file is nullptr). With zookeeper, hosts are taken round-robin
-// from host_file (empty lines and lines starting with '#' are skipped),
-// starting from the index stored in zookeeper, which is then advanced.
-// Without zookeeper only a single "localhost" process is supported.
-// Returns false if the host index cannot be read or written.
-bool JobManager::GenerateHostList(const char* host_file, const char* job_file,
-                                  vector<string>* list) {
-  int nprocs = 1;
-  list->clear();
-  // compute required #process from job conf
-  if (job_file != nullptr) {
-    ClusterProto cluster;
-    google::protobuf::TextFormat::ParseFromString(ExtractClusterConf(job_file),
-                                                  &cluster);
-    int nworker_procs = cluster.nworker_groups() * cluster.nworkers_per_group()
-                        / cluster.nworkers_per_procs();
-    int nserver_procs = cluster.nserver_groups() * cluster.nservers_per_group()
-                        / cluster.nservers_per_procs();
-    if (cluster.server_worker_separate())
-      nprocs = nworker_procs + nserver_procs;
-    else
-      nprocs = std::max(nworker_procs, nserver_procs);
-  }
-#ifdef USE_ZOOKEEPER
-  // get available host list from global conf
-  std::ifstream hostfile(host_file);
-  if (!hostfile.is_open()) {
-    LOG(FATAL) << "Cannot open file: " << host_file;
-  }
-  vector<string> hosts;
-  string host;
-  // stop on any read failure, not only on EOF, so that a stream that goes
-  // bad cannot keep this loop spinning forever
-  while (getline(hostfile, host)) {
-    if (!host.length() || host[0] == '#') continue;
-    hosts.push_back(host);
-  }
-  if (!hosts.size()) {
-    LOG(FATAL) << "Empty host file";
-  }
-  // read next host index
-  char val[kZKBufSize];
-  if (!zk_.GetNode(kZKPathHostIdx.c_str(), val)) return false;
-  int next = atoi(val);
-  // generate host list
-  for (int i = 0; i < nprocs; ++i) {
-    list->push_back(hosts[(next + i) % hosts.size()]);
-  }
-  // write next host index
-  next = (next + nprocs) % hosts.size();
-  snprintf(val, kZKBufSize, "%d", next);
-  if (!zk_.UpdateNode(kZKPathHostIdx.c_str(), val)) return false;
-#else
-  CHECK_EQ(nprocs, 1) << "To run multi-process job, please enable zookeeper";
-  list->push_back("localhost");
-#endif
-  return true;
-}
-
-bool JobManager::ListJobProcs(int job, vector<string>* procs) {
-  procs->clear();
-#ifdef USE_ZOOKEEPER
-  string job_path = GetZKJobWorkspace(job);
-  // check job path
-  if (!zk_.Exist(job_path.c_str())) {
-    LOG(ERROR) << "job " << job << " not exists";
-    return true;
-  }
-  string proc_path = job_path + kZKPathJobProc;
-  vector<string> vt;
-  // check job proc path
-  if (!zk_.GetChild(proc_path.c_str(), &vt)) {
-    return false;
-  }
-  char buf[singa::kZKBufSize];
-  for (string pname : vt) {
-    pname = proc_path + "/" + pname;
-    if
(!zk_.GetNode(pname.c_str(), buf)) continue;
-    std::string proc = "";
-    for (int i = 0; buf[i] != '\0'; ++i) {
-      if (buf[i] == ':') {
-        buf[i] = '\0';
-        proc += buf;
-      } else if (buf[i] == '|') {
-        proc += buf + i;
-      }
-    }
-    procs->push_back(proc);
-  }
-  if (!procs->size()) LOG(ERROR) << "job " << job << " not exists";
-#endif
-  return true;
-}
-
-// List all jobs found under the app path, sorted by node name.
-//
-// For every job node the id is parsed from the last 10 characters of its name
-// and the process count is the number of children of its proc node. Jobs
-// without processes are removed from zookeeper, except for the last
-// kJobsNotRemoved entries. Child nodes whose name is shorter than 10
-// characters are skipped. Returns false if the app path cannot be listed.
-bool JobManager::ListJobs(vector<JobInfo>* jobs) {
-  jobs->clear();
-#ifdef USE_ZOOKEEPER
-  vector<string> vt;
-  // get all children in app path
-  if (!zk_.GetChild(kZKPathApp.c_str(), &vt)) {
-    return false;
-  }
-  std::sort(vt.begin(), vt.end());
-  int size = static_cast<int>(vt.size());
-  vector<string> procs;
-  for (int i = 0; i < size; ++i) {
-    string path = kZKPathApp + "/" + vt[i] + kZKPathJobProc;
-    if (!zk_.GetChild(path.c_str(), &procs)) continue;
-    // substr() below would throw std::out_of_range on a shorter name
-    if (vt[i].length() < 10) {
-      LOG(ERROR) << "unexpected job node name: " << vt[i];
-      continue;
-    }
-    JobInfo job;
-    string jid = vt[i].substr(vt[i].length() - 10);
-    job.id = atoi(jid.c_str());
-    job.procs = procs.size();
-    jobs->push_back(job);
-    // may need to delete it
-    if (!job.procs && (i + kJobsNotRemoved < size))
-      CleanPath(kZKPathApp + "/" + vt[i], true);
-  }
-#else
-  LOG(ERROR) << "Not supported without zookeeper";
-#endif
-  return true;
-}
-
-// Delete everything below the proc node of the given job; the proc node
-// itself is kept. Returns true if that node does not exist.
-bool JobManager::Remove(int job) {
-#ifdef USE_ZOOKEEPER
-  string path = GetZKJobWorkspace(job) + kZKPathJobProc;
-  if (zk_.Exist(path.c_str())) {
-    return CleanPath(path, false);
-  }
-#else
-  LOG(ERROR) << "Not supported without zookeeper";
-#endif
-  return true;
-}
-
-// Delete everything below the app path; the app node itself is kept.
-bool JobManager::RemoveAllJobs() {
-#ifdef USE_ZOOKEEPER
-  if (zk_.Exist(kZKPathApp.c_str())) {
-    return CleanPath(kZKPathApp, false);
-  }
-#else
-  LOG(ERROR) << "Not supported without zookeeper";
-#endif
-  return true;
-}
-
-// Delete the singa root node together with everything below it.
-bool JobManager::CleanUp() {
-#ifdef USE_ZOOKEEPER
-  if (zk_.Exist(kZKPathSinga.c_str())) {
-    return CleanPath(kZKPathSinga, true);
-  }
-#else
-  LOG(ERROR) << "Not supported without zookeeper";
-#endif
-  return true;
-}
-
-// Recursively delete all descendants of path, and path itself if remove is
-// true. Returns false as soon as listing or deleting a node fails.
-bool JobManager::CleanPath(const string& path, bool remove) {
-#ifdef USE_ZOOKEEPER
-  vector<string> child;
-  if (!zk_.GetChild(path.c_str(), &child)) return false;
-  for (const string& c : child) {
-    if (!CleanPath(path + "/" + c, true)) return false;
-  }
-  if (remove) return zk_.DeleteNode(path.c_str());
-#else
-  LOG(ERROR) << "Not supported without zookeeper";
-#endif
-  return true;
-}
-
-// extract cluster configuration part from the job config file
-//
-// The text is collected from the first occurrence of "cluster" on a line up
-// to and including the next line that contains '}'; a later occurrence
-// replaces what was collected before. The returned string is the text
-// between the first '{' and the first '}' of the collected part.
-// Aborts if the file cannot be opened or no such "{ ... }" part is found.
-// TODO(wangsh) improve this function to make it robust
-string JobManager::ExtractClusterConf(const char* job_file) {
-  std::ifstream fin(job_file);
-  CHECK(fin.is_open()) << "cannot open job conf file " << job_file;
-  string line;
-  string cluster;
-  bool in_cluster = false;
-  // stop on any read failure, not only on EOF
-  while (std::getline(fin, line)) {
-    if (!in_cluster) {
-      size_t pos = line.find("cluster");
-      if (pos == std::string::npos) continue;
-      in_cluster = true;
-      line = line.substr(pos);
-      cluster = "";
-    }
-    // in_cluster is always true here
-    cluster += line + "\n";
-    if (line.find("}") != std::string::npos)
-      in_cluster = false;
-  }
-  LOG(INFO) << "cluster configure: " << cluster;
-  size_t s_pos = cluster.find("{");
-  size_t e_pos = cluster.find("}");
-  // a '}' in front of the '{' would make the length below wrap around
-  if (s_pos == std::string::npos || e_pos == std::string::npos ||
-      e_pos < s_pos) {
-    LOG(FATAL) << "cannot extract valid cluster configuration in file: "
-               << job_file;
-  }
-  return cluster.substr(s_pos + 1, e_pos - s_pos-1);
-}
-
-}  // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/math_kernel.cu
----------------------------------------------------------------------
diff --git a/src/utils/math_kernel.cu b/src/utils/math_kernel.cu
deleted file mode 100644
index 65d7067..0000000
--- a/src/utils/math_kernel.cu
+++ /dev/null
@@ -1,450 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.
The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-#include <cmath>
-#include <algorithm>
-#include "singa/utils/math_kernel.h"
-#include "mshadow/tensor.h" // FLT_MIN?
-
-// thread block extent used for 2D launches
-#define CU2DBLOCK_X 32
-#define CU2DBLOCK_Y 32
-
-// thread block size used for 1D launches; CU1DBLOCKF is the same value as a
-// floating point literal
-#define CU1DBLOCK 1024
-#define CU1DBLOCKF 1024.0
-
-// Cuda Kernel Functions
-
-// For every sample index in [0, n): subtract log(prob[index][label[index]])
-// from loss[index], where prob is read as n rows of dim values. The
-// probability is clamped from below by FLT_MIN before taking the log.
-// loss is updated in place, not overwritten.
-__global__
-void kernel_softmax_loss(const float *prob, const int *label , float *loss,
-    int n, int dim) {
-  int index = blockIdx.x * blockDim.x + threadIdx.x;
-  int num_threads = blockDim.x * gridDim.x;
-  for (; index < n; index += num_threads) {
-    float prob_of_truth = prob[index * dim + label[index]];
-    loss[index] -= log(max(prob_of_truth, FLT_MIN));
-  }
-}
-
-// For every sample index in [0, n): replace the entry of grad at the label
-// position of that row (grad is read as n rows of dim values) by
-// (entry - 1) * scale. All other entries are left untouched.
-__global__
-void kernel_softmax_gradient(float *grad, const int *label ,
-    int n, int dim, float scale) {
-  int index = blockIdx.x * blockDim.x + threadIdx.x;
-  int num_threads = blockDim.x * gridDim.x;
-  for (; index < n; index += num_threads) {
-    int pos = index * dim + label[index];
-    grad[pos] = (grad[pos] - 1.0f) * scale;
-  }
-}
-
-// Sum the n values of data into *sum.
-// Elements are addressed by threadIdx.x only (blockIdx is not used), and the
-// shared buffer holds CU1DBLOCK partial sums.
-// NOTE(review): data[threadIdx.x] is read without a bound check, so this
-// presumably expects blockDim.x <= n — confirm against the launch site.
-__global__
-void kernel_sum_vec(float *data, float *sum , int n) {
-  int THREADS = blockDim.x;
-
-  __shared__ float aux[CU1DBLOCK];
-  // number of elements each thread accumulates (rounded up)
-  int steps = (n - 1) / THREADS + 1;
-  aux[threadIdx.x] = data[threadIdx.x];
-
-  // per-thread partial sum over elements that are THREADS apart
-  for (int i = 1; i < steps; ++i) {
-    if (threadIdx.x + i * THREADS < n) {
-      aux[threadIdx.x] += data[threadIdx.x+i*THREADS];
-    }
-  }
-
-  int total_threads = THREADS;
-  __syncthreads();
-
-  // fold the upper half of the partial sums onto the lower half until a
-  // single value is left; half_point is rounded up so odd counts work
-  while (total_threads > 1) {
-    int half_point = ((1+total_threads) >> 1);
-    if (threadIdx.x < half_point) {
-      if (threadIdx.x+half_point < total_threads) {
-        aux[threadIdx.x] += aux[threadIdx.x + half_point];
-      }
-    }
-    __syncthreads();
-    total_threads = ((total_threads+1) >> 1);
-  }
-
-  __syncthreads();
-  *sum = aux[0];
-}
-
-// For every row index in [0, rows): dst_vec_data[index] is the sum of the
-// cols values of that row; consecutive rows are stride elements apart in
-// src_mat_data.
-__global__
-void kernel_sum_col(const float *src_mat_data,
-    float *dst_vec_data, int rows, int cols, int stride) {
-  int index = blockIdx.x * blockDim.x + threadIdx.x;
-  int num_threads = blockDim.x * gridDim.x;
-  for (; index < rows; index += num_threads) {
-    dst_vec_data[index] = 0.0f;
-    for (int k = 0; k < cols; k++) {
-      dst_vec_data[index] += src_mat_data[index * stride + k];
-    }
-  }
-}
-
-// Block j sums column j of the matrix over all rows into dst_vec_data[j];
-// consecutive rows are stride elements apart in src_mat_data. Blocks with
-// j >= cols return immediately. The reduction follows the same scheme as
-// kernel_sum_vec.
-// NOTE(review): the first row read is not bound checked, so this presumably
-// expects blockDim.x <= rows — confirm against the launch site.
-__global__
-void kernel_sum_row(const float *src_mat_data,
-    float *dst_vec_data, int rows, int cols, int stride) {
-  int j = blockIdx.x;
-  int THREADS = blockDim.x;
-  if (j >= cols) {
-    return;
-  }
-
-  __shared__ float aux[CU1DBLOCK];
-  int steps = (rows - 1) / THREADS + 1;
-  aux[threadIdx.x] = src_mat_data[j+threadIdx.x*stride];
-  // per-thread partial sum over rows that are THREADS apart
-  for (int i = 1; i < steps; ++i) {
-    if (threadIdx.x+i*THREADS < rows) {
-      aux[threadIdx.x] += src_mat_data[j+(threadIdx.x+i*THREADS)*stride];
-    }
-  }
-
-  int total_threads = THREADS;
-  __syncthreads();
-  // fold the upper half of the partial sums onto the lower half
-  while (total_threads > 1) {
-    int half_point = ((1+total_threads) >> 1);
-    if (threadIdx.x < half_point) {
-      if (threadIdx.x+half_point < total_threads) {
-        aux[threadIdx.x] += aux[threadIdx.x + half_point];
-      }
-    }
-    __syncthreads();
-    total_threads = ((total_threads+1) >> 1);
-  }
-
-  __syncthreads();
-  dst_vec_data[j] = aux[0];
-}
-
-__global__
-void kernel_add_vec_row(const float *src_vec_data, const float *src_mat_data,
-    float* des_mat_data, int rows, int cols, int stride) {
-  int i = blockIdx.x * blockDim.x + threadIdx.x;
-  int j = blockIdx.y * blockDim.y + threadIdx.y;
-  int num_threads_x = blockDim.x * gridDim.x;
-  int num_threads_y = blockDim.y * gridDim.y;
-  int index = 0;
-  for (; i < cols && j
< rows; i += num_threads_x, j += num_threads_y) {
-    index = j * stride + i;
-    des_mat_data[index] = src_mat_data[index] + src_vec_data[i];
-  }
-}
-
-// des_data[i] = exp(src_data[i]) for every i in [0, n).
-__global__
-void kernel_exp(const float *src_data, float *des_data, int n) {
-  const int step = blockDim.x * gridDim.x;
-  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step) {
-    des_data[i] = exp(src_data[i]);
-  }
-}
-
-// des_data[i] = log(src_data[i]) for every i in [0, n).
-__global__
-void kernel_log(const float *src_data, float *des_data, int n) {
-  const int step = blockDim.x * gridDim.x;
-  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step) {
-    des_data[i] = log(src_data[i]);
-  }
-}
-
-// des_data[i] = 1 / (1 + exp(-src_data[i])) for every i in [0, n).
-__global__
-void kernel_sigmoid(const float *src_data, float *des_data, int n) {
-  const int step = blockDim.x * gridDim.x;
-  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step) {
-    des_data[i] = 1.0f / (1.0f + expf(-src_data[i]));
-  }
-}
-
-// des_data[i] = src_data[i] * (1 - src_data[i]) for every i in [0, n).
-__global__
-void kernel_sigmoid_grad(const float *src_data, float *des_data, int n) {
-  const int step = blockDim.x * gridDim.x;
-  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step) {
-    des_data[i] = src_data[i] * (1.0f - src_data[i]);
-  }
-}
-
-// des_data[i] = max(src_data[i], 0) for every i in [0, n).
-__global__
-void kernel_relu(const float *src_data, float *des_data, int n) {
-  const int step = blockDim.x * gridDim.x;
-  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step) {
-    des_data[i] = max(src_data[i], 0.0f);
-  }
-}
-
-// des_data[i] = 1 if src_data[i] > 0, otherwise 0, for every i in [0, n).
-__global__
-void kernel_relu_grad(const float *src_data, float *des_data, int n) {
-  const int step = blockDim.x * gridDim.x;
-  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step) {
-    des_data[i] = (src_data[i] > 0.0f) ? 1.0f : 0.0f;
-  }
-}
-
-// des_data[i] = tanh(src_data[i]) for every i in [0, n).
-__global__
-void kernel_tanh(const float *src_data, float *des_data, int n) {
-  const int step = blockDim.x * gridDim.x;
-  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step) {
-    des_data[i] = tanhf(src_data[i]);
-  }
-}
-
-// des_data[i] = 1 - src_data[i]^2 for every i in [0, n).
-__global__
-void kernel_tanh_grad(const float *src_data, float *des_data, int n) {
-  const int step = blockDim.x * gridDim.x;
-  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step) {
-    des_data[i] = (1.0f - src_data[i] * src_data[i]);
-  }
-}
-
-// des_data[i] = log(1 + exp(src_data[i])) for every i in [0, n).
-__global__
-void kernel_softplus(const float *src_data, float *des_data, int n) {
-  const int step = blockDim.x * gridDim.x;
-  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step) {
-    des_data[i] = logf(1 + expf(src_data[i]));
-  }
-}
-
-// des_data[i] = 1 / (1 + exp(-src_data[i])) for every i in [0, n).
-__global__
-void kernel_softplus_grad(const float *src_data, float *des_data, int n) {
-  const int step = blockDim.x * gridDim.x;
-  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step) {
-    des_data[i] = 1.0f / (1.0f + expf(-src_data[i]));
-  }
-}
-
-// des_data[i] = src_data[i]^2 for every i in [0, n).
-__global__
-void kernel_square(const float *src_data, float *des_data, int n) {
-  const int step = blockDim.x * gridDim.x;
-  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step) {
-    des_data[i] = src_data[i] * src_data[i];
-  }
-}
-
-// des_data[i] = 2 * sqrt(src_data[i]) for every i in [0, n).
-__global__
-void kernel_square_grad(const float *src_data, float *des_data, int n) {
-  const int step = blockDim.x * gridDim.x;
-  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step) {
-    des_data[i] = 2 * sqrt(src_data[i]);
-  }
-}
-
-__global__
-void kernel_sqrt(const float *src_data, float *des_data, int n) {
-  int index = blockIdx.x * blockDim.x + threadIdx.x;
-  int num_threads = blockDim.x * gridDim.x;
-  for (; index < n; index += num_threads) {
-    des_data[index] =
sqrt(src_data[index]);
-  }
-}
-
-// des_data[i] = pow(src_data_a[i], src_data_b[i]) for every i in [0, n).
-__global__
-void kernel_pow(const float *src_data_a, const float *src_data_b,
-    float *des_data, int n) {
-  const int step = blockDim.x * gridDim.x;
-  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step) {
-    des_data[i] = pow(src_data_a[i], src_data_b[i]);
-  }
-}
-
-// des_data[i] = src_data_a[i] * src_data_b[i] for every i in [0, n).
-__global__
-void kernel_mult(const float *src_data_a, const float *src_data_b,
-    float *des_data, int n) {
-  const int step = blockDim.x * gridDim.x;
-  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step) {
-    des_data[i] = src_data_a[i] * src_data_b[i];
-  }
-}
-
-// des_data[i] = src_data_a[i] / src_data_b[i] for every i in [0, n).
-__global__
-void kernel_div(const float *src_data_a, const float *src_data_b,
-    float *des_data, int n) {
-  const int step = blockDim.x * gridDim.x;
-  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step) {
-    des_data[i] = src_data_a[i] / src_data_b[i];
-  }
-}
-
-// data[i] = value for every i in [0, n).
-__global__ static
-void kernel_set_value(float *data, float value, int n) {
-  const int step = blockDim.x * gridDim.x;
-  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step) {
-    data[i] = value;
-  }
-}
-
-// des_data[i] = 1 if src_data[i] < alpha, otherwise 0, for every i in [0, n).
-__global__
-void kernel_threshold(const float *src_data, float *des_data,
-    float alpha, int n) {
-  const int step = blockDim.x * gridDim.x;
-  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += step) {
-    des_data[i] = (src_data[i] < alpha) ? 1.0f : 0.0f;
-  }
-}
-
-//
-namespace singa {
-
-// Launch kernel_softmax_loss with ceil(n / 1024) blocks of 1024 threads.
-void singa_gpu_softmaxloss_forward(int n, int dim, const float *prob,
-    const int *label, float *loss) {
-  const dim3 grid(ceil(n/CU1DBLOCKF));
-  const dim3 block(CU1DBLOCKF);
-  kernel_softmax_loss<<<grid, block>>>(prob, label, loss, n, dim);
-}
-
-// Launch kernel_softmax_gradient with ceil(n / 1024) blocks of 1024 threads.
-void singa_gpu_softmaxloss_backward(int n, int dim, float scale,
-    const int *label, float *grad) {
-  const dim3 grid(ceil(n/CU1DBLOCKF));
-  const dim3 block(CU1DBLOCKF);
-  kernel_softmax_gradient<<<grid, block>>>(grad, label, n, dim, scale);
-}
-
-// Launch kernel_sum_vec with a single block of min(n, CU1DBLOCK) threads.
-void singa_gpu_sum_vec(float *data, float *sum , int n) {
-  // here, we only need one block
-  const int block_count = 1;
-  const int block_size = std::min(n, CU1DBLOCK);
-
-  kernel_sum_vec<<<block_count, block_size>>>(data, sum, n);
-}
-
-// Launch kernel_sum_row with one block per column and min(rows, CU1DBLOCK)
-// threads per block.
-void singa_gpu_sum_row(const float *src_mat_data, float *dst_vec_data,
-    int rows, int cols, int stride) {
-  const int block_count = cols;
-  const int block_size = std::min(rows, CU1DBLOCK);
-
-  kernel_sum_row<<<block_count, block_size>>>(src_mat_data,
-      dst_vec_data, rows, cols, stride);
-}
-
-// Launch kernel_sum_col with one block per row and min(cols, CU1DBLOCK)
-// threads per block.
-void singa_gpu_sum_col(const float *src_mat_data, float *dst_vec_data,
-    int rows, int cols, int stride) {
-  const int block_count = rows;
-  const int block_size = std::min(cols, CU1DBLOCK);
-
-  kernel_sum_col<<<block_count, block_size>>>(src_mat_data,
-      dst_vec_data, rows, cols, stride);
-}
-
-void singa_gpu_add_vec_row(const float *src_vec_data, const float *src_mat_data,
-    float *des_mat_data , int rows, int cols, int stride) {
-  dim3 threads_per_block(CU2DBLOCK_X, CU2DBLOCK_Y);
-  dim3 num_blocks(cols/threads_per_block.x +
-      (cols%threads_per_block.x == 0 ? 0 : 1),
-      rows/threads_per_block.y + (rows%threads_per_block.y == 0 ?
0 : 1));
-  kernel_add_vec_row<<<num_blocks, threads_per_block>>>
-    (src_vec_data, src_mat_data, des_mat_data, rows, cols, stride);
-}
-
-// Each wrapper below launches the kernel of the same name with
-// ceil(n / 1024) blocks of 1024 threads.
-
-void singa_gpu_exp(const float *src_data, float *des_data, int n) {
-  const dim3 grid(ceil(n/CU1DBLOCKF));
-  const dim3 block(CU1DBLOCKF);
-  kernel_exp<<<grid, block>>>(src_data, des_data, n);
-}
-
-void singa_gpu_log(const float *src_data, float *des_data, int n) {
-  const dim3 grid(ceil(n/CU1DBLOCKF));
-  const dim3 block(CU1DBLOCKF);
-  kernel_log<<<grid, block>>>(src_data, des_data, n);
-}
-
-void singa_gpu_sigmoid(const float *src_data, float *des_data, int n) {
-  const dim3 grid(ceil(n/CU1DBLOCKF));
-  const dim3 block(CU1DBLOCKF);
-  kernel_sigmoid<<<grid, block>>>(src_data, des_data, n);
-}
-
-void singa_gpu_sigmoid_grad(const float *src_data, float *des_data,
-    int n) {
-  const dim3 grid(ceil(n/CU1DBLOCKF));
-  const dim3 block(CU1DBLOCKF);
-  kernel_sigmoid_grad<<<grid, block>>>(src_data, des_data, n);
-}
-
-void singa_gpu_relu(const float *src_data, float *des_data, int n) {
-  const dim3 grid(ceil(n/CU1DBLOCKF));
-  const dim3 block(CU1DBLOCKF);
-  kernel_relu<<<grid, block>>>(src_data, des_data, n);
-}
-
-void singa_gpu_relu_grad(const float *src_data, float *des_data, int n) {
-  const dim3 grid(ceil(n/CU1DBLOCKF));
-  const dim3 block(CU1DBLOCKF);
-  kernel_relu_grad<<<grid, block>>>(src_data, des_data, n);
-}
-
-void singa_gpu_tanh(const float *src_data, float *des_data, int n) {
-  const dim3 grid(ceil(n/CU1DBLOCKF));
-  const dim3 block(CU1DBLOCKF);
-  kernel_tanh<<<grid, block>>>(src_data, des_data, n);
-}
-
-void singa_gpu_tanh_grad(const float *src_data, float *des_data, int n) {
-  const dim3 grid(ceil(n/CU1DBLOCKF));
-  const dim3 block(CU1DBLOCKF);
-  kernel_tanh_grad<<<grid, block>>>(src_data, des_data, n);
-}
-
-void singa_gpu_softplus(const float *src_data, float *des_data, int n) {
-  const dim3 grid(ceil(n/CU1DBLOCKF));
-  const dim3 block(CU1DBLOCKF);
-  kernel_softplus<<<grid, block>>>(src_data, des_data, n);
-}
-
-void singa_gpu_softplus_grad(const float *src_data, float *des_data, int n) {
-  const dim3 grid(ceil(n/CU1DBLOCKF));
-  const dim3 block(CU1DBLOCKF);
-  kernel_softplus_grad<<<grid, block>>>(src_data, des_data, n);
-}
-
-void singa_gpu_square(const float *src_data, float *des_data, int n) {
-  const dim3 grid(ceil(n/CU1DBLOCKF));
-  const dim3 block(CU1DBLOCKF);
-  kernel_square<<<grid, block>>>(src_data, des_data, n);
-}
-
-void singa_gpu_square_grad(const float *src_data, float *des_data, int n) {
-  const dim3 grid(ceil(n/CU1DBLOCKF));
-  const dim3 block(CU1DBLOCKF);
-  kernel_square_grad<<<grid, block>>>(src_data, des_data, n);
-}
-
-void singa_gpu_sqrt(const float *src_data, float *des_data, int n) {
-  const dim3 grid(ceil(n/CU1DBLOCKF));
-  const dim3 block(CU1DBLOCKF);
-  kernel_sqrt<<<grid, block>>>(src_data, des_data, n);
-}
-
-void singa_gpu_pow(const float *src_data_a, const float *src_data_b,
-    float *des_data, int n) {
-  const dim3 grid(ceil(n/CU1DBLOCKF));
-  const dim3 block(CU1DBLOCKF);
-  kernel_pow<<<grid, block>>>(src_data_a, src_data_b, des_data, n);
-}
-
-void singa_gpu_mult(const float *src_data_a, const float *src_data_b,
-    float *des_data, int n) {
-  const dim3 grid(ceil(n/CU1DBLOCKF));
-  const dim3 block(CU1DBLOCKF);
-  kernel_mult<<<grid, block>>>(src_data_a, src_data_b, des_data, n);
-}
-
-void singa_gpu_div(const float *src_data_a, const float *src_data_b,
-    float *des_data, int n) {
-  const dim3 grid(ceil(n/CU1DBLOCKF));
-  const dim3 block(CU1DBLOCKF);
-  kernel_div<<<grid, block>>>(src_data_a, src_data_b, des_data, n);
-}
-
-void singa_gpu_set_value(float *data, float value, int n) {
-  const dim3 grid(ceil(n/CU1DBLOCKF));
-  const dim3 block(CU1DBLOCKF);
-  kernel_set_value<<<grid, block>>>(data, value, n);
-}
-
-void singa_gpu_threshold(const float *src_data, float *des_data,
-    float alpha, int n) {
-  const dim3 grid(ceil(n/CU1DBLOCKF));
-  const dim3 block(CU1DBLOCKF);
-  kernel_threshold<<<grid, block>>>(src_data, des_data, alpha, n);
-}
-
-}  // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/param.cc
----------------------------------------------------------------------
diff --git a/src/utils/param.cc b/src/utils/param.cc
deleted file mode 100644
index 73d8314..0000000
--- a/src/utils/param.cc
+++ /dev/null
@@ -1,447 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.
You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-#include "singa/utils/param.h"
-
-#include <glog/logging.h>
-#include <cmath>
-#include <random>
-#include <unordered_map>
-#include "mshadow/tensor.h"
-#include "singa/utils/factory.h"
-#include "singa/utils/singleton.h"
-#include "singa/utils/common.h"
-
-namespace singa {
-
-using mshadow::cpu;
-using mshadow::Random;
-using mshadow::Shape1;
-using mshadow::Tensor;
-using std::vector;
-using std::string;
-
-// Create a generator through the ParamGenerator factory and initialize it
-// with proto. The user-defined type is used when it is set, otherwise the
-// built-in type.
-ParamGenerator* ParamGenerator::Create(const ParamGenProto& proto) {
-  auto factory = Singleton<Factory<ParamGenerator>>::Instance();
-  ParamGenerator * gen = nullptr;
-  if (proto.has_user_type())
-    gen = factory->Create(proto.user_type());
-  else
-    gen = factory->Create(proto.type());
-  gen->Init(proto);
-  return gen;
-}
-
-// Set every element of blob to the constant proto_.value().
-void ParamGenerator::Fill(Blob<float>* blob) {
-  Tensor<cpu, 1> data(blob->mutable_cpu_data(), Shape1(blob->count()));
-  data = proto_.value();
-}
-
-// Fill blob with Gaussian samples (proto_.mean(), proto_.std()), multiplied
-// by proto_.value() unless that value is 1.
-void GaussianGen::Fill(Blob<float>* blob) {
-  Tensor<cpu, 1> data(blob->mutable_cpu_data(), Shape1(blob->count()));
-  auto random = TSingleton<Random<cpu>>::Instance();
-  random->SampleGaussian(data, proto_.mean(), proto_.std());
-  if (proto_.value() != 1)
-    data *= proto_.value();
-}
-
-// Same as GaussianGen::Fill, then divide by sqrt(number of columns).
-void GaussianSqrtFanInGen::Fill(Blob<float>* blob) {
-  // only valid for param matrix with num of cols as fan in
-  CHECK_EQ(blob->shape().size(), 2);
-  Tensor<cpu, 1> data(blob->mutable_cpu_data(), Shape1(blob->count()));
-  GaussianGen::Fill(blob);
-  data /= sqrt(blob->shape().at(1));
-}
-
-// Fill blob with uniform samples from [proto_.low(), proto_.high()],
-// multiplied by proto_.value() unless that value is 1.
-void UniformGen::Fill(Blob<float>* blob) {
-  Tensor<cpu, 1> data(blob->mutable_cpu_data(), Shape1(blob->count()));
-  auto random = TSingleton<Random<cpu>>::Instance();
-  random->SampleUniform(data, proto_.low(), proto_.high());
-  if (proto_.value() != 1)
-    data *= proto_.value();
-}
-
-// Same as UniformGen::Fill, then divide by sqrt(number of columns / 3).
-void UniformSqrtFanInGen::Fill(Blob<float>* blob) {
-  // only valid for param matrix with num of cols as fan in
-  CHECK_EQ(blob->shape().size(), 2);
-  Tensor<cpu, 1> data(blob->mutable_cpu_data(), Shape1(blob->count()));
-  UniformGen::Fill(blob);
-  data /= sqrt(blob->shape().at(1) / 3.0f);
-}
-
-// Same as UniformGen::Fill, then divide by sqrt(rows + columns).
-void UniformSqrtFanInOutGen::Fill(Blob<float>* blob) {
-  // only valid for param matrix with num of cols as fan in
-  CHECK_EQ(blob->shape().size(), 2);
-  Tensor<cpu, 1> data(blob->mutable_cpu_data(), Shape1(blob->count()));
-  UniformGen::Fill(blob);
-  data /= sqrt(blob->shape()[0] + blob->shape()[1]);
-}
-
-/****************** Param functions *********************************/
-// Create a Param through the Param factory and initialize it with proto.
-// The user-defined type is used when it is set, otherwise the built-in type.
-Param* Param::Create(const ParamProto& proto) {
-  Factory<Param>* factory = Singleton<Factory<Param>>::Instance();
-  Param* p = nullptr;
-  if (proto.has_user_type())
-    p = factory->Create(proto.user_type());
-  else
-    p = factory->Create(proto.type());
-  p->Init(proto);
-  return p;
-}
-
-// Compute the slice lengths for all unique Params (those that own their
-// data, i.e. id() == owner()) and return them as one flat list, in the order
-// in which the Params appear in params.
-const vector<int> Param::ComputeSlices(int num, const vector<Param*>& params) {
-  // collect sizes of unique Params
-  std::vector<int> paramsize;
-  for (auto param : params)
-    if (param->id() == param->owner())
-      paramsize.push_back(param->size());
-  // slice into lcm pieces to achieve good load-balance for both intra-group
-  // partition (among servers in a group) and inter-group partition (each group
-  // is assigned a sub-set of slices)
-  auto param_slice = Slice(num, paramsize);
-  vector<int> slices;
-  // iterate by reference: the per-Param vectors are only read
-  for (const auto& vec : param_slice)
-    for (int len : vec)
-      slices.push_back(len);
-  return slices;
-}
-
-void Param::SliceParams(int num, const vector<Param*>& params) {
-  auto slices = ComputeSlices(num, params);
-  // construct map from Param ID to its slices <slice id,
len>
-  std::unordered_map<int, vector<std::pair<int, int>>> paramid2slices;
-  int slice_id = 0;
-  auto it = slices.begin();
-  for (auto param : params) {
-    if (param->id() == param->owner()) {
-      int len = 0;
-      while (len < param->size() && it != slices.end()) {
-        paramid2slices[param->id()].push_back(std::make_pair(slice_id++, *it));
-        len += *it;
-        it++;
-      }
-      CHECK_EQ(param->size(), len) << "length misamtch for ID=" << param->id();
-    }
-  }
-  for (auto param : params) {
-    for (auto entry : paramid2slices[param->owner()]) {
-      param->AddSlice(entry.first, entry.second);
-      LOG(INFO) << "param id " << param->id() << " owner=" << param->owner()
-        << ", slice id = " << entry.first << ", size = " << entry.second;
-    }
-  }
-}
-
-// Reshape the data, gradient, history and update blobs to shape.
-void Param::Setup(const vector<int>& shape) {
-  data_.Reshape(shape);
-  grad_.Reshape(shape);
-  history_.Reshape(shape);
-  update_.Reshape(shape);
-}
-
-// Initialize the values and set the version to 0.
-void Param::InitValues() {
-  InitValues(0);
-}
-
-// Fill the data blob with the generator configured in proto_.init() and set
-// the version.
-void Param::InitValues(int version) {
-  ParamGenerator* gen = ParamGenerator::Create(proto_.init());
-  gen->Fill(&data_);
-  // The generator is only needed for this one Fill; release it instead of
-  // leaking one object per call.
-  // NOTE(review): assumes the factory allocates a new generator on every
-  // Create and that ParamGenerator has a virtual destructor - confirm in
-  // factory.h / param.h.
-  delete gen;
-  set_version(version);
-}
-
-// Share the data blob of other and copy its owner, version and slice
-// bookkeeping. The gradient blob is not shared; it is only reshaped to the
-// data shape when it is still empty.
-void Param::ShareDataFrom(Param* other, bool cpu_only) {
-  if (this == other) {
-    LOG(WARNING) << "No need to share Param with itself";
-    return;
-  }
-
-  proto_.set_owner(other->owner());
-  CHECK_EQ(data_.count(), other->data_.count());
-  data_.ShareData(&(other->data_), cpu_only);
-  if (grad_.count() == 0)
-    grad_.Reshape(data_.shape());
-  version_ = other->version_;
-  last_version_ = other->last_version_;
-  slice_start_ = other->slice_start_;
-  num_slices_ = other->num_slices_;
-  slice_offset_ = other->slice_offset_;
-  slice_size_ = other->slice_size_;
-  // change pending list size equal to slice size
-  pending_get_.resize(other->pending_get_.size());
-  pending_update_.resize(other->pending_update_.size());
-}
-
-// Share both the data blob and the gradient blob of other.
-void Param::ShareFrom(Param* other) {
-  if (this == other) {
-    LOG(WARNING) << "No need to share Param with itself";
-    return;
-  }
-
-  ShareDataFrom(other, false);
-  grad_.ShareData(&(other->grad_), false);
-}
-
-// Load the data blob from a serialized BlobProto.
-void Param::FromProto(const string str) {
-  BlobProto blob;
-  blob.ParseFromString(str);
-  data_.FromProto(blob);
-}
-
-// Load the data blob from blob.
-void Param::FromProto(const BlobProto& blob) {
-  data_.FromProto(blob);
-}
-
-// Store the data blob into blob.
-void Param::ToProto(BlobProto* blob) {
-  data_.ToProto(blob);
-}
-
-// Append a slice of the given size. Slices must be added with consecutive
-// ids; the offset of a slice is the end of the previous one (0 for the first
-// slice, whose id becomes slice_start_).
-void Param::AddSlice(int slice_id, int size) {
-  int offset = 0;
-  if (slice_size_.size() > 0) {
-    // must be added in order
-    CHECK_EQ(slice_start_ + num_slices_, slice_id);
-    offset = slice_offset_.back() + slice_size_.back();
-  } else {
-    slice_start_ = slice_id;
-    offset = 0;
-  }
-  slice_offset_.push_back(offset);
-  slice_size_.push_back(size);
-  pending_get_.push_back(false);
-  pending_update_.push_back(false);
-  num_slices_++;
-}
-
-// Build a kPut message for slice idx.
-// The first frame carries the slice size, lr_scale, wd_scale and a pointer to
-// the slice data. With copy set, that pointer is nullptr and the slice values
-// are appended as a second frame instead.
-Msg* Param::GenPutMsg(bool copy, int idx) {
-  CHECK_LT(idx, num_slices_);
-  Msg* msg = new Msg();
-  msg->set_type(kPut);
-  const void* ptr = data_.cpu_data() + slice_offset_[idx];
-  const void* p = ptr;
-  if (copy) p = nullptr;
-  msg->AddFormatFrame("iffp", slice_size_[idx], lr_scale(), wd_scale(), p);
-  if (copy) {
-    msg->AddFrame(ptr, slice_size_[idx] * sizeof(float));
-  }
-// LOG(ERROR) << "gen put msg: " << msg;
-  return msg;
-}
-
-// Build a kGet message for slice idx carrying the copy flag and a pointer to
-// the slice data, and mark the slice as having a pending get.
-Msg* Param::GenGetMsg(bool copy, int idx) {
-  CHECK_LT(idx, num_slices_);
-  Msg* msg = new Msg();
-  msg->set_type(kGet);
-  msg->AddFormatFrame("ip", copy, data_.mutable_cpu_data()
-      + slice_offset_[idx]);
-  pending_get_[idx] = true;
-  num_pending_requests_++;
-  return msg;
-}
-
-// Build a kUpdate message for slice idx and mark the slice as having a
-// pending update. With copy set the gradient values are appended as a frame,
-// otherwise only a pointer to them is sent.
-Msg* Param::GenUpdateMsg(bool copy, int idx) {
-  CHECK_LT(idx, num_slices_);
-  Msg* msg = new Msg();
-  msg->set_type(kUpdate);
-  msg->AddFormatFrame("i", copy);
-  const void* ptr = grad_.cpu_data() + slice_offset_[idx];
-  if (copy) {
-    msg->AddFrame(ptr, slice_size_[idx]*sizeof(float));
-  } else {
-    msg->AddFormatFrame("p", ptr);  // to share values of grad blob
-  }
-
-  pending_update_[idx] = true;
-  num_pending_requests_++;
-  return msg;
-}
-
-Msg* Param::GenSyncMsg(int offset, int size) {
-  Msg* msg = new Msg();
-
msg->set_type(kSyncRequest); - msg->set_trgt(ParamTrgt(-1, id()), last_version()); - // always copy data because syn is between server groups in diff procs - msg->AddFrame(mutable_cpu_data(), data_.count()*sizeof(float)); - return msg; -} - -Msg* Param::HandlePutMsg(Msg** msg, bool reserve) { - // TODO(wangsheng) remove the check later - CHECK(reserve); - int size; - float lr, wc; - float* ptr; -// LOG(ERROR) << "handle put msg:" << *msg; - (*msg)->ParseFormatFrame("iffp", &size, &lr, &wc, &ptr); - ParamProto proto; - proto.set_lr_scale(lr); - proto.set_wd_scale(wc); - vector<int> shape{size}; - Init(proto); - Setup(shape); - if (ptr == nullptr) { - CHECK((*msg)->NextFrame()); - CHECK_EQ(size * sizeof(float), (*msg)->FrameSize()); - memcpy(mutable_cpu_data(), (*msg)->FrameData(), size * sizeof(float)); - } else { - data_.set_cpu_data(ptr); - } - if (!reserve) DeleteMsg(msg); - return nullptr; -} - -Msg* Param::HandleGetMsg(Msg** msg, bool reserve) { - // TODO(wangsheng) remove the check later - CHECK(!reserve); - int copy; - float* ptr; - (*msg)->ParseFormatFrame("ip", ©, &ptr); - if (copy) { - (*msg)->AddFrame(mutable_cpu_data(), sizeof(float) * size()); - } else if (ptr != data_.cpu_data()) { - // this case reflects following situation: - // worker 0 and server are in the same process, while worker 1 is not. - // worker 1 "put" data into server, so server need to allocate memory. - // then worker 0 "get" data from server, so server need: - // 1. copy the data to the worker0 provided space - // 2. 
change its own pointer to that space in order to share memory - // in this case, the server always points to last worker's space - memcpy(ptr, data_.cpu_data(), sizeof(float) * size()); - data_.set_cpu_data(ptr); - } - // else the mem space is shared among all worker and servers - Msg* ret = nullptr; - if (reserve) { - ret = new Msg(**msg); - } else { - // if not reserve the msg, we reuse it as return value - ret = *msg; - *msg = nullptr; - } - ret->SwapAddr(); - ret->set_type(kRGet); - return ret; -} - -void Param::ParseUpdateMsgs(const vector<Msg*>& msgs) { - CHECK_GT(msgs.size(), 0); - float* server_grad = nullptr; - vector<float*> worker_grad; - for (auto* msg : msgs) { - int copy; - msg->ParseFormatFrame("i", ©); - msg->NextFrame(); - float* ptr = nullptr; - if (copy) { - ptr = static_cast<float*>(msg->FrameData()); - CHECK_EQ(size() * sizeof(float), msg->FrameSize()); - } else { - msg->ParseFormatFrame("p", &ptr); - server_grad = ptr; - } - worker_grad.push_back(ptr); - } - if (server_grad == nullptr) - server_grad = worker_grad.at(0); - for (float* grad : worker_grad) { - if (grad != server_grad) { - // TODO(wangsh) think about optimize it later? - for (int i = 0; i < size(); i++) { - server_grad[i] += grad[i]; - } - } - } - grad_.set_cpu_data(server_grad); -} - -const vector<Msg*> Param::GenUpdateResponseMsgs(vector<Msg*>* msgs, - bool reserve) { - // TODO(wangsheng) remove the check later - CHECK(!reserve); - vector<Msg*> ret; - for (Msg* msg : *msgs) { - Msg* ptr = reserve ? 
new Msg(*msg) : msg; - ptr->FirstFrame(); - ptr->SwapAddr(); - ptr->set_type(kRUpdate); - int copy; - ptr->ParseFormatFrame("i", ©); - if (copy) { - ptr->NextFrame(); - CHECK_EQ(ptr->FrameSize(), sizeof(float) * size()); - memcpy(ptr->FrameData(), mutable_cpu_data(), ptr->FrameSize()); - } - ret.push_back(ptr); - } - // if not reserved, we remove all pointers - if (!reserve) msgs->clear(); - return ret; -} - -Msg* Param::HandleSyncMsg(Msg** msg, bool reserve) { - // TODO(wangwei) handle it later - if (!reserve) DeleteMsg(msg); - return nullptr; -} - -int Param::ParseGetResponseMsg(Msg *msg, int slice_idx) { - CHECK(pending_get_[slice_idx]) << slice_idx; - pending_get_[slice_idx] = false; - ParseResponseMsg(msg, slice_idx); - return (--num_pending_requests_) % num_slices_ == 0; -} - -int Param::ParseUpdateResponseMsg(Msg *msg, int slice_idx) { - CHECK(pending_update_[slice_idx]) << id() << " " << slice_idx; - pending_update_[slice_idx] = false; - ParseResponseMsg(msg, slice_idx); - return (--num_pending_requests_) % num_slices_ == 0; -} - -int Param::ParseSyncResponseMsg(Msg* msg, int slice_idx) { - // TODO(wangwei) handle it later - return 1; -} - -void Param::ParseResponseMsg(Msg* msg, int slice_idx) { - int copy; - msg->ParseFormatFrame("i", ©); - msg->NextFrame(); - if (copy) { - CHECK_EQ(msg->FrameSize(), slice_size_[slice_idx] * sizeof(float)); - memcpy(mutable_cpu_data() + slice_offset_[slice_idx], - msg->FrameData(), msg->FrameSize()); - } - // LOG(ERROR)<<"parse response norm "<<data_->asum_data()<<" of "<<id(); -} - -/************************ParamEntry***************************/ -ParamEntry::ParamEntry(int total, Param* p) { - num_total = total; - shares.push_back(p); -} - -void ParamEntry::AddParam(bool local, Param* p) { - num_local += local; - num_total += 1; - if (local) shares.push_back(p); -} - -} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/tool.cc 
---------------------------------------------------------------------- diff --git a/src/utils/tool.cc b/src/utils/tool.cc deleted file mode 100644 index 3b1df72..0000000 --- a/src/utils/tool.cc +++ /dev/null @@ -1,169 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. 
-* -*************************************************************/ - -#include <glog/logging.h> -#include <algorithm> -#include <string> -#include <vector> -#include "singa/proto/singa.pb.h" -#include "singa/utils/common.h" -#include "singa/utils/job_manager.h" - -std::string conf_dir; -singa::SingaProto global; -const int SUCCESS = 0; -const int ARG_ERR = 1; -const int RUN_ERR = 2; - -// show log dir in global config -int getlogdir() { - std::string dir = global.log_dir(); - while (dir.length() > 1 && dir[dir.length()-1] == '/') dir.pop_back(); - printf("%s\n", dir.c_str()); - return SUCCESS; -} - -// generate a unique job id -int create() { - singa::JobManager mngr(global.zookeeper_host()); - if (!mngr.Init()) return RUN_ERR; - int id; - if (!mngr.GenerateJobID(&id)) return RUN_ERR; - printf("%d\n", id); - return SUCCESS; -} - -// generate a host list -int genhost(char* job_conf) { - singa::JobManager mngr(global.zookeeper_host()); - if (!mngr.Init()) return RUN_ERR; - std::vector<std::string> list; - if (!mngr.GenerateHostList((conf_dir+"/hostfile").c_str(), job_conf, &list)) - return RUN_ERR; - // output selected hosts - for (std::string host : list) - printf("%s\n", host.c_str()); - return SUCCESS; -} - -// list singa jobs (running or all) -int list(bool all) { - singa::JobManager mngr(global.zookeeper_host()); - if (!mngr.Init()) return RUN_ERR; - std::vector<singa::JobInfo> jobs; - if (!mngr.ListJobs(&jobs)) return RUN_ERR; - printf("JOB ID |NUM PROCS \n"); - printf("----------|-----------\n"); - for (singa::JobInfo job : jobs) { - if (!job.procs && !all) continue; - printf("%-10d|%-10d\n", job.id, job.procs); - } - return SUCCESS; -} - -// view procs of a singa job -int view(int id) { - singa::JobManager mngr(global.zookeeper_host()); - if (!mngr.Init()) return RUN_ERR; - std::vector<std::string> procs; - if (!mngr.ListJobProcs(id, &procs)) return RUN_ERR; - for (std::string s : procs) { - printf("%s\n", s.c_str()); - } - return SUCCESS; -} - -// remove a 
job path in zookeeper -int remove(int id) { - singa::JobManager mngr(global.zookeeper_host()); - if (!mngr.Init()) return RUN_ERR; - if (!mngr.Remove(id)) return RUN_ERR; - return SUCCESS; -} - -// remove all job paths in zookeeper -int removeall() { - singa::JobManager mngr(global.zookeeper_host()); - if (!mngr.Init()) return RUN_ERR; - if (!mngr.RemoveAllJobs()) return RUN_ERR; - return SUCCESS; -} - -// clean all singa data in zookeeper -int cleanup() { - singa::JobManager mngr(global.zookeeper_host()); - if (!mngr.Init()) return RUN_ERR; - if (!mngr.CleanUp()) return RUN_ERR; - return SUCCESS; -} - -int main(int argc, char **argv) { - std::string usage = "Usage: singatool <command> <args>\n" - " getlogdir : show log dir in global config\n" - " create : generate a unique job id\n" - " genhost <job conf> : generate a host list\n" - " list : list running singa jobs\n" - " listall : list all singa jobs\n" - " view <job id> : view procs of a singa job\n" - " remove <job id> : remove a job path in zookeeper\n" - " removeall : remova all job paths in zookeeper\n" - " cleanup : clean all singa data in zookeeper\n" - "[optional arguments] NOTICE: must put at end of a command\n" - " -confdir <dir> : path to singa global conf dir"; - - // set logging level to ERROR and log to STDERR only - google::LogToStderr(); - google::SetStderrLogging(google::ERROR); - google::InitGoogleLogging(argv[0]); - // parse -confdir argument - int arg_pos = singa::ArgPos(argc, argv, "-confdir"); - conf_dir = arg_pos == -1 ? "conf" : argv[arg_pos+1]; - if (arg_pos != -1) argc -= 2; - singa::ReadProtoFromTextFile((conf_dir+"/singa.conf").c_str(), &global); - - // stat code: ARG_ERR for wrong argument, RUN_ERR for runtime error - int stat = (argc <= 1) ? ARG_ERR : SUCCESS; - if (stat == SUCCESS) { - if (!strcmp(argv[1], "getlogdir")) - stat = getlogdir(); - else if (!strcmp(argv[1], "create")) - stat = create(); - else if (!strcmp(argv[1], "genhost")) - stat = (argc > 2) ? 
genhost(argv[2]) : genhost(nullptr); - else if (!strcmp(argv[1], "list")) - stat = list(false); - else if (!strcmp(argv[1], "listall")) - stat = list(true); - else if (!strcmp(argv[1], "view")) - stat = (argc > 2) ? view(atoi(argv[2])) : ARG_ERR; - else if (!strcmp(argv[1], "remove")) - stat = (argc > 2) ? remove(atoi(argv[2])) : ARG_ERR; - else if (!strcmp(argv[1], "removeall")) - stat = removeall(); - else if (!strcmp(argv[1], "cleanup")) - stat = cleanup(); - else - stat = ARG_ERR; - } - - if (stat == ARG_ERR) LOG(ERROR) << usage; - return stat; -}
