http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/updater.cc ---------------------------------------------------------------------- diff --git a/src/utils/updater.cc b/src/utils/updater.cc deleted file mode 100644 index a2180d3..0000000 --- a/src/utils/updater.cc +++ /dev/null @@ -1,284 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#include "singa/utils/updater.h" - -#include "mshadow/cxxnet_op.h" -#include "mshadow/tensor.h" -#include "singa/utils/singleton.h" -#include "singa/utils/factory.h" - -namespace singa { - -using mshadow::cpu; -using mshadow::expr::F; -using mshadow::op::sqrtop; -using mshadow::op::square; -using mshadow::Shape; -using mshadow::Shape1; -using mshadow::Tensor; -using mshadow::TensorContainer; - -LRGenerator* LRGenerator::Create(const LRGenProto& proto) { - auto factory = Singleton<Factory<LRGenerator>>::Instance(); - LRGenerator* gen = nullptr; - if (proto.has_user_type()) - gen = factory->Create(proto.user_type()); - else - gen = factory->Create(proto.type()); - gen->Init(proto); - return gen; -} - -float FixedStepLRGen::Get(int step) { - if (last_idx_ < proto_.fixedstep_conf().step_size() - 1 - && step >= proto_.fixedstep_conf().step(last_idx_ + 1)) { - last_idx_++; - } - return proto_.fixedstep_conf().step_lr(last_idx_); -} - -float StepLRGen::Get(int step) { - // do not cast int to float - int freq = proto_.step_conf().change_freq(); - float lr = proto_.base_lr() * pow(proto_.step_conf().gamma(), step / freq); - // LOG_IF(INFO, step % freq == 0) << "Update learning rate to " << lr - // << " @ step " << step; - return lr; -} - -float LinearLRGen::Get(int step) { - int freq = proto_.linear_conf().change_freq(); - float r = step * 1.0 / freq; - return (1.0 - r) * proto_.base_lr() + r * proto_.linear_conf().final_lr(); -} - -float ExpLRGen::Get(int step) { - int freq = proto_.exponential_conf().change_freq(); - return proto_.base_lr() / pow(2, step * 1. / freq); -} - -float InvLRGen::Get(int step) { - return proto_.base_lr() * pow(1.f + proto_.inverse_conf().gamma() * step, - - proto_.inverse_conf().pow()); -} - -float InvTLRGen::Get(int step) { - return proto_.base_lr() / (1 + step * 1. 
/ proto_.inverset_conf().final_lr()); -} - -Updater* Updater::Create(const UpdaterProto& proto) { - auto factory = Singleton<Factory<Updater>>::Instance(); - Updater* updater = nullptr; - if (proto.has_user_type()) - updater = factory->Create(proto.user_type()); - else - updater = factory->Create(proto.type()); - updater->Init(proto); - return updater; -} - -/**************** added for Python Binding ***************************/ -Updater* Updater::CreateUpdater(const string str) { - UpdaterProto conf; - conf.ParseFromString(str); - return Updater::Create(conf); -} -/***********************Python Binding end**************************/ - - -/***********************SGD with momentum******************************/ -void Updater::Init(const UpdaterProto& proto) { - momentum_ = proto.momentum(); - weight_decay_ = proto.weight_decay(); - lr_gen_ = LRGenerator::Create(proto.learning_rate()); - clip_low_ = proto.clip_low(); - clip_high_ = proto.clip_high(); -} - -void Updater::Clip(const float low, const float high, Param* param) { - Blob<float>* grad = param->mutable_grad(); - float* ptr = grad->mutable_cpu_data(); - for (int i = 0; i < grad->count(); i++) { - if (ptr[i] > high) - ptr[i] = high; - else if (ptr[i] < low) - ptr[i] = low; - } -} - -void SGDUpdater::Update(int step, Param* param, float grad_scale) { - if (clip_high_ > clip_low_) - Clip(clip_low_, clip_high_, param); - Shape<1> s = Shape1(param->size()); - Tensor<cpu, 1> data(param->mutable_cpu_data(), s); - Tensor<cpu, 1> grad(param->mutable_cpu_grad(), s); - float lr = lr_gen_->Get(step) * param->lr_scale(); - float wd = weight_decay_ * param->wd_scale(); - grad *= grad_scale; - if (wd > 0) // L2 regularization, should be done after timing grad_scale - grad += data * wd; - if (momentum_ > 0) { - Tensor<cpu, 1> history(param->mutable_cpu_history(), s); - history = history * momentum_ - lr * grad; - data += history; - } else { - grad *= -lr; - data += grad; - } -} - -/***********************Nesterov******************************/ -void NesterovUpdater::Update(int step, Param* param, float grad_scale) { - if (clip_high_ > clip_low_) - Clip(clip_low_, clip_high_, param); - - Shape<1> s = Shape1(param->size()); - Tensor<cpu, 1> data(param->mutable_cpu_data(), s); - Tensor<cpu, 1> grad(param->mutable_cpu_grad(), s); - Tensor<cpu, 1> history(param->mutable_cpu_history(), s); - TensorContainer<cpu, 1> tmp(s); - float lr = lr_gen_->Get(step)*param->lr_scale(); - float wd = weight_decay_*param->wd_scale(); - grad *= grad_scale; - if (wd > 0) // L2 regularization, should be done after timing grad_scale - grad += data * wd; - Copy(tmp, history); - history = history * momentum_ + lr * grad; - tmp = history * (1 + momentum_) - tmp * momentum_; - data -= tmp; -} -/***********************AdaGrad******************************/ -void AdaGradUpdater::Update(int step, Param* param, float grad_scale) { - if (clip_high_ > clip_low_) - Clip(clip_low_, clip_high_, param); - Shape<1> s = Shape1(param->size()); - Tensor<cpu, 1> data(param->mutable_cpu_data(), s); - Tensor<cpu, 1> grad(param->mutable_cpu_grad(), s); - Tensor<cpu, 1> history(param->mutable_cpu_history(), s); - float lr = lr_gen_->Get(step)*param->lr_scale(); - float wd = weight_decay_*param->wd_scale(); - grad *= grad_scale; - if (wd > 0) // L2 regularization, should be done after timing grad_scale - grad += data * wd; - history += F<square>(grad); - data -= lr * grad / (F<sqrtop>(history, proto_.delta())); -} - -/***********************RMSProp******************************/ -void 
RMSPropUpdater::Init(const UpdaterProto& proto) { - Updater::Init(proto); - rho_ = proto.rmsprop_conf().rho(); - delta_ = proto.delta(); -} - -void RMSPropUpdater::Update(int step, Param* param, float grad_scale) { - if (clip_high_ > clip_low_) - Clip(clip_low_, clip_high_, param); - - Shape<1> s=Shape1(param->size()); - Tensor<cpu, 1> data(param->mutable_cpu_data(), s); - Tensor<cpu, 1> grad(param->mutable_cpu_grad(), s); - Tensor<cpu, 1> history(param->mutable_cpu_history(), s); - float lr = lr_gen_->Get(step) * param->lr_scale(); - float wd = weight_decay_ * param->wd_scale(); - grad *= grad_scale; - if (wd > 0) // L2 regularization, should be done after timing grad_scale - grad += data * wd; - history = history * rho_ + (1 - rho_) * F<square>(grad); - data -= lr * grad / F<sqrtop>(history, delta_); -} -/***********************AdaDelta******************************/ -void AdaDeltaUpdater::Init(const UpdaterProto& proto){ - Updater::Init(proto); - delta_ = proto.delta(); - rho_=proto.adadelta_conf().rho(); -} - -void AdaDeltaUpdater::Update(int step, Param* param, float grad_scale){ - Shape<1> s=Shape1(param->size()); - Tensor<cpu, 1> data(param->mutable_cpu_data(), s); - Tensor<cpu, 1> grad(param->mutable_cpu_grad(), s); - Tensor<cpu, 1> history(param->mutable_cpu_history(), s); - Tensor<cpu, 1> update(param->mutable_cpu_update(), s); - TensorContainer<cpu, 1> tmp(s); - float wd = weight_decay_*param->wd_scale(); - float lr = lr_gen_->Get(step) * param->lr_scale(); - grad *= grad_scale; - if (wd > 0) // L2 regularization, should be done after timing grad_scale - grad += data * wd; - history = history * rho_ + (1 - rho_) * F<op::square>(grad); - tmp = grad * F<op::sqrtop>(update, delta_) / F<op::sqrtop>(history, delta_); - update = rho_ * update + (1 - rho_) * F<op::square>(tmp); - data -= lr * tmp; -} - -/***********************Adam******************************/ -void AdamUpdater::Init(const UpdaterProto &proto) { - Updater::Init(proto); - beta1_=proto.adam_conf().beta1(); - beta2_=proto.adam_conf().beta2(); - delta_ = proto.delta(); -} - -void AdamUpdater::Update(int step, Param* param, float grad_scale) { - Shape<1> s=Shape1(param->size()); - Tensor<cpu, 1> data(param->mutable_cpu_data(), s); - Tensor<cpu, 1> grad(param->mutable_cpu_grad(), s); - Tensor<cpu, 1> history(param->mutable_cpu_history(), s); - Tensor<cpu, 1> update(param->mutable_cpu_update(), s); - float wd = weight_decay_*param->wd_scale(); - float lr = lr_gen_->Get(step) * param->lr_scale(); - grad *= grad_scale; - if (wd > 0) // L2 regularization, should be done after timing grad_scale - grad += data * wd; - history = history * beta1_ + (1 - beta1_) * grad; - update = update * beta2_ + (1 - beta2_) * F<op::square>(grad); - data -= lr * history / F<op::sqrtop>(update, delta_); -} - -/***********************AdamMax******************************/ -void AdamMaxUpdater::Init(const UpdaterProto &proto) { - Updater::Init(proto); - beta1_=proto.adammax_conf().beta1(); - beta2_=proto.adammax_conf().beta2(); - delta_=proto.delta(); -} - -void AdamMaxUpdater::Update(int step, Param* param, float grad_scale) { - Shape<1> s=Shape1(param->size()); - Tensor<cpu, 1> data(param->mutable_cpu_data(), s); - Tensor<cpu, 1> grad(param->mutable_cpu_grad(), s); - Tensor<cpu, 1> history(param->mutable_cpu_history(), s); - Tensor<cpu, 1> update(param->mutable_cpu_update(), s); - float wd = weight_decay_*param->wd_scale(); - float lr = lr_gen_->Get(step) * param->lr_scale(); - grad *= grad_scale; - if (wd > 0) // L2 regularization, should be 
done after timing grad_scale - grad += data * wd; - history = history * beta1_ + (1 - beta1_) * grad; - update = update * beta2_; - grad = F<op::abs>(grad); - update = F<op::max>(update, grad) + delta_; - data -= lr * history / update; -} - -} // namespace singa
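
For readers skimming this removed file: the learning-rate generators and SGDUpdater above reduce to a few lines of arithmetic. Below is a minimal, self-contained C++ sketch of the step-decay schedule and the momentum update rule they implement; the function and variable names are illustrative only, not part of the SINGA API, and grad_scale plus gradient clipping are omitted.

#include <cmath>
#include <cstddef>
#include <vector>

// Step-decay schedule as in StepLRGen::Get:
//   lr = base_lr * gamma^(step / change_freq)
// The integer division is intentional: the rate only changes every change_freq steps.
float StepDecayLR(float base_lr, float gamma, int change_freq, int step) {
  return base_lr * std::pow(gamma, step / change_freq);
}

// One SGD step with L2 weight decay and classical momentum, mirroring the
// momentum branch of SGDUpdater::Update:
//   g = grad + wd * w;   v = momentum * v - lr * g;   w += v
void SgdMomentumStep(std::vector<float>& data, const std::vector<float>& grad,
                     std::vector<float>& history, float lr, float wd,
                     float momentum) {
  for (std::size_t i = 0; i < data.size(); ++i) {
    float g = grad[i] + wd * data[i];             // gradient plus L2 term
    history[i] = momentum * history[i] - lr * g;  // velocity ("history") update
    data[i] += history[i];                        // apply the step
  }
}

The other updaters removed above (AdaGrad, RMSProp, AdaDelta, Adam, AdamMax) follow the same pattern, differing only in how the per-parameter history and update buffers are accumulated before scaling the step.
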
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/utils/zk_service.cc ---------------------------------------------------------------------- diff --git a/src/utils/zk_service.cc b/src/utils/zk_service.cc deleted file mode 100644 index 352f6f7..0000000 --- a/src/utils/zk_service.cc +++ /dev/null @@ -1,326 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#include "singa/utils/zk_service.h" - -#include <glog/logging.h> -#include <algorithm> - -using std::string; -using std::to_string; -using std::vector; - -namespace singa { - -void ZKService::ChildChanges(zhandle_t *zh, int type, int state, - const char *path, void *watcherCtx) { - // check if already callback - RTCallback *cb = static_cast<RTCallback*>(watcherCtx); - if (cb->fn == nullptr) return; - if (type == ZOO_CHILD_EVENT) { - struct String_vector child; - // check the child list and put another watcher - int ret = zoo_wget_children(zh, path, ChildChanges, watcherCtx, &child); - if (ret == ZOK) { - if (child.count == 0) { - LOG(INFO) << "child.count = 0 in path: " << path; - // all workers leave, we do callback now - (*cb->fn)(cb->ctx); - cb->fn = nullptr; - } - } else { - LOG(FATAL) << "Unhandled ZK error code: " << ret - << " (zoo_wget_children " << path << ")"; - } - } else { - LOG(FATAL) << "Unhandled callback type code: "<< type; - } -} - -ZKService::~ZKService() { - // close zookeeper handler - zookeeper_close(zkhandle_); -} - -char zk_cxt[] = "ZKClusterRT"; - -bool ZKService::Init(const string& host, int timeout) { - zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR); - zkhandle_ = zookeeper_init(host.c_str(), WatcherGlobal, timeout, 0, - static_cast<void *>(zk_cxt), 0); - if (zkhandle_ == NULL) { - LOG(ERROR) << "Error when connecting to zookeeper servers..."; - LOG(ERROR) << "Please ensure zookeeper service is up in host(s):"; - LOG(ERROR) << host.c_str(); - return false; - } - - return true; -} - -bool ZKService::CreateNode(const char* path, const char* val, int flag, - char* output) { - CHECK(zkhandle_) << "zk handler not initialized"; - char buf[kZKBufSize]; - int ret = 0; - // send the zk request - for (int i = 0; i < kNumRetry; ++i) { - ret = zoo_create(zkhandle_, path, val, val == nullptr ? 
-1 : strlen(val), - &ZOO_OPEN_ACL_UNSAFE, flag, buf, kZKBufSize); - if (ret == ZNONODE) { - LOG(WARNING) << "zookeeper parent node of " << path - << " not exist, retry later"; - } else if (ret == ZCONNECTIONLOSS) { - LOG(WARNING) << "zookeeper disconnected, retry later"; - } else { - break; - } - sleep(kSleepSec); - } - // copy the node name to output - if (output != nullptr && (ret == ZOK || ret == ZNODEEXISTS)) { - snprintf(output, kZKBufSize, "%s", buf); - // use snprintf instead of strcpy - // strcpy(output, buf); - } - if (ret == ZOK) { - LOG(INFO) << "created zookeeper node " << buf - << " (" << (val == nullptr ? "NULL" : val) << ")"; - return true; - } else if (ret == ZNODEEXISTS) { - LOG(WARNING) << "zookeeper node " << path << " already exists"; - return true; - } else if (ret == ZCONNECTIONLOSS) { - LOG(ERROR) << "Cannot connect to zookeeper, " - << "please ensure it is running properly...\n" - << "If want to use zookeeper in our thirdparty folder, " - << "you can start it by:\n" - << "$ ./bin/zk-service.sh start"; - return false; - } - LOG(FATAL) << "Unhandled ZK error code: " << ret - << " (zoo_create " << path << ")"; - return false; -} - -bool ZKService::DeleteNode(const char* path) { - CHECK(zkhandle_) << "zk handler not initialized"; - int ret = zoo_delete(zkhandle_, path, -1); - if (ret == ZOK) { - LOG(INFO) << "deleted zookeeper node " << path; - return true; - } else if (ret == ZNONODE) { - LOG(WARNING) << "try to delete an non-existing zookeeper node " << path; - return true; - } - LOG(FATAL) << "Unhandled ZK error code: " << ret - << " (zoo_delete " << path << ")"; - return false; -} - -bool ZKService::Exist(const char* path) { - CHECK(zkhandle_) << "zk handler not initialized"; - struct Stat stat; - int ret = zoo_exists(zkhandle_, path, 0, &stat); - if (ret == ZOK) return true; - else if (ret == ZNONODE) return false; - LOG(WARNING) << "Unhandled ZK error code: " << ret << " (zoo_exists)"; - return false; -} - -bool ZKService::UpdateNode(const char* path, const char* val) { - CHECK(zkhandle_) << "zk handler not initialized"; - // set version = -1, do not check content version - int ret = zoo_set(zkhandle_, path, val, strlen(val), -1); - if (ret == ZOK) { - return true; - } else if (ret == ZNONODE) { - LOG(ERROR) << "zk node " << path << " does not exist"; - return false; - } - LOG(FATAL) << "Unhandled ZK error code: " << ret - << " (zoo_get " << path << ")"; - return false; -} - -bool ZKService::GetNode(const char* path, char* output) { - CHECK(zkhandle_) << "zk handler not initialized"; - struct Stat stat; - int val_len = kZKBufSize; - int ret = zoo_get(zkhandle_, path, 0, output, &val_len, &stat); - if (ret == ZOK) { - output[val_len] = '\0'; - return true; - } else if (ret == ZNONODE) { - LOG(ERROR) << "zk node " << path << " does not exist"; - return false; - } - LOG(FATAL) << "Unhandled ZK error code: " << ret - << " (zoo_get " << path << ")"; - return false; -} - -bool ZKService::GetChild(const char* path, vector<string>* vt) { - CHECK(zkhandle_) << "zk handler not initialized"; - struct String_vector child; - int ret = zoo_get_children(zkhandle_, path, 0, &child); - if (ret == ZOK) { - vt->clear(); - for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]); - return true; - } - LOG(FATAL) << "Unhandled ZK error code: " << ret - << " (zoo_get_children " << path << ")"; - return false; -} - -bool ZKService::WGetChild(const char* path, vector<string>* vt, - RTCallback *cb) { - CHECK(zkhandle_) << "zk handler not initialized"; - struct String_vector 
child; - int ret = zoo_wget_children(zkhandle_, path, ChildChanges, cb, &child); - if (ret == ZOK) { - vt->clear(); - for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]); - return true; - } - LOG(FATAL) << "Unhandled ZK error code: " << ret - << " (zoo_get_children " << path << ")"; - return false; -} - - -void ZKService::WatcherGlobal(zhandle_t * zh, int type, int state, - const char *path, void *watcherCtx) { - if (type == ZOO_SESSION_EVENT) { - if (state == ZOO_CONNECTED_STATE) - LOG(INFO) << "GLOBAL_WATCHER connected to zookeeper successfully!"; - else if (state == ZOO_EXPIRED_SESSION_STATE) - LOG(INFO) << "GLOBAL_WATCHER zookeeper session expired!"; - } -} - -ZKClusterRT::ZKClusterRT(const string& host, int job_id) { - host_ = host; - workspace_ = GetZKJobWorkspace(job_id); - group_path_ = workspace_ + kZKPathJobGroup; - proc_path_ = workspace_ + kZKPathJobProc; - proc_lock_path_ = workspace_ + kZKPathJobPLock; -} - -ZKClusterRT::~ZKClusterRT() { - // release callback vector - for (RTCallback* p : cb_vec_) { - delete p; - } -} - -bool ZKClusterRT::Init() { - if (!zk_.Init(host_, timeout_)) return false; - if (!zk_.CreateNode(kZKPathSinga.c_str(), nullptr, 0, nullptr)) - return false; - if (!zk_.CreateNode(kZKPathApp.c_str(), nullptr, 0, nullptr)) - return false; - if (!zk_.CreateNode(workspace_.c_str(), nullptr, 0, nullptr)) - return false; - if (!zk_.CreateNode(group_path_.c_str(), nullptr, 0, nullptr)) - return false; - if (!zk_.CreateNode(proc_path_.c_str(), nullptr, 0, nullptr)) - return false; - if (!zk_.CreateNode(proc_lock_path_.c_str(), nullptr, 0, nullptr)) - return false; - return true; -} - -int ZKClusterRT::RegistProc(const string& host_addr, int pid) { - char buf[kZKBufSize]; - string lock = proc_lock_path_ + "/lock-"; - if (!zk_.CreateNode(lock.c_str(), nullptr, - ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) { - return -1; - } - // get all children in lock folder - vector<string> vt; - if (!zk_.GetChild(proc_lock_path_.c_str(), &vt)) { - return -1; - } - // find own position among all locks - int id = -1; - std::sort(vt.begin(), vt.end()); - for (int i = 0; i < static_cast<int>(vt.size()); ++i) { - if (proc_lock_path_+"/"+vt[i] == buf) { - id = i; - break; - } - } - if (id == -1) { - LOG(ERROR) << "cannot find own node " << buf; - return -1; - } - // create a new node in proc path - string path = proc_path_ + "/proc-" + to_string(id); - string content = host_addr + "|" + to_string(pid); - if (!zk_.CreateNode(path.c_str(), content.c_str(), ZOO_EPHEMERAL, - nullptr)) { - return -1; - } - return id; -} - -std::string ZKClusterRT::GetProcHost(int proc_id) { - char val[kZKBufSize]; - // construct file name - string path = proc_path_ + "/proc-" + to_string(proc_id); - if (!zk_.GetNode(path.c_str(), val)) return ""; - int len = strlen(val) - 1; - while (len && val[len] != '|') --len; - CHECK(len); - val[len] = '\0'; - return string(val); -} - -bool ZKClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) { - CHECK_NOTNULL(fn); - string path = groupPath(gid); - // create zk node - if (!zk_.CreateNode(path.c_str(), nullptr, 0, nullptr)) return false; - vector<string> child; - // store the callback function and context for later usage - RTCallback *cb = new RTCallback; - cb->fn = fn; - cb->ctx = ctx; - cb_vec_.push_back(cb); - // start to watch on the zk node, does not care about the first return value - return zk_.WGetChild(path.c_str(), &child, cb); -} - -bool ZKClusterRT::JoinSGroup(int gid, int wid, int s_group) { - string path = groupPath(s_group) + 
workerPath(gid, wid); - // try to create an ephemeral node under server group path - return zk_.CreateNode(path.c_str(), nullptr, ZOO_EPHEMERAL, nullptr); -} - -bool ZKClusterRT::LeaveSGroup(int gid, int wid, int s_group) { - string path = groupPath(s_group) + workerPath(gid, wid); - return zk_.DeleteNode(path.c_str()); -} - -} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/worker.cc ---------------------------------------------------------------------- diff --git a/src/worker.cc b/src/worker.cc deleted file mode 100644 index e92d780..0000000 --- a/src/worker.cc +++ /dev/null @@ -1,545 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#include "singa/worker.h" - -#include <glog/logging.h> -#include <chrono> -#include <thread> -#include <typeinfo> -#include "singa/utils/cluster.h" -#include "singa/utils/factory.h" -#include "singa/utils/singleton.h" -#include "singa/utils/context.h" -#include "singa/utils/math_blob.h" - -namespace singa { - -using std::string; - -Worker* Worker::CreateWorker(const string str) { - AlgProto alg_proto; - alg_proto.ParseFromString(str); - return Worker::Create(alg_proto); -} - -Worker* Worker::Create(const AlgProto& conf) { - auto factory = Singleton<Factory<singa::Worker>>::Instance(); - Worker* worker = nullptr; - if (conf.has_user_alg()) - worker = factory->Create(conf.user_alg()); - else - worker = factory->Create(conf.alg()); - return worker; -} - -void Worker::Setup(int grp_id, int id, const JobProto& conf, - NeuralNet* train_net, NeuralNet* val_net, NeuralNet* test_net) { - grp_id_ = grp_id; - id_ = id; - job_conf_ = conf; - train_net_ = train_net; - val_net_ = val_net; - test_net_ = test_net; - InitSockets(train_net); -} - -Worker::~Worker() { - if (dealer_) delete dealer_; - if (bridge_dealer_) delete bridge_dealer_; -} - -void Worker::Run() { - // setup gpu device - auto context = Singleton<Context>::Instance(); - // TODO(wangwei) -2 for uninitial device; -1 for CPU; >=0 for GPU now. - int device = -2; - while (device == -2) { - device = context->device_id(std::this_thread::get_id()); - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - } - LOG(ERROR) << "Worker (group = " << grp_id_ <<", id = " << id_ << ") " - << " start on " << (device >= 0 ? 
"GPU " + std::to_string(device) : "CPU"); - if (device >= 0) - context->ActivateDevice(device); - - auto cluster = Cluster::Get(); - int svr_grp = grp_id_ / cluster->nworker_groups_per_server_group(); - CHECK(cluster->runtime()->JoinSGroup(grp_id_, id_, svr_grp)); - step_ = job_conf_.step(); - InitSockets(train_net_); - InitNetParams(job_conf_, train_net_); - while (!StopNow(step_)) { - if (ValidateNow(step_) && val_net_ != nullptr) { - CollectAll(step_, train_net_); - LOG(ERROR) << "Validation @ step " + std::to_string(step_); - Test(job_conf_.validate_steps(), kVal, val_net_); - } - if (TestNow(step_) && test_net_ != nullptr) { - CollectAll(step_, train_net_); - LOG(ERROR) << "Test @ step " + std::to_string(step_); - Test(job_conf_.test_steps(), kTest, test_net_); - } - if (CheckpointNow(step_) && grp_id_ == 0) { - CollectAll(step_, train_net_); - Checkpoint(step_, Cluster::Get()->checkpoint_folder(), train_net_); - job_conf_.set_step(step_); - } - TrainOneBatch(step_, train_net_); - if (DisplayNow(step_) && grp_id_ == 0 && id_ == 0) { - Display(kTrain | kForward | kBackward, - "Train @ step " + std::to_string(step_), train_net_); - } - step_++; - } - - // save the model - if (grp_id_ == 0) - Checkpoint(step_, Cluster::Get()->checkpoint_folder(), train_net_); - // clean up - cluster->runtime()->LeaveSGroup(grp_id_, id_, svr_grp); - // notify the stub on worker stop - Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub)); - msg->set_type(kStop); - dealer_->Send(&msg); // use param dealer to send the stop msg - LOG(ERROR) << "Worker (group = " <<grp_id_ << ", id = " << id_ << ") stops"; -} - -void Worker::Test(int steps, Phase phase, NeuralNet* net) { - for (int step = 0; step < steps; step++) - TestOneBatch(step, phase, net); - Display(phase, " ", net); -} - -void Worker::InitSockets(const NeuralNet* net) { - dealer_ = new Dealer(Addr(grp_id_, id_, kWorkerParam)); - for (auto layer : net->layers()) { - if (layer->partition_id() == id_) { - if (typeid(*layer) == typeid(BridgeDstLayer) - || typeid(*layer) == typeid(BridgeSrcLayer)) { - bridge_dealer_ = new Dealer(Addr(grp_id_, id_, kWorkerLayer)); - break; - } - } - } - // bind dealer to bridge layers - if (bridge_dealer_ != nullptr) { - for (auto dst : net->layers()) { - if (typeid(*dst) == typeid(BridgeDstLayer)) { - auto src = net->srclayers(dst)[0]; - name2bridge_[src->name()] = src; - name2bridge_[dst->name()] = dst; - if (src->partition_id() == id_) { - dynamic_cast<BridgeLayer*>(src)->MakePaired(dst, grp_id_, - bridge_dealer_, &name2bridge_); - } - if (dst->partition_id() == id_) { - dynamic_cast<BridgeLayer*>(dst)->MakePaired(src, grp_id_, - bridge_dealer_, &name2bridge_); - } - } - } - } -} - -void Worker::InitNetParams(const std::string& folder, vector<Layer*> net) { - - std::unordered_map<string, Param*> name2param; - for (auto layer : net) { - for (auto param : layer->GetParams()) { - // only owners fill the memory of parameter values. 
- //if (param->owner() == param->id()) { - CHECK(name2param.find(param->name()) == name2param.end()); - name2param[param->name()] = param; - //} - } - } - vector<string> paths; - paths.push_back(folder); - NeuralNet::Load(paths, name2param); -} - -void Worker::InitNetParams(const JobProto& job_conf, NeuralNet* net) { - // for each server grp, its first subscriber worker grp does the param init - if (grp_id_ % Cluster::Get()->nworker_groups_per_server_group() == 0) { - // extract params that should be initialized by this worker - // must gen a name for each param if the user doesn't config it - std::unordered_map<string, Param*> name2param; - for (auto layer : net->layers()) { - if (layer->partition_id() == id_) { - for (auto param : layer->GetParams()) { - // only owners fill the memory of parameter values. - if (param->owner() == param->id()) { - CHECK(name2param.find(param->name()) == name2param.end()); - name2param[param->name()] = param; - } - } - } - } - vector<string> paths; - for (const auto& p : job_conf_.checkpoint_path()) - paths.push_back(p); - net->Load(paths, name2param); - // init other params who do not have checkpoint version - for (auto entry : name2param) { - if (entry.second->version() > 0) { - // if load from pre-training params, reset version to start step - if (job_conf.reset_param_version()) { - entry.second->set_version(job_conf.step()); - } - } else { - entry.second->InitValues(job_conf.step()); - if (!job_conf.reset_param_version()) - LOG(ERROR) << "better reset version of params from checkpoints " - << "to the same as other newly initialized params!"; - } - } - - // warmup training before put params to servers - // for (; step_ < job_conf.warmup_steps(); step_++) - // TrainOneBatch(step_, net); - for (auto layer : net->layers()) { - if (layer->partition_id() == id_) - for (auto param : layer->GetParams()) - if (param->owner() == param->id()) - Put(param->version(), param); - } - } - // wait owners in the same procs init params, then no get requests sent - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - for (auto layer : net->layers()) { - if (layer->partition_id() == id_) - for (auto param : layer->GetParams()) - Get(job_conf.warmup_steps(), param); - } -} - -void Worker::Checkpoint(int step, const std::string& folder, vector<Layer*> net) { - BlobProtos bps; - for (auto layer : net) { - //if (layer->partition_id() == id_) { - for (auto param : layer->GetParams()) { - // only owners fill the memory of parameter values. - //if (param->owner() == param->id()) { - auto *blob = bps.add_blob(); - param->ToProto(blob); - bps.add_version(param->version()); - bps.add_name(param->name()); - //} - } - //} - } - char buf[256]; - snprintf(buf, sizeof(buf), "%s/step%d-worker0", folder.c_str(), step); - LOG(INFO) << "checkpoint to " << buf; - WriteProtoToBinaryFile(bps, buf); -} - -void Worker::Checkpoint(int step, const std::string& folder, NeuralNet* net) { - BlobProtos bps; - for (auto layer : net->layers()) { - if (layer->partition_id() == id_) { - for (auto param : layer->GetParams()) { - // only owners fill the memory of parameter values. 
- if (param->owner() == param->id()) { - auto *blob = bps.add_blob(); - param->ToProto(blob); - bps.add_version(param->version()); - bps.add_name(param->name()); - } - } - } - } - char buf[256]; - snprintf(buf, sizeof(buf), "%s/step%d-worker%d", folder.c_str(), step, id_); - LOG(INFO) << "checkpoint to " << buf; - WriteProtoToBinaryFile(bps, buf); -} - -int Worker::Put(int step, Param* param) { - if (dealer_ == nullptr) { - LOG(WARNING) << "Null dealer in worker (" << grp_id_ << ", " << id_ << ")"; - return 1; - } - // set Blob head to cpu to avoid calling cudaMemcpy by the stub thread, which - // would hang on some machines. - param->data().cpu_data(); - Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub)); - msg->set_trgt(ParamTrgt(param->owner(), 0), step); - msg->set_type(kPut); - dealer_->Send(&msg); -// LOG(ERROR) << "worker msg " << msg; - return 1; -} - -int Worker::Get(int step, Param* param) { - if (param->version() >= step) - return 1; - if (dealer_ == nullptr) { - LOG(WARNING) << "Null dealer in worker (" << grp_id_ << ", " << id_ << ")"; - return 1; - } - // set Blob head to cpu to avoid calling cudaMemcpy by the stub thread, which - // would hang on some machines. - param->mutable_data()->mutable_cpu_data(); - - Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub)); - msg->set_trgt(ParamTrgt(param->owner(), 0), step); - msg->set_type(kGet); - dealer_->Send(&msg); - return 1; -} - -int Worker::Update(int step, Param* param) { - param->set_last_version(param->version()); - if (dealer_ == nullptr) { - LOG(WARNING) << "Null dealer in worker (" << grp_id_ << ", " << id_ << ")"; - return 1; - } - // head of data Blob (SyncMem) to cpu, because the stub thread may use - // cudaMemcpy copy gradients into msgs. cudaMemcpy hangs when called by the - // stub thread on some GPU machines. - // TODO(wangwei) fix this issue and remove the following line. - // optimize for training with single worker by removing stub and server, and - // updating parameters locally inside the worker GPU. Then we do not need to - // transfer gradients and parameter values between GPU-CPU. - param->grad().cpu_data(); - // change the head of SyncMem to cpu; otherwise, the updated parameter - // values would not be synced to gpu (since the head is at gpu). 
- param->mutable_data()->mutable_cpu_data(); - - Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub)); - msg->set_trgt(ParamTrgt(param->owner(), 0), step); - msg->set_type(kUpdate); - dealer_->Send(&msg); - return 1; -} - -int Worker::CollectAll(int step, NeuralNet* net) { - auto& layers = net->layers(); - for (auto& layer : layers) { - if (layer->partition_id() == id_) { - for (Param* p : layer->GetParams()) { - Collect(step, p); - } - } - } - return 1; -} - -int Worker::Collect(int step, Param* param) { - while (param->version() <= param->last_version()) { - std::this_thread::sleep_for(std::chrono::milliseconds(kCollectSleepTime)); - // LOG(ERROR) << "wait "<< param->id() << " at " << step << " by " <<id_; - } - return 1; -} - -void Worker::Display(int flag, const std::string& prefix, NeuralNet* net) { - for (auto layer : net->layers()) { - if (layer->partition_id() == id_) { - const string& disp = layer->ToString(false, flag); - if (disp.length()) - LOG(ERROR) << prefix << " " << disp; - } - } -} - -/****************************BPWorker**********************************/ -void BPWorker::TrainOneBatch(int step, NeuralNet* net) { - Forward(step, kTrain, net); - Backward(step, net); -} - -void BPWorker::TestOneBatch(int step, Phase phase, NeuralNet* net) { - Forward(step, phase, net); -} - -void BPWorker::Forward(int step, Phase phase, NeuralNet* net) { - map<string, string> label; - for (auto& layer : net->layers()) { - if (layer->partition_id() == id_) { - if (phase == kTrain && layer->unroll_index() == 0) { - // wait until param is updated - for (Param* p : layer->GetParams()) { - Collect(step, p); - } - } - // DLOG(ERROR) << "Forward " << layer->name(); - layer->ComputeFeature(phase | kForward, net->srclayers(layer)); - if (job_conf_.debug() && DisplayNow(step) && grp_id_ == 0) - label[layer->name()] = layer->ToString(true, phase | kForward); - } - } - if (label.size()) { - const string path = Cluster::Get()->vis_folder() + "/fp-step" - + std::to_string(step) +"-loc" + std::to_string(id_) + ".json"; - WriteStringToTextFile(path, net->ToGraph(false).ToJson(label)); - } -} - -void BPWorker::Backward(int step, NeuralNet* net) { - map<string, string> label; - auto& layers = net->layers(); - for (auto it = layers.rbegin(); it != layers.rend(); it++) { - Layer* layer = *it; - if (layer->partition_id() == id_) { - layer->ComputeGradient(kTrain | kBackward, net->srclayers(layer)); - if (job_conf_.debug() && DisplayNow(step) && grp_id_ == 0) - label[layer->name()] = layer->ToString(true, kTrain | kBackward); - for (Param* p : layer->GetParams()) - Update(step, p); - } - } - if (label.size()) { - const string path = Cluster::Get()->vis_folder() + "/bp-step" - + std::to_string(step) + "-loc" + std::to_string(id_) + ".json"; - WriteStringToTextFile(path, net->ToGraph(false).Reverse().ToJson(label)); - } -} - -/***************************BPTTWorker*********************************/ -void BPTTWorker::Forward(int step, Phase phase, NeuralNet* net) { - map<string, string> label; - for (auto& layer : net->layers()) { - if (layer->partition_id() == id_) { - if (phase == kTrain && layer->unroll_index() == 0) { - // wait until param is updated - for (Param* p : layer->GetParams()) { - Collect(step, p); - Zero(p->mutable_grad()); - } - } - vector<Layer*> src = net->srclayers(layer); - if ((phase & kTest) && typeid(*layer) == typeid(RNNDummyLayer)) { - CHECK_LE(src.size(), 1); - auto dummy = dynamic_cast<RNNDummyLayer*>(layer); - Layer* srclayer = net->name2layer(dummy->srclayer(step)); 
- if (step > 0) - CHECK(srclayer != nullptr); - if (srclayer != nullptr) { - src.clear(); - src.push_back(srclayer); - } - } - // if full state rnn and not the starting of a new passing of the dataset, - // feed the hidden state of the last unit to the first unit. - if (layer->unroll_index() == 0 && full_state_ && !begin_) { - Layer* last = net->last_unroll_layer(layer); - CHECK(last != nullptr); - if (last != layer || (phase & kTest)) - src.push_back(last); - } - // LOG(ERROR) << layer->name() << " forward"; - // int ret = - layer->ComputeFeature(phase | kForward, src); - /* - if ((phase & Phase::kTrain) && ret == Status::kEnd) - begin_ = true; - */ - if (job_conf_.debug() && DisplayNow(step) && grp_id_ == 0) - label[layer->name()] = layer->ToString(true, phase | kForward); - } - } - if (label.size()) { - const string path = Cluster::Get()->vis_folder() + "/fp-step" - + std::to_string(step) +"-loc" + std::to_string(id_) + ".json"; - WriteStringToTextFile(path, net->ToGraph(false).ToJson(label)); - } -} - -void BPTTWorker::Backward(int step, NeuralNet* net) { - map<string, string> label; - auto& layers = net->layers(); - for (auto it = layers.rbegin(); it != layers.rend(); it++) { - Layer* layer = *it; - if (layer->partition_id() == id_) { - layer->ComputeGradient(kTrain | kBackward | kAggGrad, - net->srclayers(layer)); - // LOG(ERROR) << layer->name() << " backward"; - if (job_conf_.debug() && DisplayNow(step) && grp_id_ == 0) - label[layer->name()] = layer->ToString(true, kTrain | kBackward); - // unrolled layers share parameter data and grad, just update the 1st one - if (layer->unroll_index() == 0) - for (Param* p : layer->GetParams()) - Update(step, p); - } - } - if (label.size()) { - const string path = Cluster::Get()->vis_folder() + "/bp-step" - + std::to_string(step) + "-loc" + std::to_string(id_) + ".json"; - WriteStringToTextFile(path, net->ToGraph(false).Reverse().ToJson(label)); - } -} -void BPTTWorker::Display(int flag, const std::string& prefix, NeuralNet* net) { - std::unordered_map<string, float> perf; - for (auto layer : net->layers()) { - if (layer->partition_id() == id_) { - const string& disp = layer->ToString(false, flag); - for (const auto& entry : GetMetricFromString(disp)) - perf[entry.first] += entry.second; - } - } - string disp = prefix + " "; - for (const auto& entry : perf) - disp += entry.first + " = " + std::to_string(entry.second) + ", "; - LOG(ERROR) << disp; -} -/****************************CDWorker**********************************/ -void CDWorker::TrainOneBatch(int step, NeuralNet* net) { - const auto& layers = net->layers(); - for (auto* layer : layers) { - for (Param* p : layer->GetParams()) // wait until param is updated - Collect(step, p); - layer->ComputeFeature(kPositive, net->srclayers(layer)); - } - for (auto* layer : layers) - if (typeid(*layer) == typeid(RBMVisLayer) - || typeid(*layer) == typeid(RBMHidLayer)) - layer->ComputeFeature(kNegative | kTest, net->srclayers(layer)); - for (int i = 1; i < job_conf_.train_one_batch().cd_conf().cd_k(); i++) { - for (auto* layer : layers) { - if (typeid(*layer) == typeid(RBMVisLayer) - || typeid(*layer) == typeid(RBMHidLayer)) - layer->ComputeFeature(kNegative, net->srclayers(layer)); - } - } - for (auto* layer : layers) { - if (typeid(*layer) == typeid(RBMVisLayer) - || typeid(*layer) == typeid(RBMHidLayer)) { - layer->ComputeGradient(kTrain, net->srclayers(layer)); - for (Param* p : layer->GetParams()) { - Update(step, p); - } - } - } -} - -void CDWorker::TestOneBatch(int step, Phase phase, NeuralNet* 
net) { - auto& layers = net->layers(); - for (auto *layer : layers) - layer->ComputeFeature(kPositive, net->srclayers(layer)); - for (auto *layer : layers) - if (typeid(*layer) == typeid(RBMVisLayer)) - layer->ComputeFeature(kNegative | kTest, net->srclayers(layer)); -} - -} // namespace singa
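
Two pieces of the removed code are worth restating in compact form. First, ZKClusterRT::RegistProc in zk_service.cc derives a process rank by creating an ephemeral, sequential "lock-" znode and sorting the lock children; stripped of the ZooKeeper calls, the ranking step is just the sort-and-find below. Second, Worker::Run in worker.cc is a loop that interleaves validation, testing and checkpointing with one training step per iteration. Both sketches use hypothetical names and plain C++ containers rather than SINGA's classes.

#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

// Rank derivation as used by RegistProc: sequential znode names sort in
// creation order, so a process's rank is its position among the sorted lock
// children. (The zoo_create / zoo_get_children calls that produce these
// names are omitted here.)
int RankAmongLocks(std::vector<std::string> locks, const std::string& own_node) {
  std::sort(locks.begin(), locks.end());
  for (std::size_t i = 0; i < locks.size(); ++i)
    if (locks[i] == own_node) return static_cast<int>(i);
  return -1;  // own lock not found: registration failed
}

// Toy outline of the Worker::Run control flow; the real worker also joins a
// server group via ZooKeeper and exchanges Param messages through a stub.
class ToyWorker {
 public:
  void Run() {
    while (step_ < max_steps_) {
      if (val_freq_ > 0 && step_ % val_freq_ == 0) Validate(step_);
      if (ckpt_freq_ > 0 && step_ % ckpt_freq_ == 0) Checkpoint(step_);
      TrainOneBatch(step_);  // forward + backward + Update() per Param (BPWorker)
      ++step_;
    }
    Checkpoint(step_);       // final snapshot, as Worker::Run does before exit
  }

 private:
  void TrainOneBatch(int /*step*/) { /* Forward(kTrain) then Backward() */ }
  void Validate(int /*step*/)      { /* CollectAll() then Test() on the val net */ }
  void Checkpoint(int /*step*/)    { /* serialize params to a step<N>-worker<id> file */ }

  int step_ = 0, max_steps_ = 1000, val_freq_ = 100, ckpt_freq_ = 500;
};

In the removed zk_service.cc, the resulting rank is then used to create an ephemeral proc-<id> znode holding "host|pid", which GetProcHost later parses when another process needs that address.
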
