add ClusterRunTime component definition

Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/6b1e65ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/6b1e65ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/6b1e65ee

Branch: refs/heads/master
Commit: 6b1e65ee05ef55c7152256a71d696d09e6310b5b
Parents: 679573a
Author: wangsheng1001 <[email protected]>
Authored: Mon May 25 14:27:40 2015 +0800
Committer: wangsheng1001 <[email protected]>
Committed: Mon May 25 14:27:40 2015 +0800

----------------------------------------------------------------------
 Makefile.example                        |  91 ++++
 include/utils/cluster.h                 |  15 +
 include/utils/cluster_rt.h              |  59 +++
 src/test/dist_test/test_consistency.cc  | 406 ----------------
 src/test/dist_test/test_core.cc         | 192 --------
 src/test/dist_test/test_da.cc           | 700 ---------------------------
 src/test/dist_test/test_dary.cc         |  85 ----
 src/test/dist_test/test_disk_table.cc   | 188 -------
 src/test/dist_test/test_mnistlayer.cc   | 165 -------
 src/test/dist_test/test_model.cc        |  25 -
 src/test/dist_test/test_neuralnet.cc    | 141 ------
 src/test/dist_test/test_pm.cc           |  88 ----
 src/test/dist_test/test_router.cc       |  27 --
 src/test/dist_test/test_split.cc        | 304 ------------
 src/test/dist_test/test_table_server.cc | 357 --------------
 src/test/dist_test/test_tuple.cc        | 258 ----------
 src/test/model/test_blob.cc             |  58 ---
 src/test/model/test_data_layer.cc       | 178 -------
 src/test/model/test_label_source.cc     |  59 ---
 src/test/model/test_param.cc            | 138 ------
 src/test/model/test_proto.cc            |  67 ---
 src/test/model/test_rgb_dir_source.cc   |  63 ---
 src/test/test_cluster.cc                |  10 +-
 src/test/test_communication.cc          | 158 ------
 src/test/test_shard.cc                  |  56 ---
 src/utils/cluster_rt.cc                 |  32 ++
 26 files changed, 205 insertions(+), 3715 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/Makefile.example
----------------------------------------------------------------------
diff --git a/Makefile.example b/Makefile.example
new file mode 100644
index 0000000..80dfc26
--- /dev/null
+++ b/Makefile.example
@@ -0,0 +1,91 @@
+###################User Config Varaibles #############################
+# third-party library installation folder
+HOME_DIR := /usr/
+# Lib folder for system and external libs. You may need to change it.
+LIBRARY_DIRS := $(HOME_DIR)/lib64 $(HOME_DIR)/lib $(HOME_DIR)/local/lib
+# Header folder for system and external libs. You may need to change it.
+INCLUDE_DIRS := $(HOME_DIR)/include ./include
+# g++ location, should support c++11, tested with 4.8.1
+CXX := g++
+
+######################Setting Varialbes#######################################
+LIBRARIES := glog gflags protobuf rt opencv_highgui opencv_imgproc opencv_core\
+       lmdb openblas zmq czmq
+
+LDFLAGS := $(foreach librarydir, $(LIBRARY_DIRS), -L$(librarydir))\
+       $(foreach library, $(LIBRARIES), -l$(library))
+# Folder to store compiled files
+BUILD_DIR := build
+MSHADOW_FLAGS :=-DMSHADOW_USE_CUDA=0 -DMSHADOW_USE_CBLAS=1 -DMSHADOW_USE_MKL=0
+CXXFLAGS := -O3 -Wall -pthread -fPIC -std=c++11 -Wno-unknown-pragmas \
+       $(MSHADOW_FLAGS) -DCPU_ONLY=1 \
+       -funroll-loops $(foreach includedir, $(INCLUDE_DIRS), -I$(includedir))
+
+# find user defined .proto file, and then compute the corresponding .h, .cc
+# files, which cannot be found by shell find, because they haven't been
+# generated currently
+PROTOS := $(shell find src/proto/ -name "*.proto")
+PROTO_SRCS :=$(PROTOS:.proto=.pb.cc)
+PROTO_HDRS :=$(patsubst src%, include%, $(PROTOS:.proto=.pb.h))
+PROTO_OBJS :=$(addprefix $(BUILD_DIR)/, $(PROTO_SRCS:.cc=.o))
+
+# each singa src file will generate a .o file
+SINGA_SRCS := $(shell find src/ \( -path "src/test" -o -path "src/main.cc" \) \
+       -prune -o \( -name "*.cc" -type f \) -print )
+SINGA_OBJS := $(sort $(addprefix $(BUILD_DIR)/, $(SINGA_SRCS:.cc=.o)) \
+       $(PROTO_OBJS) )
+-include $(SINGA_OBJS:%.o=%.P)
+
+TEST_SRCS :=$(shell find src/test/ -maxdepth 1 -name "*.cc")
+TEST_OBJS := $(sort $(addprefix $(BUILD_DIR)/, $(TEST_SRCS:.cc=.o)))
+-include $(TEST_OBJS:%.o=%.P)
+
+GTEST_SRC := include/gtest/gtest-all.cc
+GTEST_HDR := include/gtest/gtest.h
+GTEST_LIB := $(BUILD_DIR)/libgtest.a
+
+OBJS := $(sort $(SINGA_OBJS) $(TEST_OBJS) )
+
+########################Compilation Section###################################
+.PHONY: singa test
+
+singa: $(PROTO_OBJS) $(SINGA_OBJS)
+       $(CXX) $(SINGA_OBJS) src/main.cc -o $(BUILD_DIR)/singa $(CXXFLAGS) 
$(LDFLAGS)
+       @echo
+
+loader: proto $(LOADER_OBJS)
+       $(CXX) $(LOADER_OBJS) -o $(BUILD_DIR)/loader $(CXXFLAGS) $(LDFLAGS)
+       @echo
+
+test:  proto $(GTEST_LIB) $(TEST_OBJS) $(SINGA_OBJS)
+       $(CXX) $(TEST_OBJS) include/gtest/gtest_main.cc $(GTEST_LIB) \
+               $(SINGA_OBJS) -o $(BUILD_DIR)/test $(CXXFLAGS) $(LDFLAGS)
+       @echo
+
+$(GTEST_LIB): $(GTEST_HDR) $(GTEST_SRC)
+       $(CXX) $(GTEST_SRC) -c -o $(BUILD_DIR)/gtest-all.o $(CXXFLAGS)
+       ar -rv $(GTEST_LIB) $(BUILD_DIR)/gtest-all.o
+
+# compile all files
+$(OBJS):$(BUILD_DIR)/%.o : %.cc
+       @mkdir -p $(dir $@)
+       $(CXX) $<  $(CXXFLAGS) -MMD -c -o $@
+       cp $(BUILD_DIR)/$*.d $(BUILD_DIR)/$*.P; \
+       sed -e 's/#.*//' -e 's/^[^:]*: *//' -e 's/ *\\$$//' \
+               -e '/^$$/ d' -e 's/$$/ :/' < $(BUILD_DIR)/$*.d >> 
$(BUILD_DIR)/$*.P; \
+       rm -f $*.d
+
+proto: $(PROTO_OBJS)
+
+$(PROTO_SRCS): $(PROTOS)
+       protoc --proto_path=src/proto --cpp_out=src/proto $(PROTOS)
+       mkdir -p include/proto/
+       cp src/proto/*.pb.h include/proto/
+       @echo
+
+clean:
+       rm -rf *.a *.so
+       rm -rf include/proto/*
+       rm -rf src/proto/*.pb.h src/proto/*.pb.cc
+       rm -rf $(BUILD_DIR)
+       @echo

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/include/utils/cluster.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster.h b/include/utils/cluster.h
index 4812987..cd1ca76 100644
--- a/include/utils/cluster.h
+++ b/include/utils/cluster.h
@@ -6,6 +6,7 @@
 #include <memory>
 #include <vector>
 #include "proto/cluster.pb.h"
+#include "cluster_rt.h"
 
 using std::shared_ptr;
 using std::string;
@@ -108,6 +109,19 @@ class Cluster {
   }
    */
 
+  //ClusterRuntime functions
+  bool server_watch(int gid, int sid) const {
+    return false;
+  }
+
+  bool worker_join_sgroup(int gid, int wid, int server_group) const {
+    return false;
+  }
+
+  bool worker_leave_sgroup(int gid, int wid, int s_group) const {
+    return false;
+  }
+
  private:
   Cluster(const ClusterProto &cluster, int procs_id) ;
   void SetupFolders(const ClusterProto &cluster);
@@ -120,6 +134,7 @@ class Cluster {
   // make this class a singlton
   static shared_ptr<Cluster> instance_;
 };
+
 }  // namespace singa
 
 #endif  // INCLUDE_UTILS_CLUSTER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/include/utils/cluster_rt.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster_rt.h b/include/utils/cluster_rt.h
