SINGA-109 Refine bridge layers

re-implement bridge layers for model partition
 * move socket operations for sending/receiving blobs from the worker into the
   layers, so that they are transparent to users who implement TrainOneBatch
   (see the sketch below)
 * when initializing a worker, it creates a socket instance and passes it to
   all bridge layers via the layer's MakePaired() function
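
For illustration, a hedged sketch (not part of this patch) of why the move
matters: a user's TrainOneBatch only walks the layers, and the bridge layers
move blobs between workers inside ComputeFeature()/ComputeGradient(), so no
socket code leaks into user code. The sketch assumes net->layers() and
net->srclayers() behave as used elsewhere in this patch, and folds the flag
handling into a single phase argument.

    // forward pass of a BP worker; sockets never appear here
    void BPWorker::Forward(int step, Phase phase, NeuralNet* net) {
      for (auto* layer : net->layers())  // assumed topological order
        // BridgeSrcLayer sends here; BridgeDstLayer blocks in ReceiveBlobs()
        layer->ComputeFeature(phase, net->srclayers(layer));
    }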


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/bd2e3453
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/bd2e3453
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/bd2e3453

Branch: refs/heads/master
Commit: bd2e3453ca01e7bf9bf6724b4717d956c6e00290
Parents: 4664b6b
Author: WANG Sheng <[email protected]>
Authored: Wed Dec 2 11:27:36 2015 +0800
Committer: WANG Sheng <[email protected]>
Committed: Fri Dec 4 18:13:11 2015 +0800

----------------------------------------------------------------------
 .../singa/neuralnet/connection_layer/bridge.h   |  84 +++-----
 include/singa/worker.h                          |  38 ++--
 src/neuralnet/connection_layer/bridge.cc        |  78 +++++++-
 src/neuralnet/neuron_layer/dummy.cc             |   2 +-
 src/test/test_connection_layers.cc              |  88 +++++++-
 src/worker.cc                                   | 200 +++++++++----------
 6 files changed, 298 insertions(+), 192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/bd2e3453/include/singa/neuralnet/connection_layer/bridge.h
----------------------------------------------------------------------
diff --git a/include/singa/neuralnet/connection_layer/bridge.h b/include/singa/neuralnet/connection_layer/bridge.h
index 5f224b3..b27693d 100644
--- a/include/singa/neuralnet/connection_layer/bridge.h
+++ b/include/singa/neuralnet/connection_layer/bridge.h
@@ -22,79 +22,55 @@
 #ifndef SINGA_NEURALNET_CONNECTION_LAYER_BRIDGE_H_
 #define SINGA_NEURALNET_CONNECTION_LAYER_BRIDGE_H_
 
+#include <string>
+#include <unordered_map>
 #include <vector>
+#include "singa/comm/socket.h"
 #include "singa/neuralnet/layer.h"
 
 namespace singa {
-class BridgeLayer : virtual public ConnectionLayer {
+
+class BridgeLayer : public ConnectionLayer {
  public:
-  void set_ready(bool a) {
-    ready_ = a;
-  }
-  bool ready() const {
-    return ready_;
-  }
-  virtual bool is_bridgesrclayer() const {
-    return false;
-  }
-  virtual bool is_bridgedstlayer() const {
-    return false;
-  }
+  void set_ready(bool a) { ready_ = a; }
+  bool ready() const { return ready_; }
+  // Bind the layer to a dealer instance; called by the worker at runtime
+  void MakePaired(Layer* pair, int grp_id, Dealer* dealer,
+                  std::unordered_map<std::string, Layer*>* name2bridge);
+  // Send blobs to other workers due to model partitions
+  void SendBlobs(bool handle_data);
+  // Receive blobs from other workers due to model partitions
+  void ReceiveBlobs(bool handle_data);
 
  protected:
   //!< true if received grad from BridgeDstLayer
-  bool ready_;
+  bool ready_ = false;
+  int group_id_ = 0;
+  Layer* pair_ = nullptr;
+  Dealer* dealer_ = nullptr;
+  std::unordered_map<std::string, Layer*>* name2bridge_ = nullptr;
 };
 
 /**
- * For recv data from layer on other threads which may resident on other nodes
- * due to layer/data partiton
+ * For sending data to a layer on other threads, which may reside on other
+ * nodes due to layer/data partition.
  */
-class BridgeDstLayer : public BridgeLayer {
+class BridgeSrcLayer : public BridgeLayer {
  public:
   void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override;
-  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override {
-    // reset ready_ for next iteration.
-    ready_ = false;
-  }
-  void ComputeGradient(int flag,  const vector<Layer*>& srclayers) override {}
-  bool is_bridgedstlayer() const {
-    return true;
-  }
+  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
+  void ComputeGradient(int flag, const vector<Layer*>& srclayers) override;
 };
 
 /**
- * For sending data to layer on other threads which may resident on other nodes
- * due to layer/data partition.
+ * For receiving data from a layer on other threads, which may reside on other
+ * nodes due to layer/data partition.
  */
-class BridgeSrcLayer : public BridgeLayer {
+class BridgeDstLayer : public BridgeLayer {
  public:
-  void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override {
-    CHECK_GE(srclayers.size(), 1);
-    srclayer_ = srclayers.at(0);
-  }
-  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override {}
-  void ComputeGradient(int flag,  const vector<Layer*>& srclayers) override {
-    ready_ = false;
-  }
-  const Blob<float>& data(const Layer* from) const override {
-    return srclayer_->data(this);
-  }
-  Blob<float>* mutable_data(const Layer* from) override {
-    return srclayer_->mutable_data(this);
-  }
-  const Blob<float>& grad(const Layer* from) const override {
-    return srclayer_->grad(this);
-  }
-  Blob<float>* mutable_grad(const Layer* from) override {
-    return srclayer_->mutable_grad(this);
-  }
-  bool is_bridgesrclayer() const override {
-    return true;
-  }
-
- private:
-  Layer* srclayer_;
+  void Setup(const LayerProto& conf, const vector<Layer*>& srclayers) override;
+  void ComputeFeature(int flag, const vector<Layer*>& srclayers) override;
+  void ComputeGradient(int flag, const vector<Layer*>& srclayers) override;
 };
 
 }  // namespace singa
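
A hedged usage sketch of the new pairing contract (mirroring the unit test in
src/test/test_connection_layers.cc further down): the worker registers both
bridges in a shared name-to-layer map and binds the same Dealer to each side;
group_id and dealer stand for values the worker supplies at runtime.

    std::unordered_map<std::string, Layer*> name2bridge;
    name2bridge[src.name()] = &src;  // a BridgeSrcLayer
    name2bridge[dst.name()] = &dst;  // its paired BridgeDstLayer
    src.MakePaired(&dst, group_id, dealer, &name2bridge);
    dst.MakePaired(&src, group_id, dealer, &name2bridge);
    // src.ComputeFeature() now sends data; dst.ComputeFeature() blocks in
    // ReceiveBlobs() until the named blob arrives and is marked ready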

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/bd2e3453/include/singa/worker.h
----------------------------------------------------------------------
diff --git a/include/singa/worker.h b/include/singa/worker.h
index 1ed3642..53f2118 100644
--- a/include/singa/worker.h
+++ b/include/singa/worker.h
@@ -23,6 +23,7 @@
 #define SINGA_WORKER_H_
 
 #include <string>
+#include <unordered_map>
 #include <vector>
 #include "singa/comm/socket.h"
 #include "singa/neuralnet/neuralnet.h"
@@ -78,7 +79,6 @@ class Worker {
    */
   virtual void Setup(int grp_id, int id, const JobProto& conf,
       NeuralNet* train_net, NeuralNet* val_net, NeuralNet* test_net);
-
   /**
    * Main function of Worker.
    *
@@ -94,6 +94,17 @@ class Worker {
    */
   void Test(int steps, Phase phase, NeuralNet* net);
   /**
+   * Init sockets in a worker, including:
+   * 1. a global socket that communicates with the stub
+   * 2. a bridge socket dedicated to bridge-layer communication
+   *
+   * the bridge socket will be bound to each bridge layer
+   *
+   * @param[in] net pointer to a neural net whose bridge layers will be bound
+   * to a socket.
+   */
+  void InitSockets(const NeuralNet* net);
+  /**
   * Init values of Param instances associated with local layers (i.e., layers
    * dispatched to this worker).
    *
@@ -118,7 +129,6 @@ class Worker {
    * initialized.
    */
   void InitNetParams(const JobProto& job_conf, NeuralNet* net);
-
   /**
    * Checkpoint all Param objects owned by the worker onto disk.
   * The serialization is done using BlobProtos which includes the name, version
@@ -130,7 +140,6 @@ class Worker {
    * @param net the training net whose Param objects will be dumped.
    */
   void Checkpoint(int step, const std::string& folder, NeuralNet* net);
-
   /**
     * Train one mini-batch.
     * Test/Validation is done before training.
@@ -139,7 +148,6 @@ class Worker {
     * @param[in] net neural net to be trained.
     */
   virtual void TrainOneBatch(int step, NeuralNet* net) = 0;
-
   /**
    * Test/validate one mini-batch data.
    *
@@ -148,7 +156,6 @@ class Worker {
    * @param[in] net neural net for test
    */
   virtual void TestOneBatch(int step, Phase phase, NeuralNet* net) = 0;
-
   /**
   * Display information from layers.
    *
@@ -159,7 +166,6 @@ class Worker {
    * @param net display layers from this neural net.
    */
   void Display(int flag, const std::string& prefix, NeuralNet* net);
-
   /**
    * Put Param values to server.
    *
@@ -167,7 +173,6 @@ class Worker {
    * @param step used as current param version for the put request
    */
   int Put(int step, Param* param);
-
   /**
    * Get Param with specific version from server
    * If the current version >= the requested version, then return.
@@ -176,7 +181,6 @@ class Worker {
    * @param step requested param version
    */
   int Get(int step, Param* param);
-
   /**
    * Update Param.
    *
@@ -184,7 +188,6 @@ class Worker {
   * @param step training step used for updating (e.g., deciding learning rate).
    */
   int Update(int step, Param* param);
-
   /**
    * Wait for the response of the update/get requests.
    *
@@ -192,23 +195,10 @@ class Worker {
    * @param step not used now.
    */
   int Collect(int step, Param* param);
-
   /**
    * Call Collect() for every param of net
    */
   int CollectAll(int step, NeuralNet* net);
-
-  /**
-   * Receive blobs from other workers due to model partitions.
-   */
-  void ReceiveBlobs(bool data, bool grad, BridgeLayer* layer, NeuralNet* net);
-
-  /**
-   * Send blobs to other workers due to model partitions.
-   */
-  void SendBlobs(bool data, bool grad, BridgeLayer* layer, NeuralNet* net);
-
-
   /**
    * @param[in] step
    * @return true if it is time to display training info, e.g., loss; otherwise
@@ -284,8 +274,10 @@ class Worker {
   NeuralNet* train_net_ = nullptr;
   NeuralNet* test_net_ = nullptr;
   NeuralNet* val_net_ = nullptr;
-  Dealer* layer_dealer_ = nullptr;
   Dealer* dealer_ = nullptr;
+  // bridge layer related
+  Dealer* bridge_dealer_ = nullptr;
+  std::unordered_map<std::string, Layer*> name2bridge_;
 };
 
 class BPWorker: public Worker {

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/bd2e3453/src/neuralnet/connection_layer/bridge.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/connection_layer/bridge.cc b/src/neuralnet/connection_layer/bridge.cc
index 5a43c20..1ad4b0c 100644
--- a/src/neuralnet/connection_layer/bridge.cc
+++ b/src/neuralnet/connection_layer/bridge.cc
@@ -20,15 +20,89 @@
 *************************************************************/
 
 #include "singa/neuralnet/connection_layer/bridge.h"
+#include "singa/comm/msg.h"
 
 namespace singa {
 
 using std::vector;
-void BridgeDstLayer::Setup(const LayerProto& proto,
+
+void BridgeLayer::MakePaired(Layer* pair, int grp_id, Dealer* dealer,
+    std::unordered_map<std::string, Layer*>* name2bridge) {
+  pair_ = pair;
+  group_id_ = grp_id;
+  dealer_ = dealer;
+  name2bridge_ = name2bridge;
+}
+
+void BridgeLayer::SendBlobs(bool handle_data) {
+  CHECK(dealer_) << "NULL dealer for bridges in worker (" << group_id_
+                 << ", " << partition_id() << ")";
+  Msg *msg = new Msg();
+  msg->set_src(Addr(group_id_, partition_id(), kWorkerLayer));
+  msg->set_dst(Addr(group_id_, pair_->partition_id(), kWorkerLayer));
+  msg->AddFrame(pair_->name().c_str(), pair_->name().length());
+  auto const& blob = handle_data ? data(nullptr) : grad(nullptr);
+  msg->AddFrame(blob.cpu_data(), blob.count() * sizeof(float));
+  dealer_->Send(&msg);
+}
+
+void BridgeLayer::ReceiveBlobs(bool handle_data) {
+  CHECK(dealer_) << "NULL dealer for bridges in worker (" << group_id_
+                 << ", " << partition_id() << ")";
+  while (!ready()) {
+    auto msg = dealer_->Receive();
+    CHECK_EQ(AddrGrp(msg->src()), group_id_);
+    std::string name(static_cast<char*>(msg->FrameData()), msg->FrameSize());
+    auto receive_layer = name2bridge_->at(name);
+    auto blob = handle_data ? receive_layer->mutable_data(nullptr) :
+                receive_layer->mutable_grad(nullptr);
+    msg->NextFrame();
+    memcpy(blob->mutable_cpu_data(), msg->FrameData(), msg->FrameSize());
+    dynamic_cast<BridgeLayer*>(receive_layer)->set_ready(true);
+    delete msg;
+  }
+}
+
+void BridgeSrcLayer::Setup(const LayerProto& conf,
+    const vector<Layer*>& srclayers) {
+  CHECK_GE(srclayers.size(), 1);
+  Layer::Setup(conf, srclayers);
+  data_.Reshape(srclayers[0]->data(this).shape());
+  grad_.ReshapeLike(data_);
+  data_.ShareData(srclayers[0]->data(this));
+  grad_.ShareData(srclayers[0]->grad(this));
+}
+
+void BridgeSrcLayer::ComputeFeature(int flag, const vector<Layer*>& srcs) {
+  // send data
+  SendBlobs(true);
+  // reset flag for receiving gradient in compute gradient phase
+  set_ready(false);
+}
+
+void BridgeSrcLayer::ComputeGradient(int flag, const vector<Layer*>& srcs) {
+  // receive gradient
+  ReceiveBlobs(false);
+}
+
+void BridgeDstLayer::Setup(const LayerProto& conf,
     const vector<Layer*>& srclayers) {
-  Layer::Setup(proto, srclayers);
   CHECK_EQ(srclayers.size(), 1);
+  Layer::Setup(conf, srclayers);
   data_.Reshape(srclayers[0]->data(this).shape());
   grad_.ReshapeLike(data_);
 }
+
+void BridgeDstLayer::ComputeFeature(int flag, const vector<Layer*>& srcs) {
+  // receive data
+  ReceiveBlobs(true);
+}
+
+void BridgeDstLayer::ComputeGradient(int flag, const vector<Layer*>& srcs) {
+  // send gradient
+  SendBlobs(false);
+  // reset flag for receiving data in compute feature phase
+  set_ready(false);
+}
+
 }  // namespace singa
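
For reference, SendBlobs()/ReceiveBlobs() above exchange a two-frame message.
The sketch below restates the framing with the Msg API as used in this patch;
peer_partition stands for the paired layer's partition id.

    Msg* msg = new Msg();
    msg->set_src(Addr(group_id_, partition_id(), kWorkerLayer));
    msg->set_dst(Addr(group_id_, peer_partition, kWorkerLayer));
    // frame 0: the paired layer's name, used by the receiver for dispatch
    msg->AddFrame(pair_->name().c_str(), pair_->name().length());
    // frame 1: the raw float payload of the data (or grad) blob
    msg->AddFrame(blob.cpu_data(), blob.count() * sizeof(float));
    dealer_->Send(&msg);
    // the receiver looks frame 0 up in name2bridge_, memcpys frame 1 into
    // that layer's data/grad blob, and sets the bridge's ready_ flag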

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/bd2e3453/src/neuralnet/neuron_layer/dummy.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/neuron_layer/dummy.cc b/src/neuralnet/neuron_layer/dummy.cc
index 2ef702d..8d165f7 100644
--- a/src/neuralnet/neuron_layer/dummy.cc
+++ b/src/neuralnet/neuron_layer/dummy.cc
@@ -69,4 +69,4 @@ void DummyLayer::ComputeGradient(int flag, const vector<Layer*>& srclayers) {
   }
 }
 
-} // namespace singa
+}  // namespace singa

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/bd2e3453/src/test/test_connection_layers.cc
----------------------------------------------------------------------
diff --git a/src/test/test_connection_layers.cc b/src/test/test_connection_layers.cc
index 3d931b3..837a941 100644
--- a/src/test/test_connection_layers.cc
+++ b/src/test/test_connection_layers.cc
@@ -19,7 +19,12 @@
 *
 *************************************************************/
 
+#include <string>
+#include <unordered_map>
+#include <vector>
 #include "gtest/gtest.h"
+#include "singa/comm/msg.h"
+#include "singa/comm/socket.h"
 #include "singa/neuralnet/connection_layer/bridge.h"
 #include "singa/neuralnet/neuron_layer/dummy.h"
 #include "singa/proto/job.pb.h"
@@ -28,8 +33,8 @@ using namespace singa;
 
 TEST(ConnectionLayerTest, DummyTest) {
   // use dummy as input layer
-  LayerProto proto_in;
   vector<Layer*> src_in;
+  LayerProto proto_in;
   proto_in.set_name("dummy_input");
   proto_in.mutable_dummy_conf()->set_input(true);
   proto_in.mutable_dummy_conf()->add_shape(10);
@@ -39,11 +44,11 @@ TEST(ConnectionLayerTest, DummyTest) {
   ASSERT_EQ(in.data(nullptr).shape(0), 10);
   ASSERT_EQ(in.data(nullptr).shape(1), 20);
   in.ComputeFeature(0, src_in);
- 
+
   // use dummy as neuron layer
-  LayerProto proto_neu;
   vector<Layer*> src_neu;
   src_neu.push_back(static_cast<Layer*>(&in));
+  LayerProto proto_neu;
   proto_neu.set_name("dummy_neuron");
   proto_neu.mutable_dummy_conf();
   DummyLayer neu;
@@ -56,9 +61,9 @@ TEST(ConnectionLayerTest, DummyTest) {
     ASSERT_EQ(in.data(nullptr).cpu_data()[i], neu.data(nullptr).cpu_data()[i]);
 
   // use dummy as output layer
-  LayerProto proto_out;
   vector<Layer*> src_out;
   src_out.push_back(static_cast<Layer*>(&neu));
+  LayerProto proto_out;
   proto_out.set_name("dummy_output");
   proto_out.mutable_dummy_conf()->set_output(true);
   DummyLayer out;
@@ -69,7 +74,7 @@ TEST(ConnectionLayerTest, DummyTest) {
   ASSERT_EQ(in.data(nullptr).count(), out.data(nullptr).count());
   for (int i = 0; i < in.data(nullptr).count(); ++i)
     ASSERT_EQ(in.data(nullptr).cpu_data()[i], out.data(nullptr).cpu_data()[i]);
- 
+
   // test for computing gradient
   out.ComputeGradient(0, src_out);
   neu.ComputeGradient(0, src_neu);
@@ -77,3 +82,76 @@ TEST(ConnectionLayerTest, DummyTest) {
   for (int i = 0; i < in.grad(nullptr).count(); ++i)
     ASSERT_EQ(in.grad(nullptr).cpu_data()[i], out.grad(nullptr).cpu_data()[i]);
 }
+
+
+TEST(ConnectionLayerTest, BridgeTest) {
+  // use dummy as input layer
+  vector<Layer*> src_in;
+  LayerProto proto_in;
+  proto_in.set_name("dummy_input");
+  proto_in.mutable_dummy_conf()->set_input(true);
+  proto_in.mutable_dummy_conf()->add_shape(10);
+  proto_in.mutable_dummy_conf()->add_shape(20);
+  DummyLayer in;
+  in.Setup(proto_in, src_in);
+
+  // add src bridge layer
+  vector<Layer*> src_src;
+  src_src.push_back(static_cast<Layer*>(&in));
+  LayerProto proto_src;
+  proto_src.set_name("bridge_src");
+  BridgeSrcLayer src;
+  src.Setup(proto_src, src_src);
+  ASSERT_EQ(src.data(nullptr).shape(0), 10);
+  ASSERT_EQ(src.data(nullptr).shape(1), 20);
+
+  // add dst bridge layer
+  vector<Layer*> src_dst;
+  src_dst.push_back(static_cast<Layer*>(&src));
+  LayerProto proto_dst;
+  proto_dst.set_name("bridge_dst");
+  BridgeDstLayer dst;
+  dst.Setup(proto_dst, src_dst);
+  ASSERT_EQ(dst.data(nullptr).shape(0), 10);
+  ASSERT_EQ(dst.data(nullptr).shape(1), 20);
+
+  // bind bridges to socket
+  Router router(10);
+  router.Bind("inproc://router");
+  Dealer dealer(0);
+  dealer.Connect("inproc://router");
+  std::unordered_map<std::string, Layer*> name2bridge;
+  name2bridge[src.name()] = &src;
+  name2bridge[dst.name()] = &dst;
+  src.MakePaired(static_cast<Layer*>(&dst), 0, &dealer, &name2bridge);
+  dst.MakePaired(static_cast<Layer*>(&src), 0, &dealer, &name2bridge);
+
+  // use dummy as output layer
+  LayerProto proto_out;
+  vector<Layer*> src_out;
+  src_out.push_back(static_cast<Layer*>(&dst));
+  proto_out.set_name("dummy_output");
+  proto_out.mutable_dummy_conf()->set_output(true);
+  DummyLayer out;
+  out.Setup(proto_out, src_out);
+
+  // test for computing feature
+  in.ComputeFeature(0, src_in);
+  src.ComputeFeature(0, src_src);
+  Msg* msg_data = router.Receive();
+  router.Send(&msg_data);
+  dst.ComputeFeature(0, src_dst);
+  out.ComputeFeature(0, src_out);
+  for (int i = 0; i < in.data(nullptr).count(); ++i)
+    ASSERT_EQ(in.data(nullptr).cpu_data()[i], out.data(nullptr).cpu_data()[i]);
+
+  // test for computing gradient
+  out.ComputeGradient(0, src_out);
+  dst.ComputeGradient(0, src_dst);
+  Msg* msg_grad = router.Receive();
+  router.Send(&msg_grad);
+  src.ComputeGradient(0, src_src);
+  in.ComputeGradient(0, src_in);
+  for (int i = 0; i < in.grad(nullptr).count(); ++i)
+    ASSERT_EQ(in.grad(nullptr).cpu_data()[i], out.grad(nullptr).cpu_data()[i]);
+}

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/bd2e3453/src/worker.cc
----------------------------------------------------------------------
diff --git a/src/worker.cc b/src/worker.cc
index e6c2279..4756514 100644
--- a/src/worker.cc
+++ b/src/worker.cc
@@ -51,14 +51,102 @@ void Worker::Setup(int grp_id, int id, const JobProto& conf,
   train_net_ = train_net;
   val_net_ = val_net;
   test_net_ = test_net;
-  layer_dealer_ = dealer_ = nullptr;
+  bridge_dealer_ = dealer_ = nullptr;
 }
 
 Worker::~Worker() {
-  if (layer_dealer_)
-    delete layer_dealer_;
-  if (dealer_)
-    delete dealer_;
+  if (dealer_) delete dealer_;
+  if (bridge_dealer_) delete bridge_dealer_;
+}
+
+void Worker::Run() {
+  LOG(ERROR) << "Worker (group = " << grp_id_ <<", id = " << id_ << ") start";
+  auto cluster = Cluster::Get();
+  int svr_grp = grp_id_ / cluster->nworker_groups_per_server_group();
+  CHECK(cluster->runtime()->JoinSGroup(grp_id_, id_, svr_grp));
+  step_ = job_conf_.step();
+  InitSockets(train_net_);
+  InitNetParams(job_conf_, train_net_);
+  while (!StopNow(step_)) {
+    if (ValidateNow(step_) && val_net_ != nullptr) {
+      CollectAll(step_, val_net_);
+      LOG(ERROR) << "Validation @ step " + std::to_string(step_);
+      Test(job_conf_.validate_steps(), kVal, val_net_);
+    }
+    if (TestNow(step_) && test_net_ != nullptr) {
+      CollectAll(step_, test_net_);
+      LOG(ERROR) << "Test @ step " + std::to_string(step_);
+      Test(job_conf_.test_steps(), kTest, test_net_);
+    }
+    if (CheckpointNow(step_) && grp_id_ == 0) {
+      CollectAll(step_, train_net_);
+      Checkpoint(step_, Cluster::Get()->checkpoint_folder(), train_net_);
+      job_conf_.set_step(step_);
+    }
+    TrainOneBatch(step_, train_net_);
+    if (DisplayNow(step_) && grp_id_ == 0 && id_ == 0)
+      Display(kTrain, "Train @ step " + std::to_string(step_), train_net_);
+    step_++;
+  }
+
+  // save the model
+  if (grp_id_ == 0)
+    Checkpoint(step_, Cluster::Get()->checkpoint_folder(), train_net_);
+  // clean up
+  cluster->runtime()->LeaveSGroup(grp_id_, id_, svr_grp);
+  // notify the stub on worker stop
+  Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
+  msg->set_type(kStop);
+  dealer_->Send(&msg);  // use param dealer to send the stop msg
+  LOG(ERROR) << "Worker (group = " <<grp_id_ << ", id = " << id_ << ") stops";
+}
+
+void Worker::Test(int steps, Phase phase, NeuralNet* net) {
+  for (int step = 0; step < steps; step++)
+    TestOneBatch(step, phase, net);
+  Display(phase, " ", net);
+}
+
+void ConnectStub(int grp, int id, Dealer* dealer, EntityType entity) {
+  dealer->Connect(kInprocRouterEndpoint);
+  Msg* ping = new Msg(Addr(grp, id, entity), Addr(-1, -1, kStub));
+  ping->set_type(kConnect);
+  dealer->Send(&ping);
+}
+
+void Worker::InitSockets(const NeuralNet* net) {
+  // TODO(wangsh): provide a unique sock id from cluster
+  dealer_ = new Dealer(0);
+  ConnectStub(grp_id_, id_, dealer_, kWorkerParam);
+  for (auto layer : net->layers()) {
+    if (layer->partition_id() == id_) {
+      if (typeid(*layer) == typeid(BridgeDstLayer)
+          || typeid(*layer) == typeid(BridgeSrcLayer)) {
+        // TODO(wangsh): provide a unique socket id from cluster
+        bridge_dealer_ = new Dealer(1);
+        ConnectStub(grp_id_, id_, bridge_dealer_, kWorkerLayer);
+        break;
+      }
+    }
+  }
+  // bind dealer to bridge layers
+  if (bridge_dealer_ != nullptr) {
+    for (auto dst : net->layers()) {
+      if (typeid(*dst) == typeid(BridgeDstLayer)) {
+        auto src = net->srclayers(dst)[0];
+        name2bridge_[src->name()] = src;
+        name2bridge_[dst->name()] = dst;
+        if (src->partition_id() == id_) {
+          dynamic_cast<BridgeLayer*>(src)->MakePaired(dst, grp_id_,
+              bridge_dealer_, &name2bridge_);
+        }
+        if (dst->partition_id() == id_) {
+          dynamic_cast<BridgeLayer*>(dst)->MakePaired(src, grp_id_,
+              bridge_dealer_, &name2bridge_);
+        }
+      }
+    }
+  }
 }
 
 void Worker::InitNetParams(const JobProto& job_conf, NeuralNet* net) {
@@ -116,75 +204,6 @@ void Worker::InitNetParams(const JobProto& job_conf, NeuralNet* net) {
   }
 }
 
-void ConnectStub(int grp, int id, Dealer* dealer, EntityType entity) {
-  dealer->Connect(kInprocRouterEndpoint);
-  Msg* ping = new Msg(Addr(grp, id, entity), Addr(-1, -1, kStub));
-  ping->set_type(kConnect);
-  dealer->Send(&ping);
-}
-
-void Worker::Test(int steps, Phase phase, NeuralNet* net) {
-  for (int step = 0; step < steps; step++)
-    TestOneBatch(step, phase, net);
-  Display(phase, " ", net);
-}
-
-void Worker::Run() {
-  LOG(ERROR) << "Worker (group = " << grp_id_ <<", id = " << id_ << ") start";
-  auto cluster = Cluster::Get();
-  int svr_grp = grp_id_ / cluster->nworker_groups_per_server_group();
-  CHECK(cluster->runtime()->JoinSGroup(grp_id_, id_, svr_grp));
-  // TODO(wangsh): provide a unique sock id from cluster
-  dealer_ = new Dealer(0);
-  ConnectStub(grp_id_, id_, dealer_, kWorkerParam);
-  for (auto layer : train_net_->layers()) {
-    if (layer->partition_id() == id_) {
-      if (typeid(layer) == typeid(BridgeDstLayer)
-          || typeid(layer) == typeid(BridgeSrcLayer)) {
-        // TODO(wangsh): provide a unique socket id from cluster
-        layer_dealer_ = new Dealer(1);
-        ConnectStub(grp_id_, id_, layer_dealer_, kWorkerLayer);
-        break;
-      }
-    }
-  }
-
-  step_ = job_conf_.step();
-  InitNetParams(job_conf_, train_net_);
-  while (!StopNow(step_)) {
-    if (ValidateNow(step_) && val_net_ != nullptr) {
-      CollectAll(step_, val_net_);
-      LOG(ERROR) << "Validation @ step " + std::to_string(step_);
-      Test(job_conf_.validate_steps(), kVal, val_net_);
-    }
-    if (TestNow(step_) && test_net_ != nullptr) {
-      CollectAll(step_, test_net_);
-      LOG(ERROR) << "Test @ step " + std::to_string(step_);
-      Test(job_conf_.test_steps(), kTest, test_net_);
-    }
-    if (CheckpointNow(step_) && grp_id_ == 0) {
-      CollectAll(step_, train_net_);
-      Checkpoint(step_, Cluster::Get()->checkpoint_folder(), train_net_);
-      job_conf_.set_step(step_);
-    }
-    TrainOneBatch(step_, train_net_);
-    if (DisplayNow(step_) && grp_id_ == 0 && id_ == 0)
-      Display(kTrain, "Train @ step " + std::to_string(step_), train_net_);
-    step_++;
-  }
-
-  // save the model
-  if (grp_id_ == 0)
-    Checkpoint(step_, Cluster::Get()->checkpoint_folder(), train_net_);
-  // clean up
-  cluster->runtime()->LeaveSGroup(grp_id_, id_, svr_grp);
-  // notify the stub on worker stop
-  Msg* msg = new Msg(Addr(grp_id_, id_, kWorkerParam), Addr(-1, -1, kStub));
-  msg->set_type(kStop);
-  dealer_->Send(&msg);  // use param dealer to send the stop msg
-  LOG(ERROR) << "Worker (group = " <<grp_id_ << ", id = " << id_ << ") stops";
-}
-
 void Worker::Checkpoint(int step, const std::string& folder, NeuralNet* net) {
   BlobProtos bps;
   for (auto layer : net->layers()) {
@@ -279,39 +298,6 @@ void Worker::Display(int flag, const std::string& prefix, NeuralNet* net) {
   }
 }
 
-void Worker::ReceiveBlobs(bool data, bool grad, BridgeLayer* layer,
-                          NeuralNet* net) {
-  if (layer_dealer_ == nullptr) {
-    LOG(ERROR) << "Null dealer in worker (" << grp_id_ << ", " << id_ << ")";
-  }
-  while (!layer->ready()) {
-    auto msg = layer_dealer_->Receive();
-    CHECK_EQ(AddrGrp(msg->src()), grp_id_);
-    string name(static_cast<char*>(msg->FrameData()), msg->FrameSize());
-    auto receive_layer = net->name2layer(name);
-    auto data = receive_layer->mutable_data(nullptr);
-    msg->NextFrame();
-    memcpy(data->mutable_cpu_data(), msg->FrameData(), msg->FrameSize());
-    dynamic_cast<BridgeLayer*>(receive_layer)->set_ready(true);
-    delete msg;
-  }
-}
-
-void Worker::SendBlobs(bool data, bool grad, BridgeLayer* layer,
-                       NeuralNet* net) {
-  if (layer_dealer_ == nullptr) {
-    LOG(ERROR) << "Null dealer in worker (" << grp_id_ << ", " << id_ << ")";
-  }
-  auto dst = net->srclayers(layer).at(0);
-  Msg *msg = new Msg();
-  msg->set_src(Addr(grp_id_, id_, kWorkerLayer));
-  msg->set_dst(Addr(grp_id_, dst->partition_id(), kWorkerLayer));
-  msg->AddFrame(dst->name().c_str(), dst->name().length());
-  auto const & blob = layer->data(nullptr);
-  msg->AddFrame(blob.cpu_data(), blob.count() * sizeof(float));
-  layer_dealer_->Send(&msg);
-}
-
 /****************************BPWorker**********************************/
 void BPWorker::TrainOneBatch(int step, NeuralNet* net) {
   Forward(step, kTrain, net);

Reply via email to