SINGA-109 Refine bridge layers

Re-implement bridge layers for model partition:
* move socket operations for sending/receiving blobs from the worker into the layers, so that they are transparent to users who implement TrainOneBatch
* when initializing a worker, it creates a socket instance and passes it to all bridge layers via the layer.MakePaired() function
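The upshot for user code, as a minimal sketch: a TrainOneBatch implementation now contains no socket calls at all, because the bridge layers exchange blobs inside their own ComputeFeature()/ComputeGradient(). The subclass and loop below are hypothetical (the committed BPWorker delegates to Forward(), as the worker.cc hunk at the bottom shows); only Layer/NeuralNet calls visible in this diff are assumed.

#include "singa/neuralnet/neuralnet.h"
#include "singa/worker.h"

namespace singa {

// Hypothetical worker subclass: no blob send/receive appears here.
// BridgeSrcLayer sends data (and waits for gradients) and BridgeDstLayer
// receives data (and sends gradients) inside their Compute* methods.
class SketchWorker : public BPWorker {
 public:
  void TrainOneBatch(int step, NeuralNet* net) override {
    const auto& layers = net->layers();
    for (auto layer : layers)                                   // forward pass
      layer->ComputeFeature(kTrain, net->srclayers(layer));
    for (auto it = layers.rbegin(); it != layers.rend(); ++it)  // backward pass
      (*it)->ComputeGradient(kTrain, net->srclayers(*it));
  }
};

}  // namespace singa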
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/bd2e3453 Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/bd2e3453 Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/bd2e3453 Branch: refs/heads/master Commit: bd2e3453ca01e7bf9bf6724b4717d956c6e00290 Parents: 4664b6b Author: WANG Sheng <[email protected]> Authored: Wed Dec 2 11:27:36 2015 +0800 Committer: WANG Sheng <[email protected]> Committed: Fri Dec 4 18:13:11 2015 +0800 ---------------------------------------------------------------------- .../singa/neuralnet/connection_layer/bridge.h | 84 +++----- include/singa/worker.h | 38 ++-- src/neuralnet/connection_layer/bridge.cc | 78 +++++++- src/neuralnet/neuron_layer/dummy.cc | 2 +- src/test/test_connection_layers.cc | 88 +++++++- src/worker.cc | 200 +++++++++---------- 6 files changed, 298 insertions(+), 192 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/bd2e3453/include/singa/neuralnet/connection_layer/bridge.h ---------------------------------------------------------------------- diff --git a/include/singa/neuralnet/connection_layer/bridge.h b/include/singa/neuralnet/connection_layer/bridge.h index 5f224b3..b27693d 100644 --- a/include/singa/neuralnet/connection_layer/bridge.h +++ b/include/singa/neuralnet/connection_layer/bridge.h @@ -22,79 +22,55 @@ #ifndef SINGA_NEURALNET_CONNECTION_LAYER_BRIDGE_H_ #define SINGA_NEURALNET_CONNECTION_LAYER_BRIDGE_H_ +#include <string> +#include <unordered_map> #include <vector> +#include "singa/comm/socket.h" #include "singa/neuralnet/layer.h" namespace singa { -class BridgeLayer : virtual public ConnectionLayer { + +class BridgeLayer : public ConnectionLayer { public: - void set_ready(bool a) { - ready_ = a; - } - bool ready() const { - return ready_; - } - virtual bool is_bridgesrclayer() const { - return false; - } - virtual bool is_bridgedstlayer() const { - return false; - } + void set_ready(bool a) { ready_ = a; } + bool ready() const { return ready_; } + // Bind the layer with dealer instance by worker at runtime + void MakePaired(Layer* pair, int grp_id, Dealer* dealer, + std::unordered_map<std::string, Layer*>* name2bridge); + // Send blobs to other workers due to model partitions + void SendBlobs(bool handle_data); + // Receive blobs from other workers due to model partitions; + void ReceiveBlobs(bool handle_data); protected: //!< true if received grad from BridgeDstLayer - bool ready_; + bool ready_ = false; + int group_id_ = 0; + Layer* pair_ = nullptr; + Dealer* dealer_ = nullptr; + std::unordered_map<std::string, Layer*>* name2bridge_ = nullptr; }; /** - * For recv data from layer on other threads which may resident on other nodes - * due to layer/data partiton + * For sending data to layer on other threads which may resident on other nodes + * due to layer/data partition. */ -class BridgeDstLayer : public BridgeLayer { +class BridgeSrcLayer : public BridgeLayer { public: void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override; - void ComputeFeature(int flag, const vector<Layer*>& srclayers) override { - // reset ready_ for next iteration. 
- ready_ = false; - } - void ComputeGradient(int flag, const vector<Layer*>& srclayers) override {} - bool is_bridgedstlayer() const { - return true; - } + void ComputeFeature(int flag, const vector<Layer*>& srclayers) override; + void ComputeGradient(int flag, const vector<Layer*>& srclayers) override; }; /** - * For sending data to layer on other threads which may resident on other nodes - * due to layer/data partition. + * For recv data from layer on other threads which may resident on other nodes + * due to layer/data partiton */ -class BridgeSrcLayer : public BridgeLayer { +class BridgeDstLayer : public BridgeLayer { public: - void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override { - CHECK_GE(srclayers.size(), 1); - srclayer_ = srclayers.at(0); - } - void ComputeFeature(int flag, const vector<Layer*>& srclayers) override {} - void ComputeGradient(int flag, const vector<Layer*>& srclayers) override { - ready_ = false; - } - const Blob<float>& data(const Layer* from) const override { - return srclayer_->data(this); - } - Blob<float>* mutable_data(const Layer* from) override { - return srclayer_->mutable_data(this); - } - const Blob<float>& grad(const Layer* from) const override { - return srclayer_->grad(this); - } - Blob<float>* mutable_grad(const Layer* from) override { - return srclayer_->mutable_grad(this); - } - bool is_bridgesrclayer() const override { - return true; - } - - private: - Layer* srclayer_; + void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override; + void ComputeFeature(int flag, const vector<Layer*>& srclayers) override; + void ComputeGradient(int flag, const vector<Layer*>& srclayers) override; }; } // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/bd2e3453/include/singa/worker.h ---------------------------------------------------------------------- diff --git a/include/singa/worker.h b/include/singa/worker.h index 1ed3642..53f2118 100644 --- a/include/singa/worker.h +++ b/include/singa/worker.h @@ -23,6 +23,7 @@ #define SINGA_WORKER_H_ #include <string> +#include <unordered_map> #include <vector> #include "singa/comm/socket.h" #include "singa/neuralnet/neuralnet.h" @@ -78,7 +79,6 @@ class Worker { */ virtual void Setup(int grp_id, int id, const JobProto& conf, NeuralNet* train_net, NeuralNet* val_net, NeuralNet* test_net); - /** * Main function of Worker. * @@ -94,6 +94,17 @@ class Worker { */ void Test(int steps, Phase phase, NeuralNet* net); /** + * Init sockets in a worker, including: + * 1. a global socket communicates with stub + * 2. a bridge socket dedicated for bridge layer communications + * + * the bridge socket will be binded to each bridge layer + * + * @param[in] net pointer to a neural net whose bridge layer will be binded + * with a socket. + */ + void InitSockets(const NeuralNet* net); + /** * Init values of Param instances assocaited with local layers (i.e., layers * dispatched to this worker). * @@ -118,7 +129,6 @@ class Worker { * initialized. */ void InitNetParams(const JobProto& job_conf, NeuralNet* net); - /** * Checkpoint all Param objects owned by the worker onto disk. * The serialization is done using BlobProtos which includes the name, version @@ -130,7 +140,6 @@ class Worker { * @param net the training net whose Param objects will be dumped. */ void Checkpoint(int step, const std::string& folder, NeuralNet* net); - /** * Train one mini-batch. * Test/Validation is done before training. @@ -139,7 +148,6 @@ class Worker { * @param[in] net neural net to be trained. 
*/ virtual void TrainOneBatch(int step, NeuralNet* net) = 0; - /** * Test/validate one mini-batch data. * @@ -148,7 +156,6 @@ class Worker { * @param[in] net neural net for test */ virtual void TestOneBatch(int step, Phase phase, NeuralNet* net) = 0; - /** * Display infomation from layers. * @@ -159,7 +166,6 @@ class Worker { * @param net display layers from this neural net. */ void Display(int flag, const std::string& prefix, NeuralNet* net); - /** * Put Param values to server. * @@ -167,7 +173,6 @@ class Worker { * @param step used as current param version for the put request */ int Put(int step, Param* param); - /** * Get Param with specific version from server * If the current version >= the requested version, then return. @@ -176,7 +181,6 @@ class Worker { * @param step requested param version */ int Get(int step, Param* param); - /** * Update Param. * @@ -184,7 +188,6 @@ class Worker { * @param step training step used for updating (e.g., deciding learning rate). */ int Update(int step, Param* param); - /** * Wait for the response of the update/get requests. * @@ -192,23 +195,10 @@ class Worker { * @param step not used now. */ int Collect(int step, Param* param); - /** * Call Collect() for every param of net */ int CollectAll(int step, NeuralNet* net); - - /** - * Receive blobs from other workers due to model partitions. - */ - void ReceiveBlobs(bool data, bool grad, BridgeLayer* layer, NeuralNet* net); - - /** - * Send blobs to other workers due to model partitions. - */ - void SendBlobs(bool data, bool grad, BridgeLayer* layer, NeuralNet* net); - - /** * @param[in] step * @return true if it is time to display training info, e.g., loss; otherwise @@ -284,8 +274,10 @@ class Worker { NeuralNet* train_net_ = nullptr; NeuralNet* test_net_ = nullptr; NeuralNet* val_net_ = nullptr; - Dealer* layer_dealer_ = nullptr; Dealer* dealer_ = nullptr; + // bridge layer related + Dealer* bridge_dealer_ = nullptr; + std::unordered_map<std::string, Layer*> name2bridge_; }; class BPWorker: public Worker { http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/bd2e3453/src/neuralnet/connection_layer/bridge.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/connection_layer/bridge.cc b/src/neuralnet/connection_layer/bridge.cc index 5a43c20..1ad4b0c 100644 --- a/src/neuralnet/connection_layer/bridge.cc +++ b/src/neuralnet/connection_layer/bridge.cc @@ -20,15 +20,89 @@ *************************************************************/ #include "singa/neuralnet/connection_layer/bridge.h" +#include "singa/comm/msg.h" namespace singa { using std::vector; -void BridgeDstLayer::Setup(const LayerProto& proto, + +void BridgeLayer::MakePaired(Layer* pair, int grp_id, Dealer* dealer, + std::unordered_map<std::string, Layer*>* name2bridge) { + pair_ = pair; + group_id_ = grp_id; + dealer_ = dealer; + name2bridge_ = name2bridge; +} + +void BridgeLayer::SendBlobs(bool handle_data) { + CHECK(dealer_) << "NULL dealer for bridges in worker (" << group_id_ + << ", " << partition_id() << ")"; + Msg *msg = new Msg(); + msg->set_src(Addr(group_id_, partition_id(), kWorkerLayer)); + msg->set_dst(Addr(group_id_, pair_->partition_id(), kWorkerLayer)); + msg->AddFrame(pair_->name().c_str(), pair_->name().length()); + auto const& blob = handle_data ? 
data(nullptr) : grad(nullptr); + msg->AddFrame(blob.cpu_data(), blob.count() * sizeof(float)); + dealer_->Send(&msg); +} + +void BridgeLayer::ReceiveBlobs(bool handle_data) { + CHECK(dealer_) << "NULL dealer for bridges in worker (" << group_id_ + << ", " << partition_id() << ")"; + while (!ready()) { + auto msg = dealer_->Receive(); + CHECK_EQ(AddrGrp(msg->src()), group_id_); + string name(static_cast<char*>(msg->FrameData()), msg->FrameSize()); + auto receive_layer = name2bridge_->at(name); + auto blob = handle_data ? receive_layer->mutable_data(nullptr) : + receive_layer -> mutable_grad(nullptr); + msg->NextFrame(); + memcpy(blob->mutable_cpu_data(), msg->FrameData(), msg->FrameSize()); + dynamic_cast<BridgeLayer*>(receive_layer)->set_ready(true); + delete msg; + } +} + +void BridgeSrcLayer::Setup(const LayerProto& conf, + const vector<Layer*>& srclayers) { + CHECK_GE(srclayers.size(), 1); + Layer::Setup(conf, srclayers); + data_.Reshape(srclayers[0]->data(this).shape()); + grad_.ReshapeLike(data_); + data_.ShareData(srclayers[0]->data(this)); + grad_.ShareData(srclayers[0]->grad(this)); +} + +void BridgeSrcLayer::ComputeFeature(int flag, const vector<Layer*>& srcs) { + // send data + SendBlobs(true); + // reset flag for receiving gradient in compute gradient phase + set_ready(false); +} + +void BridgeSrcLayer::ComputeGradient(int flag, const vector<Layer*>& srcs) { + // receive gradient + ReceiveBlobs(false); +} + +void BridgeDstLayer::Setup(const LayerProto& conf, const vector<Layer*>& srclayers) { - Layer::Setup(proto, srclayers); CHECK_EQ(srclayers.size(), 1); + Layer::Setup(conf, srclayers); data_.Reshape(srclayers[0]->data(this).shape()); grad_.ReshapeLike(data_); } + +void BridgeDstLayer::ComputeFeature(int flag, const vector<Layer*>& srcs) { + // receive data + ReceiveBlobs(true); +} + +void BridgeDstLayer::ComputeGradient(int flag, const vector<Layer*>& srcs) { + // send gradient + SendBlobs(false); + // reset flag for receiving data in compute feature phase + set_ready(false); +} + } // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/bd2e3453/src/neuralnet/neuron_layer/dummy.cc ---------------------------------------------------------------------- diff --git a/src/neuralnet/neuron_layer/dummy.cc b/src/neuralnet/neuron_layer/dummy.cc index 2ef702d..8d165f7 100644 --- a/src/neuralnet/neuron_layer/dummy.cc +++ b/src/neuralnet/neuron_layer/dummy.cc @@ -69,4 +69,4 @@ void DummyLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) { } } -} // namespace singa +} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/bd2e3453/src/test/test_connection_layers.cc ---------------------------------------------------------------------- diff --git a/src/test/test_connection_layers.cc b/src/test/test_connection_layers.cc index 3d931b3..837a941 100644 --- a/src/test/test_connection_layers.cc +++ b/src/test/test_connection_layers.cc @@ -19,7 +19,12 @@ * *************************************************************/ +#include <string> +#include <unordered_map> +#include <vector> #include "gtest/gtest.h" +#include "singa/comm/msg.h" +#include "singa/comm/socket.h" #include "singa/neuralnet/connection_layer/bridge.h" #include "singa/neuralnet/neuron_layer/dummy.h" #include "singa/proto/job.pb.h" @@ -28,8 +33,8 @@ using namespace singa; TEST(ConnectionLayerTest, DummyTest) { // use dummy as input layer - LayerProto proto_in; vector<Layer*> src_in; + LayerProto proto_in; proto_in.set_name("dummy_input"); 
proto_in.mutable_dummy_conf()->set_input(true); proto_in.mutable_dummy_conf()->add_shape(10); @@ -39,11 +44,11 @@ TEST(ConnectionLayerTest, DummyTest) { ASSERT_EQ(in.data(nullptr).shape(0), 10); ASSERT_EQ(in.data(nullptr).shape(1), 20); in.ComputeFeature(0, src_in); - + // use dummy as neuron layer - LayerProto proto_neu; vector<Layer*> src_neu; src_neu.push_back(static_cast<Layer*>(&in)); + LayerProto proto_neu; proto_neu.set_name("dummy_neuron"); proto_neu.mutable_dummy_conf(); DummyLayer neu; @@ -56,9 +61,9 @@ TEST(ConnectionLayerTest, DummyTest) { ASSERT_EQ(in.data(nullptr).cpu_data()[i], neu.data(nullptr).cpu_data()[i]); // use dummy as output layer - LayerProto proto_out; vector<Layer*> src_out; src_out.push_back(static_cast<Layer*>(&neu)); + LayerProto proto_out; proto_out.set_name("dummy_output"); proto_out.mutable_dummy_conf()->set_output(true); DummyLayer out; @@ -69,7 +74,7 @@ TEST(ConnectionLayerTest, DummyTest) { ASSERT_EQ(in.data(nullptr).count(), out.data(nullptr).count()); for (int i = 0; i < in.data(nullptr).count(); ++i) ASSERT_EQ(in.data(nullptr).cpu_data()[i], out.data(nullptr).cpu_data()[i]); - + // test for computing gradient out.ComputeGradient(0, src_out); neu.ComputeGradient(0, src_neu); @@ -77,3 +82,76 @@ TEST(ConnectionLayerTest, DummyTest) { for (int i = 0; i < in.grad(nullptr).count(); ++i) ASSERT_EQ(in.grad(nullptr).cpu_data()[i], out.grad(nullptr).cpu_data()[i]); } + + +TEST(ConnectionLayerTest, BridgeTest) { + // use dummy as input layer + vector<Layer*> src_in; + LayerProto proto_in; + proto_in.set_name("dummy_input"); + proto_in.mutable_dummy_conf()->set_input(true); + proto_in.mutable_dummy_conf()->add_shape(10); + proto_in.mutable_dummy_conf()->add_shape(20); + DummyLayer in; + in.Setup(proto_in, src_in); + + // add src bridge layer + vector<Layer*> src_src; + src_src.push_back(static_cast<Layer*>(&in)); + LayerProto proto_src; + proto_in.set_name("bridge_src"); + BridgeSrcLayer src; + src.Setup(proto_src, src_src); + ASSERT_EQ(src.data(nullptr).shape(0), 10); + ASSERT_EQ(src.data(nullptr).shape(1), 20); + + // add dst bridge layer + vector<Layer*> src_dst; + src_dst.push_back(static_cast<Layer*>(&src)); + LayerProto proto_dst; + proto_dst.set_name("bridge_dst"); + BridgeDstLayer dst; + dst.Setup(proto_dst, src_dst); + ASSERT_EQ(dst.data(nullptr).shape(0), 10); + ASSERT_EQ(dst.data(nullptr).shape(1), 20); + + // bind bridges to socket + Router router(10); + router.Bind("inproc://router"); + Dealer dealer(0); + dealer.Connect("inproc://router"); + std::unordered_map<std::string, Layer*> name2bridge; + name2bridge[src.name()] = &src; + name2bridge[dst.name()] = &dst; + src.MakePaired(static_cast<Layer*>(&dst), 0, &dealer, &name2bridge); + dst.MakePaired(static_cast<Layer*>(&src), 0, &dealer, &name2bridge); + + // use dummy as output layer + LayerProto proto_out; + vector<Layer*> src_out; + src_out.push_back(static_cast<Layer*>(&dst)); + proto_out.set_name("dummy_output"); + proto_out.mutable_dummy_conf()->set_output(true); + DummyLayer out; + out.Setup(proto_out, src_out); + + // test for computing feature + in.ComputeFeature(0, src_in); + src.ComputeFeature(0, src_src); + Msg* msg_data = router.Receive(); + router.Send(&msg_data); + dst.ComputeFeature(0, src_dst); + out.ComputeFeature(0, src_out); + for (int i = 0; i < in.data(nullptr).count(); ++i) + ASSERT_EQ(in.data(nullptr).cpu_data()[i], out.data(nullptr).cpu_data()[i]); + + // test for computing gradient + out.ComputeGradient(0, src_out); + dst.ComputeGradient(0, src_dst); + Msg* msg_grad = 
router.Receive(); + router.Send(&msg_grad); + src.ComputeGradient(0, src_src); + in.ComputeGradient(0, src_in); + for (int i = 0; i < in.grad(nullptr).count(); ++i) + ASSERT_EQ(in.grad(nullptr).cpu_data()[i], out.grad(nullptr).cpu_data()[i]); +} http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/bd2e3453/src/worker.cc ---------------------------------------------------------------------- diff --git a/src/worker.cc b/src/worker.cc index e6c2279..4756514 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -51,14 +51,102 @@ void Worker::Setup(int grp_id, int id, const JobProto& conf, train_net_ = train_net; val_net_ = val_net; test_net_ = test_net; - layer_dealer_ = dealer_ = nullptr; + bridge_dealer_ = dealer_ = nullptr; } Worker::~Worker() { - if (layer_dealer_) - delete layer_dealer_; - if (dealer_) - delete dealer_; + if (dealer_) delete dealer_; + if (bridge_dealer_) delete bridge_dealer_; +} + +void Worker::Run() { + LOG(ERROR) << "Worker (group = " << grp_id_ <<", id = " << id_ << ") start"; + auto cluster = Cluster::Get(); + int svr_grp = grp_id_ / cluster->nworker_groups_per_server_group(); + CHECK(cluster->runtime()->JoinSGroup(grp_id_, id_, svr_grp)); + step_ = job_conf_.step(); + InitSockets(train_net_); + InitNetParams(job_conf_, train_net_); + while (!StopNow(step_)) { + if (ValidateNow(step_) && val_net_ != nullptr) { + CollectAll(step_, val_net_); + LOG(ERROR) << "Validation @ step " + std::to_string(step_); + Test(job_conf_.validate_steps(), kVal, val_net_); + } + if (TestNow(step_) && test_net_ != nullptr) { + CollectAll(step_, test_net_); + LOG(ERROR) << "Test @ step " + std::to_string(step_); + Test(job_conf_.test_steps(), kTest, test_net_); + } + if (CheckpointNow(step_) && grp_id_ == 0) { + CollectAll(step_, train_net_); + Checkpoint(step_, Cluster::Get()->checkpoint_folder(), train_net_); + job_conf_.set_step(step_); + } + TrainOneBatch(step_, train_net_); + if (DisplayNow(step_) && grp_id_ == 0 && id_ == 0) + Display(kTrain, "Train @ step " + std::to_string(step_), train_net_); + step_++; + } + + // save the model + if (grp_id_ == 0) + Checkpoint(step_, Cluster::Get()->checkpoint_folder(), train_net_); + // clean up + cluster->runtime()->LeaveSGroup(grp_id_, id_, svr_grp); + // notify the stub on worker stop + Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub)); + msg->set_type(kStop); + dealer_->Send(&msg); // use param dealer to send the stop msg + LOG(ERROR) << "Worker (group = " <<grp_id_ << ", id = " << id_ << ") stops"; +} + +void Worker::Test(int steps, Phase phase, NeuralNet* net) { + for (int step = 0; step < steps; step++) + TestOneBatch(step, phase, net); + Display(phase, " ", net); +} + +void ConnectStub(int grp, int id, Dealer* dealer, EntityType entity) { + dealer->Connect(kInprocRouterEndpoint); + Msg* ping = new Msg(Addr(grp, id, entity), Addr(-1, -1, kStub)); + ping->set_type(kConnect); + dealer->Send(&ping); +} + +void Worker::InitSockets(const NeuralNet* net) { + // TODO(wangsh): provide a unique sock id from cluster + dealer_ = new Dealer(0); + ConnectStub(grp_id_, id_, dealer_, kWorkerParam); + for (auto layer : net->layers()) { + if (layer->partition_id() == id_) { + if (typeid(layer) == typeid(BridgeDstLayer) + || typeid(layer) == typeid(BridgeSrcLayer)) { + // TODO(wangsh): provide a unique socket id from cluster + bridge_dealer_ = new Dealer(1); + ConnectStub(grp_id_, id_, bridge_dealer_, kWorkerLayer); + break; + } + } + } + // bind dealer to bridge layers + if (bridge_dealer_ != nullptr) { + for (auto dst : 
net->layers()) { + if (typeid(dst) == typeid(BridgeDstLayer)) { + auto src = net->srclayers(dst)[0]; + name2bridge_[src->name()] = src; + name2bridge_[dst->name()] = dst; + if (src->partition_id() == id_) { + dynamic_cast<BridgeLayer*>(src)->MakePaired(dst, grp_id_, + bridge_dealer_, &name2bridge_); + } + if (dst->partition_id() == id_) { + dynamic_cast<BridgeLayer*>(dst)->MakePaired(src, grp_id_, + bridge_dealer_, &name2bridge_); + } + } + } + } } void Worker::InitNetParams(const JobProto& job_conf, NeuralNet* net) { @@ -116,75 +204,6 @@ void Worker::InitNetParams(const JobProto& job_conf, NeuralNet* net) { } } -void ConnectStub(int grp, int id, Dealer* dealer, EntityType entity) { - dealer->Connect(kInprocRouterEndpoint); - Msg* ping = new Msg(Addr(grp, id, entity), Addr(-1, -1, kStub)); - ping->set_type(kConnect); - dealer->Send(&ping); -} - -void Worker::Test(int steps, Phase phase, NeuralNet* net) { - for (int step = 0; step < steps; step++) - TestOneBatch(step, phase, net); - Display(phase, " ", net); -} - -void Worker::Run() { - LOG(ERROR) << "Worker (group = " << grp_id_ <<", id = " << id_ << ") start"; - auto cluster = Cluster::Get(); - int svr_grp = grp_id_ / cluster->nworker_groups_per_server_group(); - CHECK(cluster->runtime()->JoinSGroup(grp_id_, id_, svr_grp)); - // TODO(wangsh): provide a unique sock id from cluster - dealer_ = new Dealer(0); - ConnectStub(grp_id_, id_, dealer_, kWorkerParam); - for (auto layer : train_net_->layers()) { - if (layer->partition_id() == id_) { - if (typeid(layer) == typeid(BridgeDstLayer) - || typeid(layer) == typeid(BridgeSrcLayer)) { - // TODO(wangsh): provide a unique socket id from cluster - layer_dealer_ = new Dealer(1); - ConnectStub(grp_id_, id_, layer_dealer_, kWorkerLayer); - break; - } - } - } - - step_ = job_conf_.step(); - InitNetParams(job_conf_, train_net_); - while (!StopNow(step_)) { - if (ValidateNow(step_) && val_net_ != nullptr) { - CollectAll(step_, val_net_); - LOG(ERROR) << "Validation @ step " + std::to_string(step_); - Test(job_conf_.validate_steps(), kVal, val_net_); - } - if (TestNow(step_) && test_net_ != nullptr) { - CollectAll(step_, test_net_); - LOG(ERROR) << "Test @ step " + std::to_string(step_); - Test(job_conf_.test_steps(), kTest, test_net_); - } - if (CheckpointNow(step_) && grp_id_ == 0) { - CollectAll(step_, train_net_); - Checkpoint(step_, Cluster::Get()->checkpoint_folder(), train_net_); - job_conf_.set_step(step_); - } - TrainOneBatch(step_, train_net_); - if (DisplayNow(step_) && grp_id_ == 0 && id_ == 0) - Display(kTrain, "Train @ step " + std::to_string(step_), train_net_); - step_++; - } - - // save the model - if (grp_id_ == 0) - Checkpoint(step_, Cluster::Get()->checkpoint_folder(), train_net_); - // clean up - cluster->runtime()->LeaveSGroup(grp_id_, id_, svr_grp); - // notify the stub on worker stop - Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub)); - msg->set_type(kStop); - dealer_->Send(&msg); // use param dealer to send the stop msg - LOG(ERROR) << "Worker (group = " <<grp_id_ << ", id = " << id_ << ") stops"; -} - void Worker::Checkpoint(int step, const std::string& folder, NeuralNet* net) { BlobProtos bps; for (auto layer : net->layers()) { @@ -279,39 +298,6 @@ void Worker::Display(int flag, const std::string& prefix, NeuralNet* net) { } } -void Worker::ReceiveBlobs(bool data, bool grad, BridgeLayer* layer, - NeuralNet* net) { - if (layer_dealer_ == nullptr) { - LOG(ERROR) << "Null dealer in worker (" << grp_id_ << ", " << id_ << ")"; - } - while (!layer->ready()) 
{ - auto msg = layer_dealer_->Receive(); - CHECK_EQ(AddrGrp(msg->src()), grp_id_); - string name(static_cast<char*>(msg->FrameData()), msg->FrameSize()); - auto receive_layer = net->name2layer(name); - auto data = receive_layer->mutable_data(nullptr); - msg->NextFrame(); - memcpy(data->mutable_cpu_data(), msg->FrameData(), msg->FrameSize()); - dynamic_cast<BridgeLayer*>(receive_layer)->set_ready(true); - delete msg; - } -} - -void Worker::SendBlobs(bool data, bool grad, BridgeLayer* layer, - NeuralNet* net) { - if (layer_dealer_ == nullptr) { - LOG(ERROR) << "Null dealer in worker (" << grp_id_ << ", " << id_ << ")"; - } - auto dst = net->srclayers(layer).at(0); - Msg *msg = new Msg(); - msg->set_src(Addr(grp_id_, id_, kWorkerLayer)); - msg->set_dst(Addr(grp_id_, dst->partition_id(), kWorkerLayer)); - msg->AddFrame(dst->name().c_str(), dst->name().length()); - auto const & blob = layer->data(nullptr); - msg->AddFrame(blob.cpu_data(), blob.count() * sizeof(float)); - layer_dealer_->Send(&msg); -} - /****************************BPWorker**********************************/ void BPWorker::TrainOneBatch(int step, NeuralNet* net) { Forward(step, kTrain, net);

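For readers tracing the new BridgeLayer::SendBlobs()/ReceiveBlobs() in the bridge.cc hunk above: each blob crosses a bridge as a two-frame Msg, where frame 0 carries the destination layer's name (the lookup key in name2bridge_) and frame 1 carries the raw float buffer. The condensed sketch below shows the data direction only; the helper names SendOneBlob/ReceiveOneBlob are illustrative, while the Msg/Dealer/Blob calls are the ones used in the diff.

#include <cstring>
#include <string>
#include <unordered_map>
#include "singa/comm/msg.h"
#include "singa/comm/socket.h"
#include "singa/neuralnet/connection_layer/bridge.h"

namespace singa {

// Illustrative sender, mirroring BridgeLayer::SendBlobs(true) above.
void SendOneBlob(Dealer* dealer, int grp, int src_part, int dst_part,
                 const std::string& pair_name, const Blob<float>& blob) {
  Msg* msg = new Msg();
  msg->set_src(Addr(grp, src_part, kWorkerLayer));
  msg->set_dst(Addr(grp, dst_part, kWorkerLayer));
  msg->AddFrame(pair_name.c_str(), pair_name.length());          // frame 0: routing key
  msg->AddFrame(blob.cpu_data(), blob.count() * sizeof(float));  // frame 1: payload
  dealer->Send(&msg);
}

// Illustrative receiver, mirroring one iteration of
// BridgeLayer::ReceiveBlobs(true) above: route by name, copy, mark ready.
void ReceiveOneBlob(Dealer* dealer,
                    const std::unordered_map<std::string, Layer*>& name2bridge) {
  Msg* msg = dealer->Receive();
  std::string name(static_cast<char*>(msg->FrameData()), msg->FrameSize());
  msg->NextFrame();
  Layer* target = name2bridge.at(name);
  memcpy(target->mutable_data(nullptr)->mutable_cpu_data(),
         msg->FrameData(), msg->FrameSize());
  dynamic_cast<BridgeLayer*>(target)->set_ready(true);
  delete msg;
}

}  // namespace singa

The while (!ready()) loop in ReceiveBlobs() exists because all bridge layers on a worker share one dealer: a receive may pull a frame destined for a different bridge, which is still delivered through name2bridge_, and the loop keeps draining messages until this layer's own blob has arrived.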