new file mode 100644
index 0000000..d9587ce
--- /dev/null
+++ b/include/utils/cluster_rt.h
@@ -0,0 +1,59 @@
+#ifndef INCLUDE_UTILS_CLUSTER_RT_H_
+#define INCLUDE_UTILS_CLUSTER_RT_H_
+#include <glog/logging.h>
+#include <string>
+#include <utility>
+
+using std::string;
+
+namespace singa {
+
+/**
+ * ClusterRuntime is a runtime service that manages dynamic configuration and 
status
+ * of the whole cluster. It mainly provides following services:
+ *    1)  Provide running status of each server/worker
+ *    1)  Translate process id to (hostname:port)
+ */
+class ClusterRuntime{
+ public:
+  ClusterRuntime(){}
+  virtual ~ClusterRuntime(){}
+
+  /**
+   * Initialize the runtime instance
+   */
+  virtual bool Init(){return false;}
+
+  /**
+   * Server: watch all workers in a server group, will be notified when all 
workers have left 
+   */
+  virtual bool sWatchSGroup(int gid, int sid){ return false;}
+
+  /**
+   * Worker: join a server group (i.e. start to read/update these servers)
+   */
+  virtual bool wJoinSGroup(int gid, int wid, int s_group){ return false;}
+
+  /**
+   * Worker: leave a server group (i.e. finish its all work)
+   */
+  virtual bool wLeaveSGroup(int gid, int wid, int s_group){ return false;}
+};
+
+
+class ZKClusterRT : public ClusterRuntime{
+ public:
+  ZKClusterRT(string host);
+  ~ZKClusterRT();
+  bool Init();
+  bool sWatchSGroup(int gid, int sid);
+  bool wJoinSGroup(int gid, int wid, int s_group);
+  bool wLeaveSGroup(int gid, int wid, int s_group);
+
+ private:
+  string host_;
+};
+
+} // namespace singa
+
+#endif  //  INCLUDE_UTILS_CLUSTER_RT_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_consistency.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_consistency.cc 
b/src/test/dist_test/test_consistency.cc
deleted file mode 100644
index a4ed9b2..0000000
--- a/src/test/dist_test/test_consistency.cc
+++ /dev/null
@@ -1,406 +0,0 @@
-//  Copyright © 2014 Anh Dinh. All Rights Reserved.
-
-//  Testing the unbalance in spliting parameter vectors.
-
-#include "core/global-table.h"
-#include "core/common.h"
-#include "core/disk-table.h"
-#include "core/table.h"
-#include "core/table_server.h"
-#include "utils/global_context.h"
-#include <gflags/gflags.h>
-#include "proto/model.pb.h"
-#include "proto/common.pb.h"
-#include "worker.h"
-#include "coordinator.h"
-#include "utils/common.h"
-#include "utils/proto_helper.h"
-
-#include <cmath>
-#include <stdlib.h>
-#include <vector>
-#include <iostream>
-#include <fstream>
-
-
-DEFINE_bool(restore_mode, false, "restore from checkpoint file");
-using namespace lapis;
-using std::vector;
-
-//DEFINE_bool(sync_update, false, "Synchronous put/update queue");
-DEFINE_int32(checkpoint_frequency, 5000, "frequency for cp");
-DEFINE_int32(checkpoint_after, 1, "cp after this steps");
-DEFINE_string(par_mode, "hybrid",  "time training algorithm");
-DEFINE_bool(restore, false, "restore from checkpoint file");
-
-DEFINE_string(db_backend, "lmdb", "backend db");
-DEFINE_string(system_conf, "examples/imagenet12/system.conf", "configuration 
file for node roles");
-DEFINE_string(model_conf, "examples/imagenet12/model.conf", "DL model 
configuration file");
-DEFINE_string(checkpoint_dir,"/data1/wangwei/lapis/","check point dir");
-DEFINE_int32(threshold,1000000, "max # of parameters in a vector");
-DEFINE_int32(iterations,5,"numer of get/put iterations");
-DEFINE_int32(workers,2,"numer of workers doing get/put");
-DECLARE_bool(checkpoint_enabled);
-
-#ifndef FLAGS_v
-  DEFINE_int32(v, 3, "vlog controller");
-#endif
-
-
-struct AnhUpdateHandler: BaseUpdateHandler<VKey,SGDValue>{
-       bool Update(SGDValue *a, const SGDValue &b){
-    float * adptr=a->mutable_data()->mutable_value()->mutable_data();
-    const float*bdptr=b.grad(0).value().data();
-    for(int i=0;i<b.grad(0).value_size();i++)
-      adptr[i]+=bdptr[i];
-               return true;
-       }
-
-  bool Get(const VKey k, const SGDValue &val, SGDValue *ret){
-      *ret = val;
-      return true;
-  }
-
-  bool is_checkpointable(const VKey k, const SGDValue v){
-       return true; //always checkpoint
-  }
-};
-
-typedef map<int, GlobalTable*> Map;
-Map tables;
-shared_ptr<NetworkThread> network;
-shared_ptr<GlobalContext> context;
-std::vector<ServerState*> server_states;
-TableServer *table_server;
-TableDelegate *delegate;
-void create_mem_table(int id, int num_shards){
-
-       TableDescriptor *info = new TableDescriptor(id, num_shards);
-         info->key_marshal = new Marshal<VKey>();
-         info->value_marshal = new Marshal<SGDValue>();
-         info->sharder = new VKeySharder;
-         info->accum = new AnhUpdateHandler;
-         info->partition_factory = new typename SparseTable<VKey, 
SGDValue>::Factory;
-         auto table=new TypedGlobalTable<VKey, SGDValue>();
-         table->Init(info);
-         tables[id] = table;
-}
-
-void coordinator_assign_tables(int id){
-       for (int i = 0; i < context->num_procs()        ; ++i) {
-           RegisterWorkerRequest req;
-           int src = 0;
-           //  adding memory server.
-           if (context->IsTableServer(i)) {
-             network->Read(MPI::ANY_SOURCE, MTYPE_REGISTER_WORKER, &req, &src);
-             server_states.push_back(new ServerState(i));
-           }
-         }
-         LOG(INFO) << " All servers registered and started up. Ready to go";
-         //  set itself as the current worker for the table
-         tables[id]->worker_id_ = network->id();
-
-         // memory servers are specified in global context. Round-robin 
assignment
-
-           VLOG(3)<<"num of shards"<<tables[id]->num_shards()<<" for table"<< 
id;
-
-           int server_idx = 0;
-           for (int shard = 0; shard < tables[id]->num_shards(); ++shard) {
-             ServerState &server = *server_states[server_idx];
-             LOG(INFO) << "Assigning table ("<<id<<","<<shard<<") to server "
-                       <<server_states[server_idx]->server_id;
-
-             // TODO(Anh) may overwrite this field if #shards>#table_servers
-             server.shard_id = shard;
-             server.local_shards.insert(new TaskId(id, shard));
-             server_idx = (server_idx + 1) % server_states.size();
-           }
-
-         VLOG(3)<<"table assignment";
-         //  then send table assignment
-         ShardAssignmentRequest req;
-         for (size_t i = 0; i < server_states.size(); ++i) {
-           ServerState &server = *server_states[i];
-           for (auto * task: server.local_shards) {
-             ShardAssignment *s  = req.add_assign();
-             s->set_new_worker(server.server_id);
-             s->set_table(task->table);
-             s->set_shard(task->shard);
-             //  update local tables
-             CHECK(tables.find(task->table)!=tables.end());
-             GlobalTable *t = tables.at(task->table);
-             t->get_partition_info(task->shard)->owner = server.server_id;
-             delete task;
-           }
-         }
-         VLOG(3)<<"finish table assignment, req size "<<req.assign_size();
-         network->SyncBroadcast(MTYPE_SHARD_ASSIGNMENT, 
MTYPE_SHARD_ASSIGNMENT_DONE, req);
-         VLOG(3)<<"finish table server init";
-}
-
-
-void worker_table_init(){
-       table_server = new TableServer();
-       table_server->StartTableServer(tables);
-       VLOG(3) << "done starting table server";
-}
-
-double random_double(){
-       return static_cast<double>(rand())/static_cast<double>(RAND_MAX);
-}
-
-// popular table with random large or small messages.
-// the message distribution specified in FLAGS_large_precentage
-void coordinator_load_data(const vector<int>& tuples){
-  auto table = static_cast<TypedGlobalTable<VKey,SGDValue>*>(tables[0]);
-
-  int nservers=context->num_table_servers();
-  int keyid=0;
-  if (!FLAGS_restore_mode){
-    for(auto tuple: tuples){
-      for(int offset=0;offset<tuple;){
-        SGDValue x;
-        DAryProto *data=x.mutable_data();
-        DAryProto *grad=x.add_grad();
-        for(int i=0;i <std::min(FLAGS_threshold, tuple-offset);i++){
-          data->add_value(i*1.0f);
-          grad->add_value(i*1.0f);
-        }
-        offset+=data->value_size();
-        VKey key;
-        key.set_key(keyid++);
-        table->put(key,x);
-      }
-    }
-    LOG(ERROR)<<"put "<<keyid<<" tuples";
-  }
-
-  /*
-       LogFile *file = new LogFile("/data1/wangwei/lapis/checkpoint_0","rw",0);
-       VLOG(3) << "Loaded table " << file->file_name();
-       string k,v;
-       int table_size = file->read_latest_table_size();
-       VLOG(3) << "table size = " << table_size;
-       for (int i=0; i<table_size; i++){
-               int tmp;
-               file->previous_entry(&k, &v, &tmp);
-               int *key = reinterpret_cast<int *>((char*)&k[0]);
-               int *val = reinterpret_cast<int *>((char*)&v[0]);
-               VLOG(3) << "k = " << *key << " val = " << *val;
-       }
-       delete file;
-  */
-
-       /*
-       for (int i=0; i<num_keys; i++){
-               table->put(i,0); //loaded again
-       }*/
-       VLOG(3) << "Coordinator done loading ..., from process 
"<<NetworkThread::Get()->id();
-}
-
-void get(TypedGlobalTable<VKey,SGDValue>* table, const vector<int>& tuples){
-  SGDValue v;
-  int num_keys=0;
-  for(auto tuple: tuples){
-    num_keys+=tuple/FLAGS_threshold+(tuple%FLAGS_threshold!=0);
-  }
-  LOG(ERROR)<<"getting "<<num_keys<<" tuples";
-
-  for (int i=0; i<num_keys; i++){
-    VKey key;
-    key.set_key(i);
-    table->async_get(key, &v);
-  }
-
-
-  int key=0;
-  SGDValue val;
-
-  LOG(INFO)<<"start collect key";
-  for (int i=0; i<num_keys; i++){
-    VKey key;
-    while(!table->async_get_collect(&key, &val))
-      Sleep(0.001);
-    //LOG(INFO)<<"collect key "<<key<<" with val "<<val;
-  }
-}
-
-void update(TypedGlobalTable<VKey,SGDValue>* table, const vector<int>& tuples){
-  if(NetworkThread::Get()->id()==0)
-    sleep(2);
-  LOG(INFO)<<"start update";
-  int keyid=0;
-  for(auto tuple: tuples){
-    for(int offset=0;offset<tuple;){
-      SGDValue x;
-      DAryProto *grad=x.add_grad();
-      for(int i=0;i <std::min(FLAGS_threshold, tuple-offset);i++){
-        grad->add_value(i*1.0f);
-      }
-      offset+=grad->value_size();
-      VKey key;
-      key.set_key(keyid++);
-      table->update(key,x);
-    }
-  }
-  LOG(ERROR)<<"updated "<<keyid<<" tuples";
-}
-
-void worker_test_data(const vector<int>& tuples){
-  auto table = static_cast<TypedGlobalTable<VKey,SGDValue>*>(tables[0]);
-
-  get(table, tuples);
-  update(table, tuples);
-  update(table, tuples);
-  update(table, tuples);
-  get(table, tuples);
-}
-
-void shutdown(){
-       if (context->AmICoordinator()){
-               EmptyMessage msg;
-               for (int i=0; i<context->num_procs()-1; i++)
-                       network->Read(MPI::ANY_SOURCE, MTYPE_WORKER_END, &msg);
-                EmptyMessage shutdown_msg;
-                 for (int i = 0; i < network->size() - 1; i++) {
-                   network->Send(i, MTYPE_SHUTDOWN, shutdown_msg);
-                 }
-                 network->Flush();
-                 network->Shutdown();
-       }
-       else{
-         network->Flush();
-
-         network->Send(context->num_procs()-1, MTYPE_WORKER_END, 
EmptyMessage());
-
-         EmptyMessage msg;
-
-         network->Read(context->num_procs()-1, MTYPE_SHUTDOWN, &msg);
-
-         if (context->AmITableServer())
-                 table_server->ShutdownTableServer();
-
-         network->Shutdown();
-       }
-}
-
-void HandleShardAssignment() {
-
-  ShardAssignmentRequest shard_req;
-  auto mpi=NetworkThread::Get();
-  mpi->Read(GlobalContext::kCoordinator, MTYPE_SHARD_ASSIGNMENT, &shard_req);
-  //  request read from coordinator
-  for (int i = 0; i < shard_req.assign_size(); i++) {
-    const ShardAssignment &a = shard_req.assign(i);
-    GlobalTable *t = tables.at(a.table());
-    t->get_partition_info(a.shard())->owner = a.new_worker();
-
-
-    //if local shard, create check-point files
-    if (FLAGS_checkpoint_enabled && t->is_local_shard(a.shard())){
-      string checkpoint_file = 
StringPrintf("%s/checkpoint_%d",FLAGS_checkpoint_dir.c_str(), a.shard());
-        char hostname[256];
-        gethostname(hostname, sizeof(hostname));
-        VLOG(3) << "try to open for writing *****"<<checkpoint_file<<" 
"<<string(hostname);
-
-      FILE *tmp_file = fopen(checkpoint_file.c_str(), "r");
-      if (tmp_file){//exists -> open to reading and writing
-        fclose(tmp_file);
-        auto cp = t->checkpoint_files();
-
-        if (FLAGS_restore_mode){//open in read mode to restore, then close
-          LogFile *file = new LogFile(checkpoint_file,"rw",0);
-          VLOG(3) << "Loaded table " << file->file_name();
-          int table_size = file->read_latest_table_size();
-          delete file;
-
-          double start=Now();
-          VLOG(3) << "Open checkpoint file to restore";
-          (*cp)[a.shard()] = new LogFile(checkpoint_file,"r",a.shard());
-          t->Restore(a.shard());
-          delete (*cp)[a.shard()];
-          double end=Now();
-          LOG(ERROR)<<"restore time\t"<<end-start<< "\tfor\t"
-            <<table_size<<"\tthreshold\t"<<FLAGS_threshold;
-        }
-        char hostname[256];
-        gethostname(hostname, sizeof(hostname));
-        VLOG(3) << "open for writing *****"<<checkpoint_file<<" 
"<<string(hostname);
-
-
-
-        VLOG(3) << "Open checkpoint file for writing";
-        (*cp)[a.shard()] = new LogFile(checkpoint_file,"a",a.shard());
-      }
-      else{// not exist -> open to writing first time
-        auto cp = t->checkpoint_files();
-        (*cp)[a.shard()] = new LogFile(checkpoint_file,"w",a.shard());
-        VLOG(3) << "Added to new checkpoint files for shard "<< a.shard();
-      }
-
-    }
-
-
-  }
-  EmptyMessage empty;
-  mpi->Send(GlobalContext::kCoordinator, MTYPE_SHARD_ASSIGNMENT_DONE, empty);
-  VLOG(3)<<"finish handle shard assignment **";
-
-}
-
-
-int main(int argc, char **argv) {
-       FLAGS_logtostderr = 1;
-       int provided;
-       MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
-       google::InitGoogleLogging(argv[0]);
-       gflags::ParseCommandLineFlags(&argc, &argv, true);
-
-       context = GlobalContext::Get(FLAGS_system_conf);
-       network = NetworkThread::Get();
-
-       ModelProto model;
-       ReadProtoFromTextFile(FLAGS_model_conf.c_str(), &model);
-
-       create_mem_table(0,context->num_table_servers());
-
-  vector<int> tuple_size{37448736, 16777216, 4096000, 1327104, 884736, 884736, 
614400,14112,4096,4096,1000,384,384,256,256,96};
-  /*
-  vector<int> tuples;
-  for(int i=0;i<3;i++){
-    for(int j=0;j<FLAGS_workers;j++)
-      tuples.push_back(tuple_size[i]/FLAGS_workers);
-  }
-  for(int i=3;i<tuple_size.size();i++)
-    tuples.push_back(tuple_size[i]);
-    */
-
-       if (context->AmICoordinator()){
-               VLOG(3) << "Coordinator process rank = " << 
NetworkThread::Get()->id();
-               coordinator_assign_tables(0);
-               coordinator_load_data(tuple_size);
-
-               network->barrier();
-       }
-       else{
-               if (context->AmITableServer()){
-                       worker_table_init();
-                       HandleShardAssignment();
-                       network->barrier();
-               }
-               else{
-                       VLOG(3) << "Inside worker, waiting for assignemtn";
-                       HandleShardAssignment();
-                       network->barrier();
-      if(!FLAGS_restore_mode)
-        worker_test_data(tuple_size);
-               }
-       }
-       shutdown();
-
-
-       VLOG(3) << "Done ...";
-       return 0;
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_core.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_core.cc b/src/test/dist_test/test_core.cc
deleted file mode 100644
index 35d589b..0000000
--- a/src/test/dist_test/test_core.cc
+++ /dev/null
@@ -1,192 +0,0 @@
-//  Copyright © 2014 Anh Dinh. All Rights Reserved.
-
-
-
-#include "core/global-table.h"
-#include "core/common.h"
-#include "core/disk-table.h"
-#include "core/table.h"
-#include "core/table_server.h"
-#include "utils/global_context.h"
-#include <gflags/gflags.h>
-#include "proto/model.pb.h"
-#include "worker.h"
-#include "coordinator.h"
-#include "model_controller/myacc.h"
-#include <cmath>
-
-using namespace lapis;
-
-DEFINE_bool(sync_update, false, "Synchronous put/update queue");
-DEFINE_string(system_conf, "examples/imagenet12/system.conf", "configuration 
file for node roles");
-DEFINE_string(model_conf, "examples/imagenet12/model.conf", "DL model 
configuration file");
-DEFINE_int32(num_keys,10,"");
-
-typedef map<int, GlobalTable*> Map;
-Map tables;
-shared_ptr<NetworkThread> network;
-shared_ptr<GlobalContext> context;
-std::vector<ServerState*> server_states;
-TableServer *table_server;
-
-void create_mem_table(int id, int num_shards){
-
-       TableDescriptor *info = new TableDescriptor(id, num_shards);
-         info->key_marshal = new Marshal<int>();
-         info->value_marshal = new Marshal<int>();
-         info->sharder = new Sharding::Mod;
-         info->accum = new TestUpdater();
-         info->partition_factory = new typename SparseTable<int, int>::Factory;
-         auto table=new TypedGlobalTable<int, int>();
-         table->Init(info);
-         tables[id] = table;
-}
-
-void coordinator_assign_tables(int id){
-       for (int i = 0; i < context->num_processes()-1; ++i) {
-           RegisterWorkerRequest req;
-           int src = 0;
-           network->Read(MPI::ANY_SOURCE, MTYPE_REGISTER_WORKER, &req, &src);
-           //  adding memory server.
-           if (context->IsTableServer(i)) {
-             server_states.push_back(new ServerState(i));
-           }
-         }
-         LOG(INFO) << " All servers registered and started up. Ready to go";
-         //  set itself as the current worker for the table
-         tables[id]->worker_id_ = network->id();
-
-         // memory servers are specified in global context. Round-robin 
assignment
-
-           VLOG(3)<<"num of shards"<<tables[id]->num_shards()<<" for table"<< 
id;
-
-           int server_idx = 0;
-           for (int shard = 0; shard < tables[id]->num_shards(); ++shard) {
-             ServerState &server = *server_states[server_idx];
-             LOG(INFO) << "Assigning table ("<<id<<","<<shard<<") to server "
-                       <<server_states[server_idx]->server_id;
-
-             // TODO(Anh) may overwrite this field if #shards>#table_servers
-             server.shard_id = shard;
-             server.local_shards.insert(new TaskId(id, shard));
-             server_idx = (server_idx + 1) % server_states.size();
-           }
-
-         VLOG(3)<<"table assignment";
-         //  then send table assignment
-         ShardAssignmentRequest req;
-         for (size_t i = 0; i < server_states.size(); ++i) {
-           ServerState &server = *server_states[i];
-           for (auto * task: server.local_shards) {
-             ShardAssignment *s  = req.add_assign();
-             s->set_new_worker(server.server_id);
-             s->set_table(task->table);
-             s->set_shard(task->shard);
-             //  update local tables
-             CHECK(tables.find(task->table)!=tables.end());
-             GlobalTable *t = tables.at(task->table);
-             t->get_partition_info(task->shard)->owner = server.server_id;
-             delete task;
-           }
-         }
-         VLOG(3)<<"finish table assignment, req size "<<req.assign_size();
-         network->SyncBroadcast(MTYPE_SHARD_ASSIGNMENT, 
MTYPE_SHARD_ASSIGNMENT_DONE, req);
-         VLOG(3)<<"finish table server init";
-}
-
-void worker_table_init(){
-       table_server = new TableServer();
-       table_server->StartTableServer(tables);
-       VLOG(3) << "done starting table server";
-}
-
-
-void coordinator_load_data(){
-       auto table = static_cast<TypedGlobalTable<int,int>*>(tables[0]);
-       for (int i = 1; i<=FLAGS_num_keys; i++){
-               table->put(i,i);
-       }
-       VLOG(3) << "Loaded data successfully ...";
-}
-
-void worker_test_data(){
-       auto table = static_cast<TypedGlobalTable<int,int>*>(tables[0]);
-       for (int i=1; i<=FLAGS_num_keys; i++)
-               VLOG(3) << StringPrintf("Worker %d got (%d,%d)", 
NetworkThread::Get()->id(), i, table->get(i));
-
-
-       for (int j = 0; j < 2; j++) {
-               for (int i = 1; i <= FLAGS_num_keys; i++)
-                       table->update(i, i);
-
-               for (int i = 1; i <= FLAGS_num_keys; i++)
-                       VLOG(3)
-                                       << StringPrintf("Worker %d got (%d,%d)",
-                                                       
NetworkThread::Get()->id(), i, table->get(i));
-       }
-/*
-       for (int i = 1; i <= FLAGS_num_keys; i++)
-                               VLOG(3)
-                                               << StringPrintf("Worker %d got 
(%d,%d)",
-
-                                                       
NetworkThread::Get()->id(), i, table->get(i));
-*/
-}
-
-void shutdown(){
-       if (context->AmICoordinator()){
-               VLOG(3) << "Coordinator is shutting down ...";
-               EmptyMessage msg;
-               for (int i=0; i<context->num_processes()-1; i++)
-                       network->Read(MPI::ANY_SOURCE, MTYPE_WORKER_END, &msg);
-                EmptyMessage shutdown_msg;
-                 for (int i = 0; i < network->size() - 1; i++) {
-                   network->Send(i, MTYPE_WORKER_SHUTDOWN, shutdown_msg);
-                 }
-                 network->Flush();
-                 network->Shutdown();
-       }
-       else{
-               VLOG(3) << "Worker " << network->id() << " is shutting down 
...";
-         network->Flush();
-         VLOG(3) << "Done flushing the network thread";
-         network->Send(GlobalContext::kCoordinatorRank, MTYPE_WORKER_END, 
EmptyMessage());
-         EmptyMessage msg;
-         network->Read(GlobalContext::kCoordinatorRank, MTYPE_WORKER_SHUTDOWN, 
&msg);
-         VLOG(3) << "Worker received MTYPE_WORKER_SHUTDOWN";
-         table_server->ShutdownTableServer();
-         VLOG(3) << "Flushing node " << network->id();
-         network->Shutdown();
-       }
-}
-
-
-int main(int argc, char **argv) {
-       FLAGS_logtostderr = 1;
-       google::InitGoogleLogging(argv[0]);
-       gflags::ParseCommandLineFlags(&argc, &argv, true);
-
-       context = GlobalContext::Get(FLAGS_system_conf, FLAGS_model_conf);
-       network = NetworkThread::Get();
-       VLOG(3) << "*** testing memory servers, with "
-                       << context->num_table_servers() << " servers";
-       create_mem_table(0,context->num_table_servers());
-
-       if (context->AmICoordinator()){
-               coordinator_assign_tables(0);
-               coordinator_load_data();
-               network->barrier();
-       }
-       else{
-               worker_table_init();
-               network->barrier();
-               VLOG(3) << "passed the barrier";
-               //Sleep(1);
-               worker_test_data();
-       }
-
-       shutdown();
-       return 0;
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_da.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_da.cc b/src/test/dist_test/test_da.cc
deleted file mode 100644
index 51aa93e..0000000
--- a/src/test/dist_test/test_da.cc
+++ /dev/null
@@ -1,700 +0,0 @@
-#include <glog/logging.h>
-#include <mpi.h>
-#include <utility>
-#include <vector>
-
-#include "da/gary.h"
-#include "da/dary.h"
-#include "da/ary.h"
-
-
-using std::make_pair;
-using std::vector;
-void Debug() {
-  int i = 0;
-  char hostname[256];
-  gethostname(hostname, sizeof(hostname));
-  printf("PID %d on %s ready for attach\n", getpid(), hostname);
-  fflush(stdout);
-  while (0 == i)
-    sleep(5);
-}
-
-
-
-void TestPar(int pdim, int rank){
-  lapis::DAry a1, a2;
-  lapis::DAry a3, a4;
-  vector<lapis::Range> slice{make_pair(0,4), make_pair(0,8)};
-  a1.SetShape({4,8});
-  a2.SetShape({4,8});
-  a1.Setup(pdim);
-  a2.Setup(pdim);
-  a1.Random();
-  a2.Random();
-  ARMCI_Barrier();
-
-
-  if(rank==0){
-    //Debug();
-    LOG(ERROR)<<"test simple partition along "<< pdim<<" dim";
-    a3=a1.Fetch(slice);
-    a4=a2.Fetch(slice);
-    LOG(ERROR)<<"fetch a";
-    LOG(ERROR)<<a3.ToString();
-    LOG(ERROR)<<"fetch b";
-    LOG(ERROR)<<a4.ToString();
-    a3.Add(a4);
-    LOG(ERROR)<<"a<- a+b";
-    LOG(ERROR)<<a3.ToString();
-  }
-  ARMCI_Barrier();
-  a1.Add(a2);
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry a5;
-    a5=a1.Fetch(slice);
-    LOG(ERROR)<<"add then fetch";
-    LOG(ERROR)<<a5.ToString();
-  }
-}
-
-
-
-void TestMixedParElt(int pa, int pb, int pc, int rank){
-  LOG(ERROR)<<" p dim for a,b,c is "<<pa<<" "<<pb<<" "<<pc;
-  vector<lapis::Range> slice{make_pair(0,3),make_pair(0,6), make_pair(0,2)};
-  lapis::DAry a1, a2, a3;
-  a1.SetShape({3,6,2});
-  a2.SetShape({3,6,2});
-  a3.SetShape({3,6,2});
-  a1.Setup(pa);
-  a2.Setup(pb);
-  a3.Setup(pc);
-  a1.Random();
-  a2.Random();
-  a3.Random();
-
-  ARMCI_Barrier();
-  if(rank==0){
-    LOG(ERROR)<<"test elementwise ops with mixed partition";
-    lapis::DAry a5, a4;
-//    Debug();
-    a5=a1.Fetch(slice);
-    a4=a2.Fetch(slice);
-    LOG(ERROR)<<"fetch a";
-    LOG(ERROR)<<a5.ToString();
-    LOG(ERROR)<<"fetch b";
-    LOG(ERROR)<<a4.ToString();
-    a5.Copy(a4);
-    LOG(ERROR)<<"fetch op a.Copy(b)";
-    LOG(ERROR)<<a5.ToString();
-  }
-  ARMCI_Barrier();
-  a1.Copy(a2);
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry a5;
-    a5=a1.Fetch(slice);
-    LOG(ERROR)<<"op fetch a.Copy(b)";
-    LOG(ERROR)<<a5.ToString();
-  }
-
-//////////////////////////////////////////////////
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry a8, a4, a5({3,6,2});
-    //Debug();
-    a8=a1.Fetch(slice);
-    a4=a2.Fetch(slice);
-    LOG(ERROR)<<"fetch a";
-    LOG(ERROR)<<a8.ToString();
-    LOG(ERROR)<<"fetch b";
-    LOG(ERROR)<<a4.ToString();
-    a5.Mult(a8,a4);
-    LOG(ERROR)<<"fetch op c.mult(a,b)";
-    LOG(ERROR)<<a5.ToString();
-  }
-  ARMCI_Barrier();
-  a3.Mult(a1,a2);
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry a5;
-    a5=a3.Fetch(slice);
-    LOG(ERROR)<<"op fetch a.Mult(b,c)";
-    LOG(ERROR)<<a5.ToString();
-  }
-//////////////////////////////////////////////////
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry a8, a4, a5({3,6,2});
-    //Debug();
-    a8=a1.Fetch(slice);
-    a4=a2.Fetch(slice);
-    LOG(ERROR)<<"fetch a";
-    LOG(ERROR)<<a8.ToString();
-    LOG(ERROR)<<"fetch b";
-    LOG(ERROR)<<a4.ToString();
-    a5.Div(a8,a4);
-    LOG(ERROR)<<"fetch op c.div(a,b)";
-    LOG(ERROR)<<a5.ToString();
-  }
-  ARMCI_Barrier();
-  a3.Div(a1,a2);
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry a5;
-    a5=a3.Fetch(slice);
-    LOG(ERROR)<<"op fetch a.div(b,c)";
-    LOG(ERROR)<<a5.ToString();
-  }
-//////////////////////////////////////////////////
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry a8, a4, a5({3,6,2});
-    //Debug();
-    a8=a1.Fetch(slice);
-    LOG(ERROR)<<"fetch a";
-    LOG(ERROR)<<a8.ToString();
-    a5.Mult(a8, 3.0);
-    LOG(ERROR)<<"fetch op c.mult(a,3)";
-    LOG(ERROR)<<a5.ToString();
-  }
-  ARMCI_Barrier();
-  a3.Mult(a1,3.0);
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry a5;
-    a5=a3.Fetch(slice);
-    LOG(ERROR)<<"op fetch a.mult(b,3)";
-    LOG(ERROR)<<a5.ToString();
-  }
-
-//////////////////////////////////////////////////
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry a8, a4, a5({3,6,2});
-    //Debug();
-    a8=a1.Fetch(slice);
-    LOG(ERROR)<<"fetch a";
-    LOG(ERROR)<<a8.ToString();
-    a5.Square(a8);
-    LOG(ERROR)<<"fetch op c.square(a)";
-    LOG(ERROR)<<a5.ToString();
-  }
-  ARMCI_Barrier();
-  a3.Square(a1);
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry a5;
-    a5=a3.Fetch(slice);
-    LOG(ERROR)<<"op fetch a.sqaure(b)";
-    LOG(ERROR)<<a5.ToString();
-  }
-
-
-//////////////////////////////////////////////////
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry a8, a4, a5({3,6,2});
-    //Debug();
-    a8=a1.Fetch(slice);
-    LOG(ERROR)<<"fetch a";
-    LOG(ERROR)<<a8.ToString();
-    a5.Pow(a8,3.0);
-    LOG(ERROR)<<"fetch op c.pow(a, 3)";
-    LOG(ERROR)<<a5.ToString();
-  }
-  ARMCI_Barrier();
-  a3.Pow(a1,3.0);
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry a5;
-    a5=a3.Fetch(slice);
-    LOG(ERROR)<<"op fetch a.pow(b,3)";
-    LOG(ERROR)<<a5.ToString();
-  }
-
-
-//////////////////////////////////////////////////
-  ARMCI_Barrier();
-  a3.SampleUniform(0.0,3.0);
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry a5;
-    a5=a3.Fetch(slice);
-    LOG(ERROR)<<"op fetch a.uniform(0,3)";
-    LOG(ERROR)<<a5.ToString();
-  }
-//////////////////////////////////////////////////
-  ARMCI_Barrier();
-  a3.SampleGaussian(0.0,1.0);
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry a5;
-    a5=a3.Fetch(slice);
-    LOG(ERROR)<<"op fetch a.norm(0,1)";
-    LOG(ERROR)<<a5.ToString();
-  }
-
-//////////////////////////////////////////////////
-  ARMCI_Barrier();
-  a3.Fill(1.43);
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry a5;
-    a5=a3.Fetch(slice);
-    LOG(ERROR)<<"op fetch a.fill(1.43)";
-    LOG(ERROR)<<a5.ToString();
-  }
-
-
-//////////////////////////////////////////////////
-  ARMCI_Barrier();
-  a1.Random();
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry a8, a4, a5({3,6,2});
-    a4=a1.Fetch(slice);
-    a5.Threshold(a4,0.3);
-    LOG(ERROR)<<"fetch op b=threshold(a,0.3)";
-    LOG(ERROR)<<a4.ToString();
-    LOG(ERROR)<<a5.ToString();
-  }
-
-  ARMCI_Barrier();
-  a3.Threshold(a1, .30f);
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry a5;
-    a5=a3.Fetch(slice);
-    LOG(ERROR)<<"op fetch b=threshold(a,0.3)";
-    LOG(ERROR)<<a5.ToString();
-  }
-
-//////////////////////////////////////////////////
-  ARMCI_Barrier();
-  a1.Random();
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry a8, a4, a5({3,6,2});
-    a4=a1.Fetch(slice);
-    a5.Max(a4,0.3);
-    LOG(ERROR)<<"fetch op b=max(a,0.3)";
-    LOG(ERROR)<<a4.ToString();
-    LOG(ERROR)<<a5.ToString();
-  }
-
-  ARMCI_Barrier();
-  a3.Max(a1, .30f);
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry a5;
-    a5=a3.Fetch(slice);
-    LOG(ERROR)<<"op fetch b=max(a,0.3)";
-    LOG(ERROR)<<a5.ToString();
-  }
-
-
-//////////////////////////////////////////////////
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry a6, a4, a5({3,6,2});
-    a6=a1.Fetch(slice);
-    a4=a2.Fetch(slice);
-    a5.Map([](float a, float b) {return a+2*b;}, a6,a4);
-    LOG(ERROR)<<"fetch op b=map(a+2b)";
-    LOG(ERROR)<<a6.ToString();
-    LOG(ERROR)<<a4.ToString();
-    LOG(ERROR)<<a5.ToString();
-  }
-  ARMCI_Barrier();
-  a3.Map([](float a, float b) {return a+2*b;}, a1,a2);
-  if(rank==0){
-    lapis::DAry a5;
-    a5=a3.Fetch(slice);
-    LOG(ERROR)<<"op fetch b=map(a+2b)";
-    LOG(ERROR)<<a5.ToString();
-  }
-  LOG(ERROR)<<"finish elementwise ops";
-}
-
-
-void TestLargeDot(int pa, int pb, int pc, int rank){
-  if(rank==0){
-    LOG(ERROR)<<"test Dot, partition for a, b, c : "
-      << pa<<" "<<pb<<" "<<pc<<" dim";
-  }
-
-  double t1, t2, t3;
-  t1=MPI_Wtime();
-  lapis::DAry a,b,c;
-  a.SetShape({256,9216});
-  b.SetShape({9216,4096});
-  c.SetShape({256,4096});
-  a.Setup(pa);
-  b.Setup(pb);
-  c.Setup(pc);
-  a.Random();
-  b.Random();
-  c.Random();
-  ARMCI_Barrier();
-  t2=MPI_Wtime();
-  c.Dot(a,b);
-  t3=MPI_Wtime();
-  ARMCI_Barrier();
-  LOG(ERROR)<<"setup time: "<<t2-t1<<" dot time: "
-    <<t3-t2<<" wait time:"<<MPI_Wtime()-t3;
-}
-
-void TestDot(int pa, int pb, int pc, int rank){
-  vector<lapis::Range> slicea{make_pair(0,4), make_pair(0,8)};
-  vector<lapis::Range> sliceb{make_pair(0,8), make_pair(0,4)};
-  vector<lapis::Range> slicec{make_pair(0,4), make_pair(0,4)};
-  lapis::DAry a,b,c;
-  a.SetShape({4,8});
-  b.SetShape({8,4});
-  c.SetShape({4,4});
-  a.Setup(pa);
-  b.Setup(pb);
-  c.Setup(pc);
-  a.Random();
-  b.Random();
-  c.Random();
-  //////////////////////
-  ARMCI_Barrier();
-  if(rank==0){
-    LOG(ERROR)<<"test Dot, partition for a, b, c : "
-      << pa<<" "<<pb<<" "<<pc<<" dim";
-    LOG(ERROR)<<"c=a*b";
-    lapis::DAry x,y,z;
-    x=a.Fetch(slicea);
-    y=b.Fetch(sliceb);
-    z=c.Fetch(slicec);
-    z.Dot(x,y);
-    LOG(ERROR)<<"fetch dot ";
-    LOG(ERROR)<<z.ToString();
-  }
-  ARMCI_Barrier();
-  //Debug();
-  c.Dot(a,b);
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry z;
-    z=c.Fetch(slicec);
-    LOG(ERROR)<<"dot fetch";
-    LOG(ERROR)<<z.ToString();
-  }
-  /////////////////////////////
-  ARMCI_Barrier();
-
-  if(rank==0){
-    LOG(ERROR)<<"a=c*b^T";
-    lapis::DAry x,y,z;
-    x=a.Fetch(slicea);
-    y=b.Fetch(sliceb);
-    z=c.Fetch(slicec);
-    x.Dot(z,y, false, true);
-    LOG(ERROR)<<"fetch dot ";
-    LOG(ERROR)<<x.ToString();
-  }
-  ARMCI_Barrier();
-  //Debug();
-  a.Dot(c,b, false, true);
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry z;
-    z=a.Fetch(slicea);
-    LOG(ERROR)<<"dot fetch";
-    LOG(ERROR)<<z.ToString();
-  }
-
-  /////////////////////////////
-  ARMCI_Barrier();
-  if(rank==0){
-    LOG(ERROR)<<"b=a^T*c";
-    lapis::DAry x,y,z;
-    x=a.Fetch(slicea);
-    y=b.Fetch(sliceb);
-    z=c.Fetch(slicec);
-    y.Dot(x,z, true, false);
-    LOG(ERROR)<<"fetch dot ";
-    LOG(ERROR)<<y.ToString();
-  }
-  ARMCI_Barrier();
-  //Debug();
-  b.Dot(a,c, true, false);
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry z;
-    z=b.Fetch(sliceb);
-    LOG(ERROR)<<"dot fetch";
-    LOG(ERROR)<<z.ToString();
-  }
-  ARMCI_Barrier();
-  /////////////////////////////
-  ARMCI_Barrier();
-  if(rank==0){
-    LOG(ERROR)<<"b=a^T*c^T";
-    lapis::DAry x,y,z;
-    x=a.Fetch(slicea);
-    y=b.Fetch(sliceb);
-    z=c.Fetch(slicec);
-    y.Dot(x,z, true, true);
-    LOG(ERROR)<<"fetch dot ";
-    LOG(ERROR)<<y.ToString();
-  }
-  ARMCI_Barrier();
-  //Debug();
-  b.Dot(a,c, true, true);
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry z;
-    z=b.Fetch(sliceb);
-    LOG(ERROR)<<"dot fetch";
-    LOG(ERROR)<<z.ToString();
-  }
-  ARMCI_Barrier();
-}
-
-
-void TestSubarray(int pa, int pb, int pc, int rank){
-  vector<lapis::Range> slicea{make_pair(0,4), make_pair(0,8)};
-  vector<lapis::Range> sliceb{make_pair(0,8), make_pair(0,4)};
-  vector<lapis::Range> slicec{make_pair(0,4), make_pair(0,4)};
-  vector<lapis::Range> slice{make_pair(0,4)};
-  lapis::DAry a,b,c;
-  a.SetShape({4});
-  b.SetShape({8,4});
-  c.SetShape({4,4});
-  a.Setup(pa);
-  b.Setup(pb);
-  c.Setup(pc);
-  b.Random();
-  c.Random();
-
-  //Debug();
-  lapis::DAry sb=b[2];
-  lapis::DAry sc=c[3];
-
-  ARMCI_Barrier();
-  if(rank==0){
-    LOG(ERROR)<<"test subary, partition for a, b, c : "
-      << pa<<" "<<pb<<" "<<pc<<" dim";
-    lapis::DAry y,z, x({4});
-    LOG(ERROR)<<"fetch full b, c";
-    y=b.Fetch(sliceb);
-    z=c.Fetch(slicec);
-    LOG(ERROR)<<y.ToString();
-    LOG(ERROR)<<z.ToString();
-    LOG(ERROR)<<"fetch sub, sb[2], sc[3]";
-    y=sb.Fetch(slice);
-    z=sc.Fetch(slice);
-    LOG(ERROR)<<y.ToString();
-    LOG(ERROR)<<z.ToString();
-  }
-  ARMCI_Barrier();
-  a.Add(sb,sc);
-  ARMCI_Barrier();
-  //Debug();
-  if(rank==0){
-    lapis::DAry z;
-    z=a.Fetch(slice);
-    LOG(ERROR)<<"sub add fetch, sb[2]+sc[3]";
-    LOG(ERROR)<<z.ToString();
-  }
-}
-
-void TestReshape(int pa, int pb, int pc, int rank){
-  vector<lapis::Range> sliceb3{make_pair(0,2),make_pair(0,4), make_pair(0,4)};
-  vector<lapis::Range> sliceb{make_pair(0,8), make_pair(0,4)};
-  vector<lapis::Range> slicec{make_pair(0,4), make_pair(0,4)};
-  vector<lapis::Range> slicea{make_pair(0,4)};
-  lapis::DAry a,b,c,b3,b2,b1;
-  a.SetShape({4});
-  b.SetShape({8,4});
-  c.SetShape({4,4});
-  a.Setup(pa);
-  b.Setup(pb);
-  c.Setup(pc);
-  b.Random();
-  c.Random();
-
-  b3=b.Reshape({2,4,4});
-  //Debug() ;
-  b2=b3[1];
-  if(rank==0){
-    LOG(ERROR)<<"test reshape+subary, partition for a, b, c : "
-      << pa<<" "<<pb<<" "<<pc<<" dim";
-    lapis::DAry y,z,x;
-    LOG(ERROR)<<"fetch b, b2, c";
-    y=b.Fetch(sliceb);
-    z=b2.Fetch(slicec);
-    x=c.Fetch(slicec);
-    LOG(ERROR)<<y.ToString();
-    LOG(ERROR)<<z.ToString();
-    LOG(ERROR)<<x.ToString();
-    LOG(ERROR)<<"fetch sub, b2+c";
-    z.Add(x);
-    LOG(ERROR)<<z.ToString();
-  }
-
-  ARMCI_Barrier();
-  c.Add(b2);
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry y,z,x;
-    x=c.Fetch(slicec);
-    LOG(ERROR)<<"sub add,fetch c+b2";
-    LOG(ERROR)<<x.ToString();
-  }
-  ARMCI_Barrier();
-  b2.Add(c);
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry y,z,x;
-    x=b2.Fetch(slicec);
-    LOG(ERROR)<<"sub add,fetch b2+c";
-    LOG(ERROR)<<x.ToString();
-  }
-  ARMCI_Barrier();
-  b1=b2[2];
-  if(rank==0){
-    lapis::DAry y,z,x;
-    x=b1.Fetch(slicea);
-    LOG(ERROR)<<"fetch b1";
-    LOG(ERROR)<<x.ToString();
-  }
-
-  a.Add(b1);
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry y,z,x;
-    x=a.Fetch(slicea);
-    LOG(ERROR)<<"add fetch a+b1";
-    LOG(ERROR)<<x.ToString();
-  }
-  ARMCI_Barrier();
-  b1.Add(a);
-  ARMCI_Barrier();
-  if(rank==0){
-    lapis::DAry y,z,x;
-    x=b1.Fetch(slicea);
-    LOG(ERROR)<<"add fetch b1+a";
-    LOG(ERROR)<<x.ToString();
-  }
-
-  ARMCI_Barrier();
-  {
-    lapis::DAry b3=b.Reshape({4,2,4});
-    lapis::DAry a;
-    a.SetShape({2,4});
-    a.Setup(pa);
-    a.Random();
-    lapis::DAry b1=b3[1];
-    lapis::DAry b2=b3[3];
-    lapis::DAry c;
-    c.SetShape({2,2});
-    c.Setup(pc);
-    ARMCI_Barrier();
-    c.Dot(a,b2,false, true);
-    ARMCI_Barrier();
-    if(rank==0){
-      lapis::DAry x,y,z,zz({2,2});
-      y=b3.Fetch({make_pair(0,4), make_pair(0,2), make_pair(0,4)});
-      x=a.Fetch({make_pair(0,2), make_pair(0,4)});
-      LOG(ERROR)<<"fetch b,a";
-      LOG(ERROR)<<y.ToString();
-      LOG(ERROR)<<x.ToString();
-      z=y[3];
-      zz.Dot(x,z,false, true);
-      LOG(ERROR)<<"fetch dot c=a*b[3]^T";
-      LOG(ERROR)<<zz.ToString();
-
-      x=a.Fetch({make_pair(0,2), make_pair(0,4)});
-      y=b2.Fetch({make_pair(0,2), make_pair(0,4)});
-      z=c.Fetch({make_pair(0,2), make_pair(0,2)});
-      LOG(ERROR)<<"op fetch c=a*b[3]^T";
-      LOG(ERROR)<<x.ToString();
-      LOG(ERROR)<<y.ToString();
-      LOG(ERROR)<<z.ToString();
-
-    }
-    ARMCI_Barrier();
-  }
-}
-
-
-
-int main(int argc, char**argv){
- // MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
-  MPI_Init(&argc, &argv);
-  int rank, nprocs;
-  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
-  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
-  vector<int> procs;
-  for (int i = 0; i < nprocs; i++) {
-    procs.push_back(i);
-  }
-  //Debug();
-  lapis::GAry::Init(rank,procs);
-  google::InitGoogleLogging(argv[0]);
-  /*
-  if(nprocs%3==0){
-    TestMixedParElt(0,0,0,rank);
-    TestMixedParElt(0,0,1,rank);
-    TestMixedParElt(0,1,0,rank);
-    TestMixedParElt(1,0,0,rank);
-    TestMixedParElt(1,1,0,rank);
-    TestMixedParElt(1,1,1,rank);
-    TestMixedParElt(0,1,1,rank);
-  }
-  if(nprocs%2==0){
-    TestMixedParElt(1,1,1,rank);
-    TestMixedParElt(1,2,1,rank);
-    TestMixedParElt(2,1,1,rank);
-    TestMixedParElt(1,1,2,rank);
-    TestMixedParElt(2,2,2,rank);
-  }
-  TestDot(0,0,0,rank);
-  TestDot(0,0,1,rank);
-  TestDot(0,1,0,rank);
-  TestDot(0,1,1,rank);
-  TestDot(1,0,0,rank);
-  TestDot(1,0,1,rank);
-  TestDot(1,1,0,rank);
-  TestDot(1,1,1,rank);
-
-  TestPar(0, rank);
-  TestPar(1, rank);
-  */
-  double start, end;
-  start=MPI_Wtime();
-  TestLargeDot(0,0,0,rank);
-  TestLargeDot(0,0,1,rank);
-  TestLargeDot(0,1,0,rank);
-  TestLargeDot(0,1,1,rank);
-  TestLargeDot(1,0,0,rank);
-  TestLargeDot(1,0,1,rank);
-  TestLargeDot(1,1,0,rank);
-  TestLargeDot(1,1,1,rank);
-  end=MPI_Wtime();
-  if(rank==0)
-    LOG(ERROR)<<"dot time for 256*4k 4k*4k matrix, "<<end-start;
-  /*
-  TestSubarray(0,0,0,rank);
-  TestSubarray(0,0,1,rank);
-  TestSubarray(0,1,0,rank);
-  TestSubarray(0,1,1,rank);
-  TestReshape(0,0,0,rank);
-  TestReshape(0,0,1,rank);
-  TestReshape(0,1,0,rank);
-  TestReshape(0,1,1,rank);
-  */
-
-  LOG(ERROR)<<"finish";
-  lapis::GAry::Finalize();
-  MPI_Finalize();
-  return 0;
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_dary.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_dary.cc b/src/test/dist_test/test_dary.cc
deleted file mode 100644
index ce605e6..0000000
--- a/src/test/dist_test/test_dary.cc
+++ /dev/null
@@ -1,85 +0,0 @@
-#include <iostream>
-#include "darray/dary.h"
-#include "utils/timer.h"
-
-
-int main() {
-  lapis::DAry x({1000000});
-  lapis::DAry y({1000000});
-  x.Random();
-  y.Random();
-  lapis::Timer t;
-  for(int i=0;i<100;i++){
-    float *dptrx=x.dptr();
-    float *dptry=y.dptr();
-    for(int k=0;k<10000;k++)
-      dptrx[k]*=dptry[k];
-  }
-  std::cout<<"arymath: "<<t.elapsed()/10<<std::endl;
-  lapis::DAry m({1000000});
-  lapis::DAry n({1000000});
-  m.Random();
-  n.Random();
-  t.Reset();
-  for(int i=0;i<100;i++)
-    m.Mult(m,n);
-  std::cout<<"arymath: "<<t.elapsed()/10<<std::endl;
-
-
-  lapis::DAry a({2,2});
-  lapis::DAry b,c;
-  b.InitLike(a);
-  c.InitLike(a);
-  a.Random();
-  b.Random();
-  std::cout<<a.ToString()<<std::endl;
-  std::cout<<b.ToString()<<std::endl;
-  c.Dot(a,b);
-  std::cout<<"c=a.b"<<c.ToString()<<std::endl;
-  a.Add(b);
-  std::cout<<"a=a+b"<<a.ToString()<<std::endl;
-  a.Mult(a,b);
-  std::cout<<"a=a*b"<<a.ToString()<<std::endl;
-  a.Minus(a,b);
-  std::cout<<"a=a-b"<<a.ToString()<<std::endl;
-
-  c.Random();
-  std::cout<<"random c "<<c.ToString()<<std::endl;
-  a.Threshold(c, 0.3);
-  std::cout<<"a=threshold(c,0.3) "<<a.ToString()<<std::endl;
-
-  a.Pow(c, 0.4);
-  std::cout<<"a=Pow(c,0.4) "<<a.ToString()<<std::endl;
-
-  c.Set(0.5);
-  std::cout<<"c=set(0.5) "<<c.ToString()<<std::endl;
-  a.Square(c);
-  std::cout<<"a=square(c) "<<a.ToString()<<std::endl;
-
-  c.Copy(a);
-  std::cout<<"c=Copy(a) "<<c.ToString()<<std::endl;
-
-  lapis::DAry d({2});
-  d.SumRow(b);
-  std::cout<<"d=SumRow(b) "<<d.ToString()<<std::endl;
-  d.SumCol(b);
-  std::cout<<"d=SumCol(b) "<<d.ToString()<<std::endl;
-  b.AddRow(d);
-  std::cout<<"b=AddRow(d) "<<b.ToString()<<std::endl;
-  b.AddCol(d);
-  std::cout<<"b=AddCol(d) "<<b.ToString()<<std::endl;
-
-  std::cout<<"max(b) "<<b.Max()<<std::endl;
-  std::cout<<"Sum(b) "<<b.Sum()<<std::endl;
-
-  lapis::DAry e({3,3,3});
-  e.SampleGaussian(0.0f,1.0f);
-  std::cout<<"Gaussain e "<<e.ToString()<<std::endl;
-
-  lapis::DAry f({9});
-  f.Sum(e, 0, {0,2});
-  std::cout<<"f.sum  "<<f.ToString()<<std::endl;
-
-  return 0;
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_disk_table.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_disk_table.cc 
b/src/test/dist_test/test_disk_table.cc
deleted file mode 100644
index 99987bb..0000000
--- a/src/test/dist_test/test_disk_table.cc
+++ /dev/null
@@ -1,188 +0,0 @@
-//  Copyright © 2014 Anh Dinh. All Rights Reserved.
-//  main class for testing distributed memory layer
-//
-//  the command to run this should be:
-//             mpirun -hostfile <host> -bycore -nooversubscribe
-//                             -n <num_servers> test -sync_update
-
-
-#include "core/global-table.h"
-#include "core/common.h"
-#include "core/disk-table.h"
-#include "core/table.h"
-#include "core/table_server.h"
-#include "utils/global_context.h"
-#include <gflags/gflags.h>
-#include "proto/model.pb.h"
-#include "worker.h"
-#include <cmath>
-
-DEFINE_int32(record_size,100, "# elements per float vector");
-DECLARE_int32(block_size);
-DEFINE_int32(table_size, 1000, "# records per table");
-DEFINE_string(system_conf, "examples/imagenet12/system.conf", "configuration 
file for node roles");
-DEFINE_string(model_conf, "examples/imagenet12/model.conf", "DL model 
configuration file");
-DEFINE_bool(is_testing_put,true, "data put vs. data get");
-DECLARE_int32(debug_index);
-DECLARE_int32(table_buffer);
-using namespace lapis;
-
-typedef map<int, GlobalTable*> Map;
-Map tables;
-
-//  put random message to the pointers
-void create_random_message(FloatVector* message, const int count){
-       for (int i=0; i<FLAGS_record_size; i++){
-               message->add_data(count*FLAGS_record_size+i);
-       }
-}
-
-void create_disk_table(int id){
-       DiskTableDescriptor *info = new DiskTableDescriptor(id, "disk_test",
-                       FLAGS_block_size);
-       info->key_marshal = new Marshal<int>();
-       info->value_marshal = new Marshal<FloatVector>();
-       tables[id] = new TypedDiskTable<int,FloatVector>(info);
-}
-
-
-//  if testing put, write and send data. Else do nothing
-void run_coordinator(shared_ptr<NetworkThread> network, int tid){
-       // wait for wokers to be up
-       RegisterWorkerRequest req;
-       for (int i=0; i<network->size()-1; i++)
-               network->Read(MPI::ANY_SOURCE, MTYPE_REGISTER_WORKER, &req);
-
-       // put data in
-       TypedDiskTable<int, FloatVector>* table = 
static_cast<TypedDiskTable<int,
-                       FloatVector>*>(tables[tid]);
-
-       //  if testing put()
-       if (FLAGS_is_testing_put) {
-               int count = 0;
-               for (int i = 0; i < FLAGS_table_size; i++) {
-                       FloatVector message;
-                       create_random_message(&message, i);
-                       table->put(i, message);
-                       count += message.ByteSize();
-               }
-               table->finish_put();
-       }
-
-       VLOG(3) << "Coordinator about to shut down";
-       for (int i=0; i<network->size()-1; i++){
-               EmptyMessage end_msg;
-               network->Read(i,MTYPE_WORKER_END, &end_msg);
-       }
-
-       EmptyMessage shutdown_msg;
-       for (int i = 0; i < network->size() - 1; i++) {
-               network->Send(i, MTYPE_WORKER_SHUTDOWN, shutdown_msg);
-       }
-       network->Flush();
-       network->Shutdown();
-       table->PrintStats();
-
-       if (FLAGS_is_testing_put) {
-               int sub_blocks = ceil(((double) FLAGS_table_size / 
FLAGS_table_buffer));
-               CHECK_EQ(table->stats()["total sub block sent"], sub_blocks);
-               CHECK_EQ(table->stats()["total record sent"], FLAGS_table_size);
-               VLOG(3) << "test coordinator sending: successful";
-       }
-
-}
-
-//  if testing put(), do nothing. Else read() until done()
-void run_worker(shared_ptr<NetworkThread> network, int tid){
-       TableServer* ts = new TableServer();
-       ts->StartTableServer(tables);
-
-       // put data in
-       TypedDiskTable<int, FloatVector>* table = 
static_cast<TypedDiskTable<int,
-                       FloatVector>*>(tables[tid]);
-       double total_read = 0;
-       if (!FLAGS_is_testing_put){
-               VLOG(3) << "testing read from table ...";
-               table->Load();
-               while (!table->done()){
-                       int k;
-                       FloatVector v;
-                       table->get(&k,&v);
-                       table->Next();
-                       total_read++;
-               }
-
-               int k;
-               FloatVector v;
-               table->get(&k, &v);
-               total_read++;
-       }
-
-       int size = network->size();
-
-       network->Flush();
-       network->Send(GlobalContext::kCoordinatorRank, MTYPE_WORKER_END,
-                       EmptyMessage());
-       EmptyMessage msg;
-
-       int src = 0;
-       network->Read(GlobalContext::kCoordinatorRank, MTYPE_WORKER_SHUTDOWN, 
&msg,
-                       &src);
-       network->Flush();
-       network->Shutdown();
-
-       Stats stats =
-                       (static_cast<TypedDiskTable<int, 
FloatVector>*>(tables[0]))->stats();
-
-       if (FLAGS_is_testing_put) {
-               int sub_blocks = ceil(((double) FLAGS_table_size / 
FLAGS_table_buffer));
-               if (size == 2) {
-                       CHECK_EQ(stats["total sub block received"], sub_blocks);
-                       CHECK_EQ(stats["total record stored"], 
FLAGS_table_size);
-               }
-               VLOG(3) << "test table-server writing: successful";
-               VLOG(3) << "number of sub blocks = " << sub_blocks;
-               VLOG(3) << "total data stored = " << stats["total byte stored"];
-       }
-       else{
-               if (size==2)
-                       CHECK_EQ(stats["total record read"], FLAGS_table_size);
-               VLOG(3) << "test table-server reading: successful";
-               VLOG(3) << "read bandwidth = "
-                               << (stats["total byte read"]
-                                               / (stats["last byte read"] - 
stats["first byte read"]));
-               //VLOG(3) << "total number of record read = " << stats["total 
record read"];
-       }
-
-       network->PrintStats();
-       static_cast<TypedDiskTable<int, FloatVector>*>(tables[0])->PrintStats();
-}
-
-//  check all the records have been stored to disk
-int test_disk(int tid) {
-       // Init GlobalContext
-       auto gc = lapis::GlobalContext::Get(FLAGS_system_conf, 
FLAGS_model_conf);
-       //start network thread
-       shared_ptr<NetworkThread> network = NetworkThread::Get();
-
-       if (network->id() == network->size() - 1)
-               run_coordinator(network, tid);
-       else
-               run_worker(network,tid);
-       return 0;
-}
-
-// for debugging use
-//#ifndef FLAGS_v
-//  DEFINE_int32(v, 3, "vlog controller");
-//#endif
-
-int main(int argc, char **argv) {
-       FLAGS_logtostderr = 1;
-       google::InitGoogleLogging(argv[0]);
-       gflags::ParseCommandLineFlags(&argc, &argv, true);
-       create_disk_table(0);
-       return test_disk(0);
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_mnistlayer.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_mnistlayer.cc 
b/src/test/dist_test/test_mnistlayer.cc
deleted file mode 100644
index 882e121..0000000
--- a/src/test/dist_test/test_mnistlayer.cc
+++ /dev/null
@@ -1,165 +0,0 @@
-#include <gtest/gtest.h>
-#include <sys/stat.h>
-#include <cstdint>
-#include "opencv2/highgui/highgui.hpp"
-#include "opencv2/imgproc/imgproc.hpp"
-
-#include "model/layer.h"
-#include "proto/model.pb.h"
-#include "utils/shard.h"
-using namespace singa;
-TEST(MnistLayerTest, SingleScale){
-  LayerProto proto;
-  MnistProto *mnist=proto.mutable_mnist_param();
-  mnist->set_size(55);
-  MnistImageLayer layer;
-  layer.FromProto(proto);
-  cv::Mat image;
-  image=cv::imread("src/test/data/mnist.png", 0);
-  string pixel;
-  pixel.resize(image.rows*image.cols);
-  for(int i=0,k=0;i<image.rows;i++)
-    for(int j=0; j<image.cols;j++)
-      pixel[k++]=static_cast<char>(image.at<uint8_t>(i,j));
-  Record rec;
-  rec.set_type(Record_Type_kMnist);
-  MnistRecord *mrec=rec.mutable_mnist();
-  mrec->set_pixel(pixel);
-  layer.Setup(1, rec, kNone);
-  layer.AddInputRecord(rec);
-
-  const vector<uint8_t>& dat=layer.Convert2Image(0);
-  int s=static_cast<int>(sqrt(dat.size()));
-  cv::Mat newimg(s,s,CV_8UC1);
-  int count=0;
-  for(int i=0,k=0;i<newimg.rows;i++)
-    for(int j=0; j<newimg.cols;j++){
-      count+=dat[k]>0;
-      newimg.at<uint8_t>(i,j)=dat[k++];
-    }
-  //LOG(ERROR)<<"image positive "<<count<<" size "<<s;
-  cv::imwrite("src/test/data/mnist_scale.png", newimg);
-}
-
-TEST(MnistLayerTest, SingleAffineTransform){
-  LayerProto proto;
-  MnistProto *mnist=proto.mutable_mnist_param();
-  mnist->set_beta(15);
-  mnist->set_gamma(16);
-  mnist->set_size(55);
-  MnistImageLayer layer;
-  layer.FromProto(proto);
-  cv::Mat image;
-  image=cv::imread("src/test/data/mnist.png", 0);
-  string pixel;
-  pixel.resize(image.rows*image.cols);
-  for(int i=0,k=0;i<image.rows;i++)
-    for(int j=0; j<image.cols;j++)
-      pixel[k++]=static_cast<char>(image.at<uint8_t>(i,j));
-  Record rec;
-  rec.set_type(Record_Type_kMnist);
-  MnistRecord *mrec=rec.mutable_mnist();
-  mrec->set_pixel(pixel);
-  layer.Setup(1, rec, kNone);
-  layer.AddInputRecord(rec);
-
-  const vector<uint8_t>& dat=layer.Convert2Image(0);
-  int s=static_cast<int>(sqrt(dat.size()));
-  cv::Mat newimg(s,s,CV_8UC1);
-  int count=0;
-  for(int i=0,k=0;i<newimg.rows;i++)
-    for(int j=0; j<newimg.cols;j++){
-      count+=dat[k]>0;
-      newimg.at<uint8_t>(i,j)=dat[k++];
-    }
-  //LOG(ERROR)<<"image positive "<<count<<" size "<<s;
-
-  cv::imwrite("src/test/data/mnist_affine.png", newimg);
-}
-TEST(MnistLayerTest, SingleElasticDistortion){
-  LayerProto proto;
-  MnistProto *mnist=proto.mutable_mnist_param();
-  mnist->set_elastic_freq(1);
-  mnist->set_sigma(6);
-  mnist->set_alpha(36);
-  mnist->set_beta(15);
-  mnist->set_gamma(16);
-  mnist->set_size(55);
-  mnist->set_kernel(21);
-  MnistImageLayer layer;
-  layer.FromProto(proto);
-  cv::Mat image;
-  image=cv::imread("src/test/data/mnist.png", 0);
-  string pixel;
-  pixel.resize(image.rows*image.cols);
-  for(int i=0,k=0;i<image.rows;i++)
-    for(int j=0; j<image.cols;j++)
-      pixel[k++]=static_cast<char>(image.at<uint8_t>(i,j));
-  Record rec;
-  rec.set_type(Record_Type_kMnist);
-  MnistRecord *mrec=rec.mutable_mnist();
-  mrec->set_pixel(pixel);
-  layer.Setup(1, rec, kNone);
-  layer.AddInputRecord(rec);
-
-  const vector<uint8_t>& dat=layer.Convert2Image(0);
-  int s=static_cast<int>(sqrt(dat.size()));
-  cv::Mat newimg(s,s,CV_8UC1);
-  int count=0;
-  for(int i=0,k=0;i<newimg.rows;i++)
-    for(int j=0; j<newimg.cols;j++){
-      count+=dat[k]>0;
-      newimg.at<uint8_t>(i,j)=dat[k++];
-    }
-  cv::imwrite("src/test/data/mnist_elastic.png", newimg);
-}
-TEST(MnistLayerTest, MultElasticDistortion){
-  LayerProto proto;
-  MnistProto *mnist=proto.mutable_mnist_param();
-  int kTotal=100;
-  int kSize=29;
-  mnist->set_elastic_freq(kTotal);
-  mnist->set_sigma(6);
-  mnist->set_alpha(36);
-  mnist->set_beta(15);
-  mnist->set_gamma(16);
-  mnist->set_size(kSize);
-  mnist->set_kernel(21);
-  MnistImageLayer layer;
-  layer.FromProto(proto);
-  vector<vector<int>> shapes{{kTotal, kSize,kSize}};
-  layer.Setup(shapes, kNone);
-  shard::Shard 
source("/data1/wangwei/singa/data/mnist/test/",shard::Shard::kRead);
-  int n=static_cast<int>(sqrt(kTotal));
-  cv::Mat origin(n*28,n*28, CV_8UC1);
-  char disp[1024];
-  for(int x=0;x<n;x++){
-    sprintf(disp+strlen(disp), "\n");
-    for(int y=0;y<n;y++){
-      Record rec;
-      string key;
-      CHECK(source.Next(&key, &rec));
-      const string pixel=rec.mnist().pixel();
-      cv::Mat img=origin(cv::Rect(y*28, x*28, 28, 28));
-      for(int i=0,k=0;i<28;i++)
-        for(int j=0;j<28;j++)
-          img.at<uint8_t>(i,j)=static_cast<uint8_t>(pixel[k++]);
-      layer.AddInputRecord(rec);
-      sprintf(disp+strlen(disp), "%d ", rec.mnist().label());
-    }
-  }
-  LOG(ERROR)<<disp;
-  cv::imwrite("src/test/data/mnist_big.png", origin);
-
-  cv::Mat output(n*kSize,n*kSize, CV_8UC1);
-  for(int i=0;i<kTotal;i++){
-    const vector<uint8_t>& dat=layer.Convert2Image(i);
-    int x=(i/n);
-    int y=i%n;
-    cv::Mat img=output(cv::Rect(y*kSize, x*kSize, kSize, kSize));
-    for(int i=0,k=0;i<kSize;i++)
-      for(int j=0;j<kSize;j++)
-        img.at<uint8_t>(i,j)=dat[k++];
-  }
-  cv::imwrite("src/test/data/mnist_bigout.png", output);
-}

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_model.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_model.cc b/src/test/dist_test/test_model.cc
deleted file mode 100644
index c3f98b9..0000000
--- a/src/test/dist_test/test_model.cc
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright © 2014 Wei Wang. All Rights Reserved.
-// 2014-08-02 14:13
-#include <glog/logging.h>
-#include <gflags/gflags.h>
-
-
-#include "model/sgd_trainer.h"
-#include "model/net.h"
-#include "proto/model.pb.h"
-#include "utils/proto_helper.h"
-
-DEFINE_int32(v, 1, "vlog");
-
-int main(int argc, char** argv) {
-  FLAGS_logtostderr=1;
-  google::InitGoogleLogging(argv[0]);
-  gflags::ParseCommandLineFlags(&argc, &argv, true);
-  lapis::ModelProto model_proto;
-  lapis::ReadProtoFromTextFile("examples/imagenet12/model.conf", &model_proto);
-  lapis::SGDTrainer trainer;
-  trainer.Init(model_proto.trainer());
-  lapis::Net net;
-  net.Init(model_proto.net());
-  trainer.Run(&net);
-}

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_neuralnet.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_neuralnet.cc 
b/src/test/dist_test/test_neuralnet.cc
deleted file mode 100644
index a857124..0000000
--- a/src/test/dist_test/test_neuralnet.cc
+++ /dev/null
@@ -1,141 +0,0 @@
-#include <gtest/gtest.h>
-#include <model/neuralnet.h>
-#include "proto/model.pb.h"
-#include "utils/common.h"
-#include "utils/param_updater.h"
-
-using namespace singa;
-NetProto CreateMLPProto(){
-  ModelProto model;
-  ReadProtoFromTextFile("examples/mnist/mlp.conf", &model);
-  return model.neuralnet();
-}
-TEST(NeuralnetTest, BP){
-  ModelProto model;
-  ReadProtoFromTextFile("examples/mnist/mlp.conf", &model);
-
-  AdaGradUpdater updater;
-  updater.Init(model.solver().updater());
-
-  NeuralNet net(model.neuralnet());
-  auto layers=net.layers();
-  for(int i=0;i<3;i++){
-    bool firstlayer=true;
-    for(auto& layer: layers){
-      layer->ComputeFeature();
-      if(firstlayer){
-        DataLayer* dl=static_cast<DataLayer*>(layer.get());
-        dl->CompletePrefetch();
-        firstlayer=false;
-      }
-    }
-
-    for(int k=layers.size()-1;k>=0;k--){
-      layers[k]->ComputeGradient();
-      for(Param* param: layers[k]->GetParams())
-        updater.Update(i, param);
-    }
-  }
-}
-NetProto CreateConvNetProto(){
-  NetProto proto;
-  LayerProto *layer;
-
-  layer=proto.add_layer();
-  layer->set_name("data");
-  layer->set_type("kShardData");
-  DataProto *data=layer->mutable_data_param();
-  data->set_batchsize(8);
-  data->set_path("/data1/wangwei/singa/data/mnist/train/");
-
-  // 4x3x10x10
-  layer=proto.add_layer();
-  layer->set_name("mnist");
-  layer->set_type("kMnistImage");
-  layer->add_srclayers("data");
-
-  // 4x1
-  layer=proto.add_layer();
-  layer->set_name("label");
-  layer->set_type("kLabel");
-  layer->add_srclayers("data");
-
-  // 4x8x9x9
-  layer=proto.add_layer();
-  layer->set_name("conv1");
-  layer->set_type("kConvolution");
-  layer->add_srclayers("mnist");
-  layer->add_param();
-  layer->add_param();
-  ConvolutionProto *conv=layer->mutable_convolution_param();
-  conv->set_num_filters(8);
-  conv->set_kernel(2);
-
-  // 4x8x9x9
-  layer=proto.add_layer();
-  layer->set_name("relu1");
-  layer->set_type("kReLU");
-  layer->add_srclayers("conv1");
-
-  // 4x8x4x4
-  layer=proto.add_layer();
-  layer->set_name("pool1");
-  layer->set_type("kPooling");
-  layer->add_srclayers("relu1");
-  PoolingProto *pool=layer->mutable_pooling_param();
-  pool->set_kernel(4);
-  pool->set_stride(2);
-
-  // 4x10
-  layer=proto.add_layer();
-  layer->set_name("fc1");
-  layer->set_type("kInnerProduct");
-  layer->add_srclayers("pool1");
-  layer->add_param();
-  layer->add_param();
-  InnerProductProto *inner=layer->mutable_inner_product_param();
-  inner->set_num_output(10);
-
-  // 4x10
-  layer=proto.add_layer();
-  layer->set_name("loss");
-  layer->set_type("kSoftmaxLoss");
-  layer->add_srclayers("fc1");
-  layer->add_srclayers("label");
-
-  return proto;
-}
-
-TEST(NeuralNetTest, NoPartition){
-  NetProto proto=CreateConvNetProto();
-  NeuralNet net(proto);
-  const auto& layers=net.layers();
-  ASSERT_EQ(8, layers.size());
-  ASSERT_EQ("data", layers.at(0)->name());
-  ASSERT_EQ("loss", layers.at(7)->name());
-}
-
-TEST(NeuralNetTest, DataPartition){
-  NetProto proto=CreateConvNetProto();
-  proto.set_partition_type(kDataPartition);
-  NeuralNet net(proto, 3);
-  const auto& layers=net.layers();
-  ASSERT_EQ(28, layers.size());
-  ASSERT_EQ("data", layers.at(0)->name());
-}
-TEST(NeuralNetTest, LayerPartition){
-  NetProto proto=CreateConvNetProto();
-  proto.set_partition_type(kLayerPartition);
-  NeuralNet net(proto, 2);
- // const auto& layers=net.layers();
-}
-TEST(NeuralNetTest, HyridPartition){
-  NetProto proto=CreateConvNetProto();
-  int num_layers=proto.layer_size();
-  proto.mutable_layer(num_layers-2)->set_partition_type(kDataPartition);
-  proto.mutable_layer(num_layers-1)->set_partition_type(kDataPartition);
-  proto.set_partition_type(kLayerPartition);
-  NeuralNet net(proto, 2);
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_pm.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_pm.cc b/src/test/dist_test/test_pm.cc
deleted file mode 100644
index 67c210a..0000000
--- a/src/test/dist_test/test_pm.cc
+++ /dev/null
@@ -1,88 +0,0 @@
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-
-#include <iostream>
-#include <fstream>
-
-#include <gflags/gflags.h>
-#include <glog/logging.h>
-#include "utils/cluster.h"
-#include "utils/common.h"
-#include "proto/model.pb.h"
-#include "proto/cluster.pb.h"
-#include "server/server.h"
-#include "server/pm_server.h"
-#include "worker/pm_client.h"
-#include "worker/worker.h"
-#include "proto/topology.pb.h"
-#include <string.h>
-#include <google/protobuf/text_format.h>
-#include <google/protobuf/io/zero_copy_stream_impl.h>
-
-using namespace google::protobuf::io;
-using google::protobuf::TextFormat;
-
-using std::ifstream;
-
-/**
- * Testing put/get/update performance of the new zeromq-based parameter
- * servers.
- */
-DEFINE_int32(procsID, 0, "global process ID");
-DEFINE_string(hostfile, "examples/imagenet12/hostfile", "hostfile");
-DEFINE_string(cluster_conf, "examples/imagenet12/cluster.conf",
-    "configuration file for the cluster");
-DEFINE_string(model_conf, "examples/imagenet12/model.conf",
-    "Deep learning model configuration file");
-
-DEFINE_string(topology_config,"examples/imagenet12/topology.conf", "Network of 
servers");
-DEFINE_int32(server_threads,1,"Number of server's worker threads per process");
-DEFINE_int32(client_threads,1,"Number of client's worker threads per process");
-
-DEFINE_string(mode, "client", "client or server mode");
-DEFINE_int32(node_id, 0, "ID of the node, client or server");
-DEFINE_int32(primary_set, 0, "ID of the primary server set (for client mode 
only)");
-
-/**
- *
- * Read the topology file in, and start the Client or server respectively.
- *
- * test_pm --node_id <id>
- */
-
-
-#ifndef FLAGS_v
-  DEFINE_int32(v, 3, "vlog controller");
-#endif
-
-int main(int argc, char **argv) {
-       google::InitGoogleLogging(argv[0]);
-       gflags::ParseCommandLineFlags(&argc, &argv, true);
-       FLAGS_logtostderr = 1;
-
-
-       //Read in the topology file
-       int fd = open(FLAGS_topology_config.c_str(), O_RDONLY);
-       assert(fd != -1);
-       singa::Topology topology;
-       TextFormat::Parse(new FileInputStream(fd), &topology);
-
-
-       //read host file
-       ifstream hostfile(FLAGS_hostfile.c_str());
-       string host;
-       vector<string> hosts;
-       while (getline(hostfile, host))
-               hosts.push_back(host);
-       
-       if (FLAGS_node_id < topology.nservers()) {
-               singa::SingaServer *server = new 
singa::SingaServer(FLAGS_node_id, topology, hosts);
-               server->StartServer();
-       } else {
-               singa::SingaClient *client = new 
singa::SingaClient(FLAGS_node_id, topology, hosts);
-               client->StartClient();
-       }
-       
-       return 0;
-}

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_router.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_router.cc 
b/src/test/dist_test/test_router.cc
deleted file mode 100644
index bed3d99..0000000
--- a/src/test/dist_test/test_router.cc
+++ /dev/null
@@ -1,27 +0,0 @@
-#include <gflags/gflags.h>
-#include <gtest/gtest.h>
-#include "utils/router.h"
-#include "utils/common.h"
-#include "utils/cluster.h"
-DEFINE_string(hostfile, "examples/imagenet12/hostfile", "hostfile");
-DEFINE_string(cluster_conf, "examples/imagenet12/cluster.conf",
-    "configuration file for the cluster");
-DEFINE_int32(procsID, 0, "global process ID");
-
-int main(int argc, char** argv){
-  google::InitGoogleLogging(argv[0]);
-  gflags::ParseCommandLineFlags(&argc, &argv, true);
-
-  // Init Cluster
-  singa::ClusterProto pcluster;
-  singa::ReadProtoFromTextFile(FLAGS_cluster_conf.c_str(), &pcluster);
-  auto cluster=singa::Cluster::Get(pcluster, FLAGS_hostfile, FLAGS_procsID);
-  if(cluster->AmIServer()){
-    singa::Router server(5732);
-    CHECK(server.Bind(cluster->server_addr(0), cluster->nworkers()));
-  }else{
-    singa::Router worker(5732);
-    CHECK(worker.Connect(cluster->server_addr(0)));
-  }
-  return 0;
-}

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_split.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_split.cc b/src/test/dist_test/test_split.cc
deleted file mode 100644
index 674d546..0000000
--- a/src/test/dist_test/test_split.cc
+++ /dev/null
@@ -1,304 +0,0 @@
-//  Copyright © 2014 Anh Dinh. All Rights Reserved.
-
-
-//  Testing the unbalance in spliting parameter vectors.
-
-#include "core/global-table.h"
-#include "core/common.h"
-#include "core/disk-table.h"
-#include "core/table.h"
-#include "core/table_server.h"
-#include "utils/global_context.h"
-#include <gflags/gflags.h>
-#include "proto/model.pb.h"
-#include "worker.h"
-#include "coordinator.h"
-//#include "model_controller/myacc.h"
-#include "utils/common.h"
-
-#include <cmath>
-#include <stdlib.h>
-#include <vector>
-#include <iostream>
-#include <fstream>
-
-using namespace lapis;
-using std::vector;
-
-//DEFINE_bool(sync_update, false, "Synchronous put/update queue");
-DEFINE_string(system_conf, "examples/imagenet12/system.conf", "configuration 
file for node roles");
-DEFINE_string(model_conf, "examples/imagenet12/model.conf", "DL model 
configuration file");
-DEFINE_int64(threshold,1000000, "max # of parameters in a vector");
-DEFINE_int32(iterations,5,"numer of get/put iterations");
-DEFINE_int32(workers,2,"numer of workers doing get/put");
-#ifndef FLAGS_v
-  DEFINE_int32(v, 3, "vlog controller");
-#endif
-
-typedef map<int, GlobalTable*> Map;
-Map tables;
-shared_ptr<NetworkThread> network;
-shared_ptr<GlobalContext> context;
-std::vector<ServerState*> server_states;
-TableServer *table_server;
-
-FloatVector large_msg, small_msg;
-const int SIZE=16;
-
-long sizes[] = { 37448736, 16777216, 4096000, 1327104, 884736, 884736, 614400,
-               14112, 4096, 4096, 1000, 384, 384, 256, 256, 96 };
-
-vector<FloatVector*> value_msg;
-
-int num_keys;
-
-// create large and small messages
-void init_messages(){
-       num_keys = 0;
-  long nservers=context->num_table_servers();
-       for (int i=0; i<SIZE; i++){
-               int total=0;
-    int threshold=std::max(FLAGS_threshold,0l);//, sizes[i]/nservers);
-    VLOG(3)<<"worker: "<<threshold;
-               while (total<sizes[i]){
-                       FloatVector* fv = new FloatVector();
-                       for (int j=0; j+total<sizes[i] && j<threshold; j++)
-                               
fv->add_data(static_cast<float>(rand())/static_cast<float>(RAND_MAX));
-                       value_msg.push_back(fv);
-                       total+=threshold;
-                       num_keys++;
-               }
-       }
-}
-
-void create_mem_table(int id, int num_shards){
-
-       TableDescriptor *info = new TableDescriptor(id, num_shards);
-         info->key_marshal = new Marshal<int>();
-         info->value_marshal = new Marshal<FloatVector>();
-         info->sharder = new Sharding::Mod;
-         info->accum = new MyAcc();
-         info->partition_factory = new typename SparseTable<int, 
FloatVector>::Factory;
-         auto table=new TypedGlobalTable<int, FloatVector>();
-         table->Init(info);
-         tables[id] = table;
-}
-
-void coordinator_assign_tables(int id){
-       for (int i = 0; i < context->num_processes()-1; ++i) {
-           RegisterWorkerRequest req;
-           int src = 0;
-           network->Read(MPI::ANY_SOURCE, MTYPE_REGISTER_WORKER, &req, &src);
-           //  adding memory server.
-           if (context->IsTableServer(i)) {
-             server_states.push_back(new ServerState(i));
-           }
-         }
-         LOG(INFO) << " All servers registered and started up. Ready to go";
-         //  set itself as the current worker for the table
-         tables[id]->worker_id_ = network->id();
-
-         // memory servers are specified in global context. Round-robin 
assignment
-
-           VLOG(3)<<"num of shards"<<tables[id]->num_shards()<<" for table"<< 
id;
-
-           int server_idx = 0;
-           for (int shard = 0; shard < tables[id]->num_shards(); ++shard) {
-             ServerState &server = *server_states[server_idx];
-             LOG(INFO) << "Assigning table ("<<id<<","<<shard<<") to server "
-                       <<server_states[server_idx]->server_id;
-
-             // TODO(Anh) may overwrite this field if #shards>#table_servers
-             server.shard_id = shard;
-             server.local_shards.insert(new TaskId(id, shard));
-             server_idx = (server_idx + 1) % server_states.size();
-           }
-
-         VLOG(3)<<"table assignment";
-         //  then send table assignment
-         ShardAssignmentRequest req;
-         for (size_t i = 0; i < server_states.size(); ++i) {
-           ServerState &server = *server_states[i];
-           for (auto * task: server.local_shards) {
-             ShardAssignment *s  = req.add_assign();
-             s->set_new_worker(server.server_id);
-             s->set_table(task->table);
-             s->set_shard(task->shard);
-             //  update local tables
-             CHECK(tables.find(task->table)!=tables.end());
-             GlobalTable *t = tables.at(task->table);
-             t->get_partition_info(task->shard)->owner = server.server_id;
-             delete task;
-           }
-         }
-         VLOG(3)<<"finish table assignment, req size "<<req.assign_size();
-         network->SyncBroadcast(MTYPE_SHARD_ASSIGNMENT, 
MTYPE_SHARD_ASSIGNMENT_DONE, req);
-         VLOG(3)<<"finish table server init";
-}
-
-void worker_table_init(){
-       table_server = new TableServer();
-       table_server->StartTableServer(tables);
-       VLOG(3) << "done starting table server";
-}
-
-double random_double(){
-       return static_cast<double>(rand())/static_cast<double>(RAND_MAX);
-}
-
-// popular table with random large or small messages.
-// the message distribution specified in FLAGS_large_precentage
-void coordinator_load_data(){
-       auto table = static_cast<TypedGlobalTable<int,FloatVector>*>(tables[0]);
-
-       num_keys = 0;
-  int nservers=context->num_table_servers();
-       for (int i = 0; i < SIZE; i++) {
-               int total = 0;
-    int threshold=std::max(FLAGS_threshold,0l);//  sizes[i]/nservers);
-    while (total < sizes[i]) {
-      FloatVector* fv = new FloatVector();
-      for (int j = 0; j + total < sizes[i] && j < threshold; j++)
-        fv->add_data(
-            static_cast<float>(rand())
-            / static_cast<float>(RAND_MAX));
-      table->put(num_keys,*fv);
-      total += threshold;
-      num_keys++;
-    }
-       }
-       VLOG(3) << "Loaded data successfully ... " << num_keys << " messages";
-}
-
-void get(TypedGlobalTable<int,FloatVector>* table, ofstream &latency){
-       double start , end;
-  StateQueue<int> state(num_keys);
-  FloatVector v;
-  /*
-       for (int i=0; i<num_keys; i++){
-    start = Now();
-    table->get(i);
-    end=Now();
-    latency << "get: " << (end - start) << endl;
-  }
-  */
-  start=Now();
-       for (int i=0; i<num_keys; i++){
-    if(table->async_get(i, &v))
-      state.Invalid(i);
-       }
-  latency << "send get: " << (Now() - start) << endl;
-  start=Now();
-  while(state.HasValid()){
-    int key=state.Next();
-    if(table->async_get_collect(&key, &v))
-      state.Invalid(key);
-    sleep(0.001);
-  }
-  latency << "collect get: " << (Now() - start) << endl;
-}
-
-void update(TypedGlobalTable<int,FloatVector>* table, ofstream &latency){
-       double start, end;
-       for (int i=0; i<num_keys; i++){
-               start = Now();
-               table->update(i,*value_msg[i]);
-    end=Now();
-               latency << "update: " << (end - start) << endl;
-       }
-}
-
-void worker_test_data(){
-       init_messages();
-       auto table = static_cast<TypedGlobalTable<int,FloatVector>*>(tables[0]);
-
-       ofstream latency(StringPrintf("latency_%d",NetworkThread::Get()->id()));
-       ofstream throughput(StringPrintf("throughput_%d", 
NetworkThread::Get()->id()));
-       double start, end;
-       for (int i=0; i<FLAGS_iterations; i++){
-               start = Now();
-               get(table, latency);
-    end=Now();
-               throughput << "get: " << (end - start) << " over " << num_keys 
<< " ops " << endl;
-               start = Now();
-               update(table, latency);
-    end=Now();
-               throughput << "update: " << (end - start) << " over " << 
num_keys << " ops " << endl;
-    sleep(10);
-       }
-       latency.close();
-       throughput.close();
-
-}
-
-void print_table_stats(){
-       auto table = static_cast<TypedGlobalTable<int,FloatVector>*>(tables[0]);
-       ofstream log_file(StringPrintf("log_variance_%d", 
NetworkThread::Get()->id()));
-       log_file << "table size at process "<< NetworkThread::Get()->id()<<" = 
" << table->stats()["TABLE_SIZE"] << endl;
-       log_file.close();
-}
-
-void shutdown(){
-       if (context->AmICoordinator()){
-               VLOG(3) << "Coordinator is shutting down ...";
-               EmptyMessage msg;
-               for (int i=0; i<context->num_processes()-1; i++)
-                       network->Read(MPI::ANY_SOURCE, MTYPE_WORKER_END, &msg);
-                EmptyMessage shutdown_msg;
-                 for (int i = 0; i < network->size() - 1; i++) {
-                   network->Send(i, MTYPE_WORKER_SHUTDOWN, shutdown_msg);
-                 }
-                 network->Flush();
-                 network->Shutdown();
-       }
-       else{
-               VLOG(3) << "Worker " << network->id() << " is shutting down 
...";
-         network->Flush();
-         VLOG(3) << "Done flushing the network thread";
-         network->Send(GlobalContext::kCoordinatorRank, MTYPE_WORKER_END, 
EmptyMessage());
-         EmptyMessage msg;
-         network->Read(GlobalContext::kCoordinatorRank, MTYPE_WORKER_SHUTDOWN, 
&msg);
-         VLOG(3) << "Worker received MTYPE_WORKER_SHUTDOWN";
-
-         table_server->ShutdownTableServer();
-         VLOG(3) << "Flushing node " << network->id();
-         network->Shutdown();
-       }
-}
-
-
-int main(int argc, char **argv) {
-       FLAGS_logtostderr = 1;
-       google::InitGoogleLogging(argv[0]);
-       gflags::ParseCommandLineFlags(&argc, &argv, true);
-
-       context = GlobalContext::Get(FLAGS_system_conf, FLAGS_model_conf);
-       network = NetworkThread::Get();
-       VLOG(3) << "*** testing memory servers, with "
-                       << context->num_table_servers() << " servers";
-
-
-       create_mem_table(0,context->num_table_servers());
-
-  LOG(INFO)<<"threshold: "<<FLAGS_threshold<<" nworkers: "<<FLAGS_workers;
-       if (context->AmICoordinator()){
-               coordinator_assign_tables(0);
-               coordinator_load_data();
-               network->barrier();
-       }
-       else{
-               worker_table_init();
-               network->barrier();
-               VLOG(3) << "passed the barrier";
-               print_table_stats();
-
-               //Sleep(1);
-    if(network->id()<FLAGS_workers)
-      worker_test_data();
-       }
-
-       shutdown();
-       return 0;
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_table_server.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_table_server.cc 
b/src/test/dist_test/test_table_server.cc
deleted file mode 100644
index 5f3612c..0000000
--- a/src/test/dist_test/test_table_server.cc
+++ /dev/null
@@ -1,357 +0,0 @@
-//  Copyright © 2014 Anh Dinh. All Rights Reserved.
-
-#include "core/global-table.h"
-#include "core/common.h"
-#include "core/table.h"
-#include "core/table_server.h"
-#include "utils/global_context.h"
-#include "utils/common.h"
-#include <gflags/gflags.h>
-#include "proto/model.pb.h"
-#include "proto/common.pb.h"
-#include "worker.h"
-#include "coordinator.h"
-#include "utils/common.h"
-#include "utils/proto_helper.h"
-
-#include <cmath>
-#include <stdlib.h>
-#include <vector>
-#include <iostream>
-#include <fstream>
-
-
-/**
- * Test for table server access. The table is of type <VKey,int>
- */
-DEFINE_bool(restore_mode, false, "restore from checkpoint file");
-using namespace lapis;
-using std::vector;
-
-DEFINE_int32(checkpoint_frequency, 5000, "frequency for cp");
-DEFINE_int32(checkpoint_after, 1, "cp after this steps");
-DEFINE_string(par_mode, "hybrid",  "time training algorithm");
-DEFINE_bool(restore, false, "restore from checkpoint file");
-
-DEFINE_string(db_backend, "lmdb", "backend db");
-DEFINE_string(system_conf, "examples/imagenet12/system.conf", "configuration 
file for node roles");
-DEFINE_string(model_conf, "examples/imagenet12/model.conf", "DL model 
configuration file");
-DEFINE_string(checkpoint_dir,"/data1/wangwei/lapis/","check point dir");
-DEFINE_int32(threshold,1000000, "max # of parameters in a vector");
-DEFINE_int32(iterations,5,"numer of get/put iterations");
-DEFINE_int32(workers,2,"numer of workers doing get/put");
-DECLARE_bool(checkpoint_enabled);
-
-
-DECLARE_bool(checkpoint_enabled);
-
-/**
- * Get and update handler for VKey.
- */
-struct AnhUpdateHandler: BaseUpdateHandler<VKey, SGDValue> {
-       bool Update(SGDValue *a, const SGDValue &b) {
-
-               float * adptr = 
a->mutable_data()->mutable_value()->mutable_data();
-               const float*bdptr = b.grad(0).value().data();
-               for (int i = 0; i < b.grad(0).value_size(); i++)
-                       adptr[i] += bdptr[i];
-
-               return true;
-       }
-
-       bool Get(const VKey k, const SGDValue &val, SGDValue *ret) {
-               *ret = val;
-               return true;
-       }
-
-       bool is_checkpointable(const VKey k, const SGDValue v) {
-               return false; //always checkpoint
-       }
-};
-
-typedef map<int, GlobalTable*> Map;
-Map tables;
-shared_ptr<NetworkThread> network;
-shared_ptr<GlobalContext> context;
-std::vector<ServerState*> server_states;
-TableServer *table_server;
-
-#define SIZE 16
-int tuple_sizes[SIZE] = {27448736, 16777216, 4096000, 1327104, 884736, 884736, 
614400,14112,4096,4096,1000,384,384,256,256,96};
-
-/**
- * Initialize tables.
- */
-void create_mem_table(int id, int num_shards){
-
-       TableDescriptor *info = new TableDescriptor(id, num_shards);
-         info->key_marshal = new Marshal<VKey>();
-         info->value_marshal = new Marshal<SGDValue>();
-         info->sharder = new VKeySharder;
-         info->accum = new AnhUpdateHandler;
-         info->partition_factory = new typename SparseTable<VKey, 
SGDValue>::Factory;
-         auto table=new TypedGlobalTable<VKey, SGDValue>();
-         table->Init(info);
-         tables[id] = table;
-}
-
-/**
- * Coordinator assigns shards to processes.
- * @param id table ID.
- */
-void coordinator_assign_tables(int id) {
-
-       // wait for the servers to be up.
-       for (int i = 0; i < context->num_procs(); i++) {
-               RegisterWorkerRequest req;
-               int src = 0;
-               //  adding memory server.
-               if (context->IsTableServer(i)) {
-                       VLOG(3)<< "Waiting for message from table server " << i;
-                       network->Read(MPI::ANY_SOURCE, MTYPE_REGISTER_WORKER, 
&req, &src);
-                       server_states.push_back(new ServerState(i));
-               }
-       }
-
-       VLOG(3) << " All servers registered and started up. Ready to go";
-       VLOG(3) << "num of shards" << tables[id]->num_shards() << " for table " 
<< id;
-
-       // assign table to shard in round roubin fashion.
-       int server_idx = 0;
-       for (int shard = 0; shard < tables[id]->num_shards(); ++shard) {
-               ServerState &server = *server_states[server_idx];
-               VLOG(3) << "Assigning table (" << id << "," << shard << ") to 
server "
-                               << server_states[server_idx]->server_id;
-               server.shard_id = shard;
-               server.local_shards.insert(new TaskId(id, shard));
-               server_idx = (server_idx + 1) % server_states.size();
-       }
-       ShardAssignmentRequest req;
-       for (size_t i = 0; i < server_states.size(); ++i) {
-               ServerState &server = *server_states[i];
-               for (auto * task : server.local_shards) {
-                       ShardAssignment *s = req.add_assign();
-                       s->set_new_worker(server.server_id);
-                       s->set_table(task->table);
-                       s->set_shard(task->shard);
-                       //  update local tables
-                       GlobalTable *t = tables.at(task->table);
-                       t->get_partition_info(task->shard)->owner = 
server.server_id;
-                       delete task;
-               }
-       }
-
-       network->SyncBroadcast(MTYPE_SHARD_ASSIGNMENT, 
MTYPE_SHARD_ASSIGNMENT_DONE,
-                       req);
-       VLOG(3) << "done table assignment... ";
-}
-
-
-void table_init(){
-       table_server = new TableServer();
-       table_server->StartTableServer(tables);
-       VLOG(3) << "table server started on process "<< 
NetworkThread::Get()->id();
-}
-
-
-/**
- * Coordinator loads data to the table.
- * @param size number of tuples.
- */
-void coordinator_load_data() {
-       auto table = static_cast<TypedGlobalTable<VKey, SGDValue>*>(tables[0]);
-       for (int i = 0; i < SIZE; i++) {
-               VKey key;
-               SGDValue x;
-               DAryProto *data = x.mutable_data();
-               DAryProto *grad = x.add_grad();
-               for (int j = 0; j < tuple_sizes[i]; j++) {
-                       data->add_value(j * 1.0f);
-                       grad->add_value(j * 1.0f);
-               }
-               key.set_key(i);
-               table->put(key, x);
-       }
-       VLOG(3) << "Done loading " << SIZE << " tuples ...";
-}
-
-/**
- * Worker gets tuples from the server.
- * @param size number of tuples to be requested.
- */
-void get() {
-       auto table = static_cast<TypedGlobalTable<VKey,SGDValue>*>(tables[0]);
-       SGDValue value;
-       for (int i = 0; i < SIZE; i++) {
-               VKey key;
-               key.set_key(i);
-               table->async_get(key, &value);
-       }
-       VLOG(3) << "Done sending get requests ...";
-
-       for (int i = 0; i < SIZE; i++) {
-               VKey key;
-               while (!table->async_get_collect(&key, &value))
-                       Sleep(0.0001);
-       }
-}
-
-/**
- * Worker updates tuples.
- */
-void update() {
-       auto table = static_cast<TypedGlobalTable<VKey, SGDValue>*>(tables[0]);
-       for (int i = 0; i < SIZE; i++) {
-               VKey key;
-               key.set_key(i);
-
-               SGDValue x;
-               DAryProto *grad = x.add_grad();
-               for (int j = 0; j < tuple_sizes[i]; j++)
-                       grad->add_value(j * 1.0f);
-
-               table->update(key, x);
-       }
-       VLOG(3) << "Done updating " << SIZE << " tuples ...";
-}
-
-
-void worker_test_data() {
-       //get(size);
-       update();
-       update();
-       get();
-       /*
-       update(table, tuples);
-       update(table, tuples);
-       update(table, tuples);
-       get(table, tuples);
-       */
-}
-
-/**
- * Shutdown the process.
- */
-void shutdown() {
-       if (context->AmICoordinator()) {
-               EmptyMessage msg;
-               for (int i = 0; i < context->num_procs() - 1; i++)
-                       network->Read(MPI::ANY_SOURCE, MTYPE_WORKER_END, &msg);
-               EmptyMessage shutdown_msg;
-               for (int i = 0; i < network->size() - 1; i++) {
-                       network->Send(i, MTYPE_SHUTDOWN, shutdown_msg);
-               }
-               //network->Flush();
-               network->Shutdown();
-       } else {
-               //network->Flush();
-               network->Send(context->num_procs() - 1, MTYPE_WORKER_END,
-                               EmptyMessage());
-               EmptyMessage msg;
-               network->Read(context->num_procs() - 1, MTYPE_SHUTDOWN, &msg);
-
-               if (context->AmITableServer()){
-                       RequestDispatcher::Get()->PrintStats();
-                       table_server->ShutdownTableServer();
-               }
-
-               network->Shutdown();
-       }
-}
-
-/**
- * Worker handle shard assignment from the coordinator.
- */
-void HandleShardAssignment() {
-
-       ShardAssignmentRequest shard_req;
-       auto mpi = NetworkThread::Get();
-       mpi->Read(GlobalContext::kCoordinator, MTYPE_SHARD_ASSIGNMENT, 
&shard_req);
-
-       //  request read from coordinator
-       for (int i = 0; i < shard_req.assign_size(); i++) {
-               const ShardAssignment &a = shard_req.assign(i);
-               GlobalTable *t = tables.at(a.table());
-               t->get_partition_info(a.shard())->owner = a.new_worker();
-
-               //if local shard, create check-point files
-               if (FLAGS_checkpoint_enabled && t->is_local_shard(a.shard())) {
-                       string checkpoint_file = 
StringPrintf("%s/checkpoint_%d",
-                                       FLAGS_checkpoint_dir.c_str(), 
a.shard());
-                       char hostname[256];
-                       gethostname(hostname, sizeof(hostname));
-
-                       FILE *tmp_file = fopen(checkpoint_file.c_str(), "r");
-                       if (tmp_file) { //exists -> open to reading and writing
-                               fclose(tmp_file);
-                               auto cp = t->checkpoint_files();
-
-                               if (FLAGS_restore_mode) { //open in read mode 
to restore, then close
-                                       LogFile *file = new 
LogFile(checkpoint_file, "rw", 0);
-                                       int table_size = 
file->read_latest_table_size();
-                                       delete file;
-
-                                       double start = Now();
-                                       (*cp)[a.shard()] = new 
LogFile(checkpoint_file, "r",
-                                                       a.shard());
-                                       t->Restore(a.shard());
-                                       delete (*cp)[a.shard()];
-                                       double end = Now();
-                                       LOG(ERROR) << "restore time\t" << end - 
start << "\tfor\t"
-                                                       << table_size << 
"\tthreshold\t" << FLAGS_threshold;
-                               }
-                               char hostname[256];
-                               gethostname(hostname, sizeof(hostname));
-                               (*cp)[a.shard()] = new LogFile(checkpoint_file, 
"a", a.shard());
-                       } else { // not exist -> open to writing first time
-                               auto cp = t->checkpoint_files();
-                               (*cp)[a.shard()] = new LogFile(checkpoint_file, 
"w", a.shard());
-                       }
-               }
-       }
-
-       EmptyMessage empty;
-       mpi->Send(GlobalContext::kCoordinator, MTYPE_SHARD_ASSIGNMENT_DONE, 
empty);
-       VLOG(3) << "Done handling shard assignment ...";
-
-}
-
-
-int main(int argc, char **argv) {
-       FLAGS_logtostderr = 1;
-       int provided;
-       MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
-       google::InitGoogleLogging(argv[0]);
-       gflags::ParseCommandLineFlags(&argc, &argv, true);
-
-       context = GlobalContext::Get(FLAGS_system_conf);
-       network = NetworkThread::Get();
-
-       ModelProto model;
-       ReadProtoFromTextFile(FLAGS_model_conf.c_str(), &model);
-
-       create_mem_table(0, context->num_table_servers());
-
-       if (context->AmICoordinator()) {
-               coordinator_assign_tables(0);
-               coordinator_load_data();
-               network->barrier();
-       } else {
-               if (context->AmITableServer()) {
-                       table_init();
-                       HandleShardAssignment();
-                       network->barrier();
-               } else {
-                       HandleShardAssignment();
-                       network->barrier();
-                       Sleep(1);
-                       VLOG(3) << "Worker cleared the barrier ...";
-                       worker_test_data();
-               }
-       }
-
-       shutdown();
-       return 0;
-}
-
-


Reply via email to