http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/proto/model.proto
----------------------------------------------------------------------
diff --git a/src/proto/model.proto b/src/proto/model.proto
new file mode 100644
index 0000000..4ea621d
--- /dev/null
+++ b/src/proto/model.proto
@@ -0,0 +1,382 @@
+package singa;
+enum MsgType{
+  kGet=0;
+  kPut=1;
+  kSync=2;
+  kUpdate=3;
+  kSyncRequest=4;
+  kSyncResponse=5;
+  kStop=6;
+  kData=7;
+  kRGet=8;
+  kRUpdate=9;
+  kConnect=10;
+};
+
+enum EntityType{
+  kWorkerParam=0;
+  kWorkerLayer=1;
+  kServer=2;
+  kStub=3;
+};
+enum Phase {
+  kTrain = 0;
+  kValidation=1;
+  kTest= 2;
+}
+enum ShareOption{
+  kValueOnly=0;
+  kWhole=1;
+};
+message ModelProto{
+  optional string name = 1;
+  // relative path to system folder
+  optional string train_folder=2 [default="train"];
+  optional string test_folder=3 [default="test"];
+  optional string validation_folder=4 [default="validation"];
+  // start display after this num steps
+  optional int32 display_after_steps = 6 [default = 0];
+  // frequency of display
+  optional int32 display_frequency = 7 [default = 0];
+
+  // the time of validation
+  //optional int32 validation_step = 9 [default = 0];
+  // start validation after this num steps
+  optional int32 validation_after_steps = 10 [default = 0];
+  // frequency of validation
+  optional int32 validation_frequency = 11 [default = 0];
+
+  // the time of test
+  //optional int32 test_step = 12 [default = 0];
+  // start test after this num steps
+  optional int32 test_after_steps = 13 [default = 0];
+  // frequency of test
+  optional int32 test_frequency = 14 [default = 0];
+  optional int32 checkpoint_after_steps = 15 [default = 0];
+  // frequency of test
+  optional int32 checkpoint_frequency = 16 [default = 0];
+  optional bool prefetch=18[default=true];
+
+
+  // total num of steps for training
+  optional int32 train_steps = 20;
+  // total num of steps for validation
+  optional int32 validation_steps=21;
+  // total num of steps for test
+  optional int32 test_steps=22;
+  // last snapshot step
+  optional int32 step=29 [default=0];
+
+  optional UpdaterProto updater=31;
+  // There are two basic algorithms for calculating gradients.
+  // Different deep learning models use different algorithms.
+  enum GradCalcAlg{
+    kBackPropagation = 1;
+    kContrastiveDivergence = 2;
+  }
+  optional GradCalcAlg alg= 32 [default = kBackPropagation];
+  optional bool hogwild=33 [default=false];
+  optional NetProto neuralnet = 40;
+  optional bool debug=41 [default=false];
+}
+
+message NetProto{
+  repeated LayerProto layer=1;
+  optional PartitionType partition_type=3 [default=kNone];
+}
+
+message ParamProto {
+  // for the program to identify it and share among layers.
+  // e.g., "conv1_weight","fc_bias"
+  optional string name = 1;
+  optional int32 id=2;
+  // in most situations, user do not need to config this,
+  // the program will calculate it
+  repeated int32 shape = 3;
+
+  // split the parameter into multiple DAryProtos for serialization and
+  // transferring (Google Protobuf has size limit)
+  optional int32 split_threshold=4 [default=5000000];
+  // partition dimension, -1 for no partition
+  optional int32 partition_dim=5 [default =-1];
+
+  optional int32 version=6;
+
+  // value of the parameter
+  //repeated DAryProto ary = 6;
+
+  enum InitMethod {
+    kConstant = 0;
+    // sample gaussian with std and mean
+    kGaussian = 1;
+    // uniform sampling between low and high
+    kUniform = 2;
+    // copy the content and history which are from previous training
+    kPretrained = 3;
+    // from Toronto Convnet, let a=1/sqrt(fan_in), w*=a after generating from
+    // Gaussian distribution
+    kGaussainSqrtFanIn = 4;
+    // from Toronto Convnet, rectified linear activation, let
+    // a=sqrt(3)/sqrt(fan_in), range is [-a, +a]; no need to set value=sqrt(3),
+    // the program will multiply it.
+    kUniformSqrtFanIn = 5;
+    // from Theano MLP tutorial, let a=1/sqrt(fan_in+fan_out). for tanh
+    // activation, range is [-6a, +6a], for sigmoid activation, range is
+    // [-24a, +24a], put the scale factor to value field.
+    // <a href="http://deeplearning.net/tutorial/mlp.html"> Theano MLP</a>
+    kUniformSqrtFanInOut = 6;
+  }
+  optional InitMethod init_method = 7 [default = kConstant];
+  // constant init
+  optional float value = 8 [default = 1];
+  // for uniform sampling
+  optional float low = 9 [default = -1];
+  optional float high = 10 [default = 1];
+  // for gaussian sampling
+  optional float mean = 11 [default = 0];
+  optional float std = 12 [default = 1];
+  // multiplied on the global learning rate.
+  optional float learning_rate_multiplier =13 [default=1];
+  // multiplied on the global weight decay.
+  optional float weight_decay_multiplier =14 [default=1];
+}
+
+message BlobProtos{
+  repeated BlobProto blobs=1;
+  repeated int32 ids=2;
+  repeated string names=3;
+}
+
+
+
+enum PartitionType{
+  kDataPartition=0;
+  kLayerPartition=1;
+  kNone=2;
+}
+enum ConnectionType{
+  kOneToOne=0;
+  kOneToAll=1;
+}
+
+message LayerProto {
+  optional string name = 1; // the layer name
+  optional string type = 2; // the layer type from the enum above
+  repeated string srclayers=3;
+  optional int32 locationid=4 [default=0]; // todo make locationID an array
+  optional int32 partitionid=5 [default=0];
+  optional PartitionType partition_type=6;
+  // can be pos/neg neuron value for CD, neuron value/grad for BP
+  //repeated DAryProto ary = 10;
+  repeated string share_ary =11;
+  // parameters, e.g., weight matrix or bias vector
+  repeated ParamProto param = 12;
+  // names of parameters shared from other layers
+  repeated string share_param=13;
+
+  // All layers are included in the net structure for training phase by default.
+  // Layers, e.g., computing performance metrics for test phase, can be excluded
+  // by this field which defines in which phase this layer should be excluded.
+  repeated Phase exclude = 20;
+
+  // hyper-parameters for layers
+  optional ConvolutionProto convolution_param = 21;
+  optional ConcateProto concate_param = 31;
+  optional DataProto data_param = 22;
+  optional DropoutProto dropout_param = 23;
+  optional InnerProductProto inner_product_param = 24;
+  optional LRNProto lrn_param = 25;
+  optional MnistProto mnist_param= 26;
+  optional PoolingProto pooling_param = 27;
+  optional SliceProto slice_param = 32;
+  optional SplitProto split_param = 33;
+  optional ReLUProto relu_param = 28;
+  optional RGBImage rgbimage_param=34;
+  optional SoftmaxLossProto softmaxloss_param = 29;
+  optional TanhProto tanh_param=30;
+}
+message RGBImage {
+  optional float scale=1 [default=1.0];
+  optional int32 cropsize=2 [default=0];
+  optional bool mirror=3 [default=false];
+  optional string meanfile=4;
+}
+message SplitProto{
+  optional int32 num_splits=1;
+}
+// scaled tan: A*tan(B*x)
+message TanhProto{
+  optional float outer_scale=1 [default=1.0];
+  optional float inner_scale=2 [default=1.0];
+}
+
+// Message that stores parameters used by SoftmaxLossProto
+message SoftmaxLossProto {
+  // accuracy is not computed by default, unless topk>0;
+  // When computing accuracy, count as correct by comparing the true label to
+  // the top k scoring classes.
+  optional int32 topk = 1 [default=1] ;
+  optional float scale=2 [default=1];
+}
+// Message that stores parameters used by ConvolutionLayer
+message ConvolutionProto {
+  optional uint32 num_filters = 1; // The number of outputs for the layer
+  optional bool bias_term = 2 [default = true]; // whether to have bias terms
+  // Pad, kernel size, and stride are all given as a single value for equal
+  // dimensions in height and width or as Y, X pairs.
+  optional uint32 pad = 3 [default = 0]; // The padding size (equal in Y, X)
+  optional uint32 stride = 4 [default = 1]; // The stride (equal in Y, X)
+  required uint32 kernel= 5; // The kernel height/width
+}
+
+message ConcateProto{
+  optional int32 concate_dimension=1;
+  optional int32 concate_num=2;
+}
+
+// Message that stores parameters used by DataLayer
+message DataProto {
+  // Specify the data source.
+  optional string source = 1;
+  // path to the data file/folder, absolute or relative to the
+  // ClusterProto::workspace
+  optional string path=2;
+  // Specify the batch size.
+  optional uint32 batchsize = 4;
+  // skip [0,random_skip] records
+  optional uint32 random_skip=5 [default=0];
+}
+
+message MnistProto {
+  // elastic distortion
+  optional int32 kernel=1 [default=0];
+  optional float sigma=2 [default=0];
+  optional float alpha=3 [default=0];
+  // rotation or horizontal shearing
+  optional float beta=4 [default=0];
+  // scaling
+  optional float gamma=5 [default=0];
+  // scale to this size as input for deformation
+  optional int32 resize=6 [default=0] ;
+  optional int32 elastic_freq=7 [default=0];
+  optional float norm_a=8 [default=1];
+  optional float norm_b=9 [default=0];
+}
+// Message that stores parameters used by DropoutLayer
+message DropoutProto {
+  optional float dropout_ratio = 1 [default = 0.5]; // dropout ratio
+}
+// Message that stores parameters used by InnerProductLayer
+message InnerProductProto {
+  optional uint32 num_output = 1; // The number of outputs for the layer
+  optional bool bias_term = 2 [default = true]; // whether to have bias terms
+}
+
+// Message that stores parameters used by LRNLayer
+message LRNProto {
+  optional uint32 local_size = 1 [default = 5];
+  optional float alpha = 2 [default = 1.];
+  optional float beta = 3 [default = 0.75];
+  enum NormRegion {
+    ACROSS_CHANNELS = 0;
+    WITHIN_CHANNEL = 1;
+  }
+  optional NormRegion norm_region = 4 [default = ACROSS_CHANNELS];
+  optional float knorm =5 [default=1.0];
+}
+
+// Message that stores parameters used by PoolingLayer
+message PoolingProto {
+  enum PoolMethod {
+    MAX = 0;
+    AVE = 1;
+  }
+  optional PoolMethod pool = 1 [default = MAX]; // The pooling method
+  // Pad, kernel size, and stride are all given as a single value for equal
+  // dimensions in height and width or as Y, X pairs.
+  required uint32 kernel= 2; // The kernel size (square)
+  optional uint32 pad = 4 [default = 0]; // The padding size (equal in Y, X)
+  optional uint32 stride = 3 [default = 1]; // The stride (equal in Y, X)
+}
+
+message SliceProto{
+  optional int32 slice_dimension=1;
+  optional int32 slice_num=2;
+}
+// Message that stores parameters used by ReLULayer
+message ReLUProto {
+  // Allow non-zero slope for negative inputs to speed up optimization
+  // Described in:
+  // Maas, A. L., Hannun, A. Y., & Ng, A. Y. (2013). Rectifier nonlinearities
+  // improve neural network acoustic models. In ICML Workshop on Deep Learning
+  // for Audio, Speech, and Language Processing.
+  optional float negative_slope = 1 [default = 0];
+}
+
+
+
+message Record {
+  enum Type{
+    kSingleLabelImage=0;
+  }
+  optional Type type=1 [default=kSingleLabelImage];
+  optional SingleLabelImageRecord image=2;
+}
+
+// to import caffe's lmdb dataset
+message Datum {
+  optional int32 channels = 1;
+  optional int32 height = 2;
+  optional int32 width = 3;
+  // the actual image data, in bytes
+  optional bytes data = 4;
+  optional int32 label = 5;
+  // Optionally, the datum could also hold float data.
+  repeated float float_data = 6;
+  // If true data contains an encoded image that need to be decoded
+  optional bool encoded = 7 [default = false];
+}
+message SingleLabelImageRecord{
+  repeated int32 shape=1;
+  optional int32 label=2;
+  optional bytes pixel=3;
+  repeated float data=4;
+}
+
+message UpdaterProto {
+  optional float momentum=4 [default=0];
+  optional float weight_decay=5 [default=0];
+  // used in changing learning rate
+  optional float gamma = 6 [default=1];
+  optional float pow=7 [default=0];
+  optional float delta=8 [default=0.0000001];
+  optional float rho=9 [default=0.9];
+  optional float base_learning_rate=12;
+  optional float final_learning_rate=13;
+  optional int32 learning_rate_change_frequency = 14;
+  enum ChangeProto {
+    kFixed = 0;
+    kInverse_t= 1;
+    kInverse= 2;
+    kExponential = 3;
+    kLinear = 4;
+    kStep = 5;
+    kFixedStep=6;
+  }
+  optional ChangeProto learning_rate_change_method = 16 [default = kFixed];
+  optional int32 sync_frequency=17 [default=1];
+  // warmup the parameters and then send to parameter servers.
+  optional int32 warmup_steps=25 [default=10];
+  optional float moving_rate=26 [default=0];
+  optional string param_type=27[default="Param"];
+  repeated int32 step=28;
+  repeated float step_lr=29;
+}
+message BlobProto {
+  optional int32 num = 1 [default = 0];
+  optional int32 channels = 2 [default = 0];
+  optional int32 height = 3 [default = 0];
+  optional int32 width = 4 [default = 0];
+  repeated float data = 5 [packed = true];
+  repeated float diff = 6 [packed = true];
+}

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/test/dist_test/test_consistency.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_consistency.cc b/src/test/dist_test/test_consistency.cc
new file mode 100644
index 0000000..a4ed9b2
--- /dev/null
+++ b/src/test/dist_test/test_consistency.cc
@@ -0,0 +1,406 @@
+//  Copyright © 2014 Anh Dinh. All Rights Reserved.
+
+//  Testing the unbalance in splitting parameter vectors.
+
+#include "core/global-table.h"
+#include "core/common.h"
+#include "core/disk-table.h"
+#include "core/table.h"
+#include "core/table_server.h"
+#include "utils/global_context.h"
+#include <gflags/gflags.h>
+#include "proto/model.pb.h"
+#include "proto/common.pb.h"
+#include "worker.h"
+#include "coordinator.h"
+#include "utils/common.h"
+#include "utils/proto_helper.h"
+
+#include <cmath>
+#include <stdlib.h>
+#include <vector>
+#include <iostream>
+#include <fstream>
+
+
+DEFINE_bool(restore_mode, false, "restore from checkpoint file");
+using namespace lapis;
+using std::vector;
+
+//DEFINE_bool(sync_update, false, "Synchronous put/update queue");
+DEFINE_int32(checkpoint_frequency, 5000, "frequency for cp");
+DEFINE_int32(checkpoint_after, 1, "cp after this steps");
+DEFINE_string(par_mode, "hybrid",  "time training algorithm");
+DEFINE_bool(restore, false, "restore from checkpoint file");
+
+DEFINE_string(db_backend, "lmdb", "backend db");
+DEFINE_string(system_conf, "examples/imagenet12/system.conf", "configuration file for node roles");
+DEFINE_string(model_conf, "examples/imagenet12/model.conf", "DL model configuration file");
+DEFINE_string(checkpoint_dir,"/data1/wangwei/lapis/","check point dir");
+DEFINE_int32(threshold,1000000, "max # of parameters in a vector");
+DEFINE_int32(iterations,5,"numer of get/put iterations");
+DEFINE_int32(workers,2,"numer of workers doing get/put");
+DECLARE_bool(checkpoint_enabled);
+
+#ifndef FLAGS_v
+  DEFINE_int32(v, 3, "vlog controller");
+#endif
+
+
+struct AnhUpdateHandler: BaseUpdateHandler<VKey,SGDValue>{
+       bool Update(SGDValue *a, const SGDValue &b){
+    float * adptr=a->mutable_data()->mutable_value()->mutable_data();
+    const float*bdptr=b.grad(0).value().data();
+    for(int i=0;i<b.grad(0).value_size();i++)
+      adptr[i]+=bdptr[i];
+               return true;
+       }
+
+  bool Get(const VKey k, const SGDValue &val, SGDValue *ret){
+      *ret = val;
+      return true;
+  }
+
+  bool is_checkpointable(const VKey k, const SGDValue v){
+       return true; //always checkpoint
+  }
+};
+
+typedef map<int, GlobalTable*> Map;
+Map tables;
+shared_ptr<NetworkThread> network;
+shared_ptr<GlobalContext> context;
+std::vector<ServerState*> server_states;
+TableServer *table_server;
+TableDelegate *delegate;
+void create_mem_table(int id, int num_shards){
+
+       TableDescriptor *info = new TableDescriptor(id, num_shards);
+         info->key_marshal = new Marshal<VKey>();
+         info->value_marshal = new Marshal<SGDValue>();
+         info->sharder = new VKeySharder;
+         info->accum = new AnhUpdateHandler;
+         info->partition_factory = new typename SparseTable<VKey, SGDValue>::Factory;
+         auto table=new TypedGlobalTable<VKey, SGDValue>();
+         table->Init(info);
+         tables[id] = table;
+}
+
+void coordinator_assign_tables(int id){
+       for (int i = 0; i < context->num_procs()        ; ++i) {
+           RegisterWorkerRequest req;
+           int src = 0;
+           //  adding memory server.
+           if (context->IsTableServer(i)) {
+             network->Read(MPI::ANY_SOURCE, MTYPE_REGISTER_WORKER, &req, &src);
+             server_states.push_back(new ServerState(i));
+           }
+         }
+         LOG(INFO) << " All servers registered and started up. Ready to go";
+         //  set itself as the current worker for the table
+         tables[id]->worker_id_ = network->id();
+
+         // memory servers are specified in global context. Round-robin assignment
+
+           VLOG(3)<<"num of shards"<<tables[id]->num_shards()<<" for table"<< id;
+
+           int server_idx = 0;
+           for (int shard = 0; shard < tables[id]->num_shards(); ++shard) {
+             ServerState &server = *server_states[server_idx];
+             LOG(INFO) << "Assigning table ("<<id<<","<<shard<<") to server "
+                       <<server_states[server_idx]->server_id;
+
+             // TODO(Anh) may overwrite this field if #shards>#table_servers
+             server.shard_id = shard;
+             server.local_shards.insert(new TaskId(id, shard));
+             server_idx = (server_idx + 1) % server_states.size();
+           }
+
+         VLOG(3)<<"table assignment";
+         //  then send table assignment
+         ShardAssignmentRequest req;
+         for (size_t i = 0; i < server_states.size(); ++i) {
+           ServerState &server = *server_states[i];
+           for (auto * task: server.local_shards) {
+             ShardAssignment *s  = req.add_assign();
+             s->set_new_worker(server.server_id);
+             s->set_table(task->table);
+             s->set_shard(task->shard);
+             //  update local tables
+             CHECK(tables.find(task->table)!=tables.end());
+             GlobalTable *t = tables.at(task->table);
+             t->get_partition_info(task->shard)->owner = server.server_id;
+             delete task;
+           }
+         }
+         VLOG(3)<<"finish table assignment, req size "<<req.assign_size();
+         network->SyncBroadcast(MTYPE_SHARD_ASSIGNMENT, MTYPE_SHARD_ASSIGNMENT_DONE, req);
+         VLOG(3)<<"finish table server init";
+}
+
+
+void worker_table_init(){
+       table_server = new TableServer();
+       table_server->StartTableServer(tables);
+       VLOG(3) << "done starting table server";
+}
+
+double random_double(){
+       return static_cast<double>(rand())/static_cast<double>(RAND_MAX);
+}
+
+// populate table with random large or small messages.
+// the message distribution specified in FLAGS_large_precentage
+void coordinator_load_data(const vector<int>& tuples){
+  auto table = static_cast<TypedGlobalTable<VKey,SGDValue>*>(tables[0]);
+
+  int nservers=context->num_table_servers();
+  int keyid=0;
+  if (!FLAGS_restore_mode){
+    for(auto tuple: tuples){
+      for(int offset=0;offset<tuple;){
+        SGDValue x;
+        DAryProto *data=x.mutable_data();
+        DAryProto *grad=x.add_grad();
+        for(int i=0;i <std::min(FLAGS_threshold, tuple-offset);i++){
+          data->add_value(i*1.0f);
+          grad->add_value(i*1.0f);
+        }
+        offset+=data->value_size();
+        VKey key;
+        key.set_key(keyid++);
+        table->put(key,x);
+      }
+    }
+    LOG(ERROR)<<"put "<<keyid<<" tuples";
+  }
+
+  /*
+       LogFile *file = new LogFile("/data1/wangwei/lapis/checkpoint_0","rw",0);
+       VLOG(3) << "Loaded table " << file->file_name();
+       string k,v;
+       int table_size = file->read_latest_table_size();
+       VLOG(3) << "table size = " << table_size;
+       for (int i=0; i<table_size; i++){
+               int tmp;
+               file->previous_entry(&k, &v, &tmp);
+               int *key = reinterpret_cast<int *>((char*)&k[0]);
+               int *val = reinterpret_cast<int *>((char*)&v[0]);
+               VLOG(3) << "k = " << *key << " val = " << *val;
+       }
+       delete file;
+  */
+
+       /*
+       for (int i=0; i<num_keys; i++){
+               table->put(i,0); //loaded again
+       }*/
+       VLOG(3) << "Coordinator done loading ..., from process "<<NetworkThread::Get()->id();
+}
+
+void get(TypedGlobalTable<VKey,SGDValue>* table, const vector<int>& tuples){
+  SGDValue v;
+  int num_keys=0;
+  for(auto tuple: tuples){
+    num_keys+=tuple/FLAGS_threshold+(tuple%FLAGS_threshold!=0);
+  }
+  LOG(ERROR)<<"getting "<<num_keys<<" tuples";
+
+  for (int i=0; i<num_keys; i++){
+    VKey key;
+    key.set_key(i);
+    table->async_get(key, &v);
+  }
+
+
+  int key=0;
+  SGDValue val;
+
+  LOG(INFO)<<"start collect key";
+  for (int i=0; i<num_keys; i++){
+    VKey key;
+    while(!table->async_get_collect(&key, &val))
+      Sleep(0.001);
+    //LOG(INFO)<<"collect key "<<key<<" with val "<<val;
+  }
+}
+
+void update(TypedGlobalTable<VKey,SGDValue>* table, const vector<int>& tuples){
+  if(NetworkThread::Get()->id()==0)
+    sleep(2);
+  LOG(INFO)<<"start update";
+  int keyid=0;
+  for(auto tuple: tuples){
+    for(int offset=0;offset<tuple;){
+      SGDValue x;
+      DAryProto *grad=x.add_grad();
+      for(int i=0;i <std::min(FLAGS_threshold, tuple-offset);i++){
+        grad->add_value(i*1.0f);
+      }
+      offset+=grad->value_size();
+      VKey key;
+      key.set_key(keyid++);
+      table->update(key,x);
+    }
+  }
+  LOG(ERROR)<<"updated "<<keyid<<" tuples";
+}
+
+void worker_test_data(const vector<int>& tuples){
+  auto table = static_cast<TypedGlobalTable<VKey,SGDValue>*>(tables[0]);
+
+  get(table, tuples);
+  update(table, tuples);
+  update(table, tuples);
+  update(table, tuples);
+  get(table, tuples);
+}
+
+void shutdown(){
+       if (context->AmICoordinator()){
+               EmptyMessage msg;
+               for (int i=0; i<context->num_procs()-1; i++)
+                       network->Read(MPI::ANY_SOURCE, MTYPE_WORKER_END, &msg);
+                EmptyMessage shutdown_msg;
+                 for (int i = 0; i < network->size() - 1; i++) {
+                   network->Send(i, MTYPE_SHUTDOWN, shutdown_msg);
+                 }
+                 network->Flush();
+                 network->Shutdown();
+       }
+       else{
+         network->Flush();
+
+         network->Send(context->num_procs()-1, MTYPE_WORKER_END, EmptyMessage());
+
+         EmptyMessage msg;
+
+         network->Read(context->num_procs()-1, MTYPE_SHUTDOWN, &msg);
+
+         if (context->AmITableServer())
+                 table_server->ShutdownTableServer();
+
+         network->Shutdown();
+       }
+}
+
+void HandleShardAssignment() {
+
+  ShardAssignmentRequest shard_req;
+  auto mpi=NetworkThread::Get();
+  mpi->Read(GlobalContext::kCoordinator, MTYPE_SHARD_ASSIGNMENT, &shard_req);
+  //  request read from coordinator
+  for (int i = 0; i < shard_req.assign_size(); i++) {
+    const ShardAssignment &a = shard_req.assign(i);
+    GlobalTable *t = tables.at(a.table());
+    t->get_partition_info(a.shard())->owner = a.new_worker();
+
+
+    //if local shard, create check-point files
+    if (FLAGS_checkpoint_enabled && t->is_local_shard(a.shard())){
+      string checkpoint_file = StringPrintf("%s/checkpoint_%d",FLAGS_checkpoint_dir.c_str(), a.shard());
+        char hostname[256];
+        gethostname(hostname, sizeof(hostname));
+        VLOG(3) << "try to open for writing *****"<<checkpoint_file<<" "<<string(hostname);
+
+      FILE *tmp_file = fopen(checkpoint_file.c_str(), "r");
+      if (tmp_file){//exists -> open to reading and writing
+        fclose(tmp_file);
+        auto cp = t->checkpoint_files();
+
+        if (FLAGS_restore_mode){//open in read mode to restore, then close
+          LogFile *file = new LogFile(checkpoint_file,"rw",0);
+          VLOG(3) << "Loaded table " << file->file_name();
+          int table_size = file->read_latest_table_size();
+          delete file;
+
+          double start=Now();
+          VLOG(3) << "Open checkpoint file to restore";
+          (*cp)[a.shard()] = new LogFile(checkpoint_file,"r",a.shard());
+          t->Restore(a.shard());
+          delete (*cp)[a.shard()];
+          double end=Now();
+          LOG(ERROR)<<"restore time\t"<<end-start<< "\tfor\t"
+            <<table_size<<"\tthreshold\t"<<FLAGS_threshold;
+        }
+        char hostname[256];
+        gethostname(hostname, sizeof(hostname));
+        VLOG(3) << "open for writing *****"<<checkpoint_file<<" "<<string(hostname);
+
+
+
+        VLOG(3) << "Open checkpoint file for writing";
+        (*cp)[a.shard()] = new LogFile(checkpoint_file,"a",a.shard());
+      }
+      else{// not exist -> open to writing first time
+        auto cp = t->checkpoint_files();
+        (*cp)[a.shard()] = new LogFile(checkpoint_file,"w",a.shard());
+        VLOG(3) << "Added to new checkpoint files for shard "<< a.shard();
+      }
+
+    }
+
+
+  }
+  EmptyMessage empty;
+  mpi->Send(GlobalContext::kCoordinator, MTYPE_SHARD_ASSIGNMENT_DONE, empty);
+  VLOG(3)<<"finish handle shard assignment **";
+
+}
+
+
+int main(int argc, char **argv) {
+       FLAGS_logtostderr = 1;
+       int provided;
+       MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
+       google::InitGoogleLogging(argv[0]);
+       gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+       context = GlobalContext::Get(FLAGS_system_conf);
+       network = NetworkThread::Get();
+
+       ModelProto model;
+       ReadProtoFromTextFile(FLAGS_model_conf.c_str(), &model);
+
+       create_mem_table(0,context->num_table_servers());
+
+  vector<int> tuple_size{37448736, 16777216, 4096000, 1327104, 884736, 884736, 614400,14112,4096,4096,1000,384,384,256,256,96};
+  /*
+  vector<int> tuples;
+  for(int i=0;i<3;i++){
+    for(int j=0;j<FLAGS_workers;j++)
+      tuples.push_back(tuple_size[i]/FLAGS_workers);
+  }
+  for(int i=3;i<tuple_size.size();i++)
+    tuples.push_back(tuple_size[i]);
+    */
+
+       if (context->AmICoordinator()){
+               VLOG(3) << "Coordinator process rank = " << NetworkThread::Get()->id();
+               coordinator_assign_tables(0);
+               coordinator_load_data(tuple_size);
+
+               network->barrier();
+       }
+       else{
+               if (context->AmITableServer()){
+                       worker_table_init();
+                       HandleShardAssignment();
+                       network->barrier();
+               }
+               else{
+                       VLOG(3) << "Inside worker, waiting for assignemtn";
+                       HandleShardAssignment();
+                       network->barrier();
+      if(!FLAGS_restore_mode)
+        worker_test_data(tuple_size);
+               }
+       }
+       shutdown();
+
+
+       VLOG(3) << "Done ...";
+       return 0;
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/test/dist_test/test_core.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_core.cc b/src/test/dist_test/test_core.cc
new file mode 100644
index 0000000..35d589b
--- /dev/null
+++ b/src/test/dist_test/test_core.cc
@@ -0,0 +1,192 @@
+//  Copyright © 2014 Anh Dinh. All Rights Reserved.
+
+
+
+#include "core/global-table.h"
+#include "core/common.h"
+#include "core/disk-table.h"
+#include "core/table.h"
+#include "core/table_server.h"
+#include "utils/global_context.h"
+#include <gflags/gflags.h>
+#include "proto/model.pb.h"
+#include "worker.h"
+#include "coordinator.h"
+#include "model_controller/myacc.h"
+#include <cmath>
+
+using namespace lapis;
+
+DEFINE_bool(sync_update, false, "Synchronous put/update queue");
+DEFINE_string(system_conf, "examples/imagenet12/system.conf", "configuration file for node roles");
+DEFINE_string(model_conf, "examples/imagenet12/model.conf", "DL model configuration file");
+DEFINE_int32(num_keys,10,"");
+
+typedef map<int, GlobalTable*> Map;
+Map tables;
+shared_ptr<NetworkThread> network;
+shared_ptr<GlobalContext> context;
+std::vector<ServerState*> server_states;
+TableServer *table_server;
+
+void create_mem_table(int id, int num_shards){
+
+       TableDescriptor *info = new TableDescriptor(id, num_shards);
+         info->key_marshal = new Marshal<int>();
+         info->value_marshal = new Marshal<int>();
+         info->sharder = new Sharding::Mod;
+         info->accum = new TestUpdater();
+         info->partition_factory = new typename SparseTable<int, int>::Factory;
+         auto table=new TypedGlobalTable<int, int>();
+         table->Init(info);
+         tables[id] = table;
+}
+
+void coordinator_assign_tables(int id){
+       for (int i = 0; i < context->num_processes()-1; ++i) {
+           RegisterWorkerRequest req;
+           int src = 0;
+           network->Read(MPI::ANY_SOURCE, MTYPE_REGISTER_WORKER, &req, &src);
+           //  adding memory server.
+           if (context->IsTableServer(i)) {
+             server_states.push_back(new ServerState(i));
+           }
+         }
+         LOG(INFO) << " All servers registered and started up. Ready to go";
+         //  set itself as the current worker for the table
+         tables[id]->worker_id_ = network->id();
+
+         // memory servers are specified in global context. Round-robin assignment
+
+           VLOG(3)<<"num of shards"<<tables[id]->num_shards()<<" for table"<< id;
+
+           int server_idx = 0;
+           for (int shard = 0; shard < tables[id]->num_shards(); ++shard) {
+             ServerState &server = *server_states[server_idx];
+             LOG(INFO) << "Assigning table ("<<id<<","<<shard<<") to server "
+                       <<server_states[server_idx]->server_id;
+
+             // TODO(Anh) may overwrite this field if #shards>#table_servers
+             server.shard_id = shard;
+             server.local_shards.insert(new TaskId(id, shard));
+             server_idx = (server_idx + 1) % server_states.size();
+           }
+
+         VLOG(3)<<"table assignment";
+         //  then send table assignment
+         ShardAssignmentRequest req;
+         for (size_t i = 0; i < server_states.size(); ++i) {
+           ServerState &server = *server_states[i];
+           for (auto * task: server.local_shards) {
+             ShardAssignment *s  = req.add_assign();
+             s->set_new_worker(server.server_id);
+             s->set_table(task->table);
+             s->set_shard(task->shard);
+             //  update local tables
+             CHECK(tables.find(task->table)!=tables.end());
+             GlobalTable *t = tables.at(task->table);
+             t->get_partition_info(task->shard)->owner = server.server_id;
+             delete task;
+           }
+         }
+         VLOG(3)<<"finish table assignment, req size "<<req.assign_size();
+         network->SyncBroadcast(MTYPE_SHARD_ASSIGNMENT, MTYPE_SHARD_ASSIGNMENT_DONE, req);
+         VLOG(3)<<"finish table server init";
+}
+
+void worker_table_init(){
+       table_server = new TableServer();
+       table_server->StartTableServer(tables);
+       VLOG(3) << "done starting table server";
+}
+
+
+void coordinator_load_data(){
+       auto table = static_cast<TypedGlobalTable<int,int>*>(tables[0]);
+       for (int i = 1; i<=FLAGS_num_keys; i++){
+               table->put(i,i);
+       }
+       VLOG(3) << "Loaded data successfully ...";
+}
+
+void worker_test_data(){
+       auto table = static_cast<TypedGlobalTable<int,int>*>(tables[0]);
+       for (int i=1; i<=FLAGS_num_keys; i++)
+               VLOG(3) << StringPrintf("Worker %d got (%d,%d)", NetworkThread::Get()->id(), i, table->get(i));
+
+
+       for (int j = 0; j < 2; j++) {
+               for (int i = 1; i <= FLAGS_num_keys; i++)
+                       table->update(i, i);
+
+               for (int i = 1; i <= FLAGS_num_keys; i++)
+                       VLOG(3)
+                                       << StringPrintf("Worker %d got (%d,%d)",
+                                                       NetworkThread::Get()->id(), i, table->get(i));
+       }
+/*
+       for (int i = 1; i <= FLAGS_num_keys; i++)
+                               VLOG(3)
+                                               << StringPrintf("Worker %d got (%d,%d)",
+
+                                                       NetworkThread::Get()->id(), i, table->get(i));
+*/
+}
+
+void shutdown(){
+       if (context->AmICoordinator()){
+               VLOG(3) << "Coordinator is shutting down ...";
+               EmptyMessage msg;
+               for (int i=0; i<context->num_processes()-1; i++)
+                       network->Read(MPI::ANY_SOURCE, MTYPE_WORKER_END, &msg);
+                EmptyMessage shutdown_msg;
+                 for (int i = 0; i < network->size() - 1; i++) {
+                   network->Send(i, MTYPE_WORKER_SHUTDOWN, shutdown_msg);
+                 }
+                 network->Flush();
+                 network->Shutdown();
+       }
+       else{
+               VLOG(3) << "Worker " << network->id() << " is shutting down ...";
+         network->Flush();
+         VLOG(3) << "Done flushing the network thread";
+         network->Send(GlobalContext::kCoordinatorRank, MTYPE_WORKER_END, EmptyMessage());
+         EmptyMessage msg;
+         network->Read(GlobalContext::kCoordinatorRank, MTYPE_WORKER_SHUTDOWN, &msg);
+         VLOG(3) << "Worker received MTYPE_WORKER_SHUTDOWN";
+         table_server->ShutdownTableServer();
+         VLOG(3) << "Flushing node " << network->id();
+         network->Shutdown();
+       }
+}
+
+
+int main(int argc, char **argv) {
+       FLAGS_logtostderr = 1;
+       google::InitGoogleLogging(argv[0]);
+       gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+       context = GlobalContext::Get(FLAGS_system_conf, FLAGS_model_conf);
+       network = NetworkThread::Get();
+       VLOG(3) << "*** testing memory servers, with "
+                       << context->num_table_servers() << " servers";
+       create_mem_table(0,context->num_table_servers());
+
+       if (context->AmICoordinator()){
+               coordinator_assign_tables(0);
+               coordinator_load_data();
+               network->barrier();
+       }
+       else{
+               worker_table_init();
+               network->barrier();
+               VLOG(3) << "passed the barrier";
+               //Sleep(1);
+               worker_test_data();
+       }
+
+       shutdown();
+       return 0;
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/test/dist_test/test_da.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_da.cc b/src/test/dist_test/test_da.cc
new file mode 100644
index 0000000..51aa93e
--- /dev/null
+++ b/src/test/dist_test/test_da.cc
@@ -0,0 +1,700 @@
+#include <glog/logging.h>
+#include <mpi.h>
+#include <utility>
+#include <vector>
+
+#include "da/gary.h"
+#include "da/dary.h"
+#include "da/ary.h"
+
+
+using std::make_pair;
+using std::vector;
+// Print this process's PID and hostname, then spin (sleeping) forever so
+// a developer can attach a debugger and manually break out of the loop.
+void Debug() {
+  char host[256];
+  gethostname(host, sizeof(host));
+  printf("PID %d on %s ready for attach\n", getpid(), host);
+  fflush(stdout);
+  int attached = 0;
+  while (attached == 0) {
+    sleep(5);
+  }
+}
+
+
+
+// Check Add() on two 4x8 arrays partitioned along dimension `pdim`:
+// rank 0 first computes a local reference (fetch both, add locally),
+// then all ranks run the distributed Add and rank 0 fetches the result.
+// Correctness is verified by eyeballing the two logged outputs.
+void TestPar(int pdim, int rank){
+  lapis::DAry a1, a2;
+  lapis::DAry a3, a4;
+  vector<lapis::Range> slice{make_pair(0,4), make_pair(0,8)};
+  a1.SetShape({4,8});
+  a2.SetShape({4,8});
+  a1.Setup(pdim);
+  a2.Setup(pdim);
+  a1.Random();
+  a2.Random();
+  ARMCI_Barrier();
+
+
+  if(rank==0){
+    //Debug();
+    LOG(ERROR)<<"test simple partition along "<< pdim<<" dim";
+    // local reference computation on rank 0 only
+    a3=a1.Fetch(slice);
+    a4=a2.Fetch(slice);
+    LOG(ERROR)<<"fetch a";
+    LOG(ERROR)<<a3.ToString();
+    LOG(ERROR)<<"fetch b";
+    LOG(ERROR)<<a4.ToString();
+    a3.Add(a4);
+    LOG(ERROR)<<"a<- a+b";
+    LOG(ERROR)<<a3.ToString();
+  }
+  ARMCI_Barrier();
+  // distributed version of the same add, run by every rank
+  a1.Add(a2);
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry a5;
+    a5=a1.Fetch(slice);
+    LOG(ERROR)<<"add then fetch";
+    LOG(ERROR)<<a5.ToString();
+  }
+}
+
+
+
+// Exercise element-wise ops (Copy/Mult/Div/Square/Pow/Sample*/Fill/
+// Threshold/Max/Map) on three 3x6x2 arrays, each partitioned along a
+// possibly different dimension (pa, pb, pc). Every section follows the
+// same pattern: rank 0 computes a local "fetch then op" reference, then
+// all ranks run the distributed op and rank 0 fetches the result; the
+// two logged outputs should match.
+void TestMixedParElt(int pa, int pb, int pc, int rank){
+  LOG(ERROR)<<" p dim for a,b,c is "<<pa<<" "<<pb<<" "<<pc;
+  vector<lapis::Range> slice{make_pair(0,3),make_pair(0,6), make_pair(0,2)};
+  lapis::DAry a1, a2, a3;
+  a1.SetShape({3,6,2});
+  a2.SetShape({3,6,2});
+  a3.SetShape({3,6,2});
+  a1.Setup(pa);
+  a2.Setup(pb);
+  a3.Setup(pc);
+  a1.Random();
+  a2.Random();
+  a3.Random();
+
+  // ---- Copy ----
+  ARMCI_Barrier();
+  if(rank==0){
+    LOG(ERROR)<<"test elementwise ops with mixed partition";
+    lapis::DAry a5, a4;
+//    Debug();
+    a5=a1.Fetch(slice);
+    a4=a2.Fetch(slice);
+    LOG(ERROR)<<"fetch a";
+    LOG(ERROR)<<a5.ToString();
+    LOG(ERROR)<<"fetch b";
+    LOG(ERROR)<<a4.ToString();
+    a5.Copy(a4);
+    LOG(ERROR)<<"fetch op a.Copy(b)";
+    LOG(ERROR)<<a5.ToString();
+  }
+  ARMCI_Barrier();
+  a1.Copy(a2);
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry a5;
+    a5=a1.Fetch(slice);
+    LOG(ERROR)<<"op fetch a.Copy(b)";
+    LOG(ERROR)<<a5.ToString();
+  }
+
+//////////////////////////////////////////////////
+  // ---- Mult (element-wise) ----
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry a8, a4, a5({3,6,2});
+    //Debug();
+    a8=a1.Fetch(slice);
+    a4=a2.Fetch(slice);
+    LOG(ERROR)<<"fetch a";
+    LOG(ERROR)<<a8.ToString();
+    LOG(ERROR)<<"fetch b";
+    LOG(ERROR)<<a4.ToString();
+    a5.Mult(a8,a4);
+    LOG(ERROR)<<"fetch op c.mult(a,b)";
+    LOG(ERROR)<<a5.ToString();
+  }
+  ARMCI_Barrier();
+  a3.Mult(a1,a2);
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry a5;
+    a5=a3.Fetch(slice);
+    LOG(ERROR)<<"op fetch a.Mult(b,c)";
+    LOG(ERROR)<<a5.ToString();
+  }
+//////////////////////////////////////////////////
+  // ---- Div (element-wise) ----
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry a8, a4, a5({3,6,2});
+    //Debug();
+    a8=a1.Fetch(slice);
+    a4=a2.Fetch(slice);
+    LOG(ERROR)<<"fetch a";
+    LOG(ERROR)<<a8.ToString();
+    LOG(ERROR)<<"fetch b";
+    LOG(ERROR)<<a4.ToString();
+    a5.Div(a8,a4);
+    LOG(ERROR)<<"fetch op c.div(a,b)";
+    LOG(ERROR)<<a5.ToString();
+  }
+  ARMCI_Barrier();
+  a3.Div(a1,a2);
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry a5;
+    a5=a3.Fetch(slice);
+    LOG(ERROR)<<"op fetch a.div(b,c)";
+    LOG(ERROR)<<a5.ToString();
+  }
+//////////////////////////////////////////////////
+  // ---- Mult by scalar ----
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry a8, a4, a5({3,6,2});
+    //Debug();
+    a8=a1.Fetch(slice);
+    LOG(ERROR)<<"fetch a";
+    LOG(ERROR)<<a8.ToString();
+    a5.Mult(a8, 3.0);
+    LOG(ERROR)<<"fetch op c.mult(a,3)";
+    LOG(ERROR)<<a5.ToString();
+  }
+  ARMCI_Barrier();
+  a3.Mult(a1,3.0);
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry a5;
+    a5=a3.Fetch(slice);
+    LOG(ERROR)<<"op fetch a.mult(b,3)";
+    LOG(ERROR)<<a5.ToString();
+  }
+
+//////////////////////////////////////////////////
+  // ---- Square ----
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry a8, a4, a5({3,6,2});
+    //Debug();
+    a8=a1.Fetch(slice);
+    LOG(ERROR)<<"fetch a";
+    LOG(ERROR)<<a8.ToString();
+    a5.Square(a8);
+    LOG(ERROR)<<"fetch op c.square(a)";
+    LOG(ERROR)<<a5.ToString();
+  }
+  ARMCI_Barrier();
+  a3.Square(a1);
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry a5;
+    a5=a3.Fetch(slice);
+    LOG(ERROR)<<"op fetch a.sqaure(b)";
+    LOG(ERROR)<<a5.ToString();
+  }
+
+
+//////////////////////////////////////////////////
+  // ---- Pow ----
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry a8, a4, a5({3,6,2});
+    //Debug();
+    a8=a1.Fetch(slice);
+    LOG(ERROR)<<"fetch a";
+    LOG(ERROR)<<a8.ToString();
+    a5.Pow(a8,3.0);
+    LOG(ERROR)<<"fetch op c.pow(a, 3)";
+    LOG(ERROR)<<a5.ToString();
+  }
+  ARMCI_Barrier();
+  a3.Pow(a1,3.0);
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry a5;
+    a5=a3.Fetch(slice);
+    LOG(ERROR)<<"op fetch a.pow(b,3)";
+    LOG(ERROR)<<a5.ToString();
+  }
+
+
+//////////////////////////////////////////////////
+  // ---- random / fill ops: no local reference possible, just log ----
+  ARMCI_Barrier();
+  a3.SampleUniform(0.0,3.0);
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry a5;
+    a5=a3.Fetch(slice);
+    LOG(ERROR)<<"op fetch a.uniform(0,3)";
+    LOG(ERROR)<<a5.ToString();
+  }
+//////////////////////////////////////////////////
+  ARMCI_Barrier();
+  a3.SampleGaussian(0.0,1.0);
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry a5;
+    a5=a3.Fetch(slice);
+    LOG(ERROR)<<"op fetch a.norm(0,1)";
+    LOG(ERROR)<<a5.ToString();
+  }
+
+//////////////////////////////////////////////////
+  ARMCI_Barrier();
+  a3.Fill(1.43);
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry a5;
+    a5=a3.Fetch(slice);
+    LOG(ERROR)<<"op fetch a.fill(1.43)";
+    LOG(ERROR)<<a5.ToString();
+  }
+
+
+//////////////////////////////////////////////////
+  // ---- Threshold ----
+  ARMCI_Barrier();
+  a1.Random();
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry a8, a4, a5({3,6,2});
+    a4=a1.Fetch(slice);
+    a5.Threshold(a4,0.3);
+    LOG(ERROR)<<"fetch op b=threshold(a,0.3)";
+    LOG(ERROR)<<a4.ToString();
+    LOG(ERROR)<<a5.ToString();
+  }
+
+  ARMCI_Barrier();
+  a3.Threshold(a1, .30f);
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry a5;
+    a5=a3.Fetch(slice);
+    LOG(ERROR)<<"op fetch b=threshold(a,0.3)";
+    LOG(ERROR)<<a5.ToString();
+  }
+
+//////////////////////////////////////////////////
+  // ---- Max with scalar ----
+  ARMCI_Barrier();
+  a1.Random();
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry a8, a4, a5({3,6,2});
+    a4=a1.Fetch(slice);
+    a5.Max(a4,0.3);
+    LOG(ERROR)<<"fetch op b=max(a,0.3)";
+    LOG(ERROR)<<a4.ToString();
+    LOG(ERROR)<<a5.ToString();
+  }
+
+  ARMCI_Barrier();
+  a3.Max(a1, .30f);
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry a5;
+    a5=a3.Fetch(slice);
+    LOG(ERROR)<<"op fetch b=max(a,0.3)";
+    LOG(ERROR)<<a5.ToString();
+  }
+
+
+//////////////////////////////////////////////////
+  // ---- Map with a binary lambda ----
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry a6, a4, a5({3,6,2});
+    a6=a1.Fetch(slice);
+    a4=a2.Fetch(slice);
+    a5.Map([](float a, float b) {return a+2*b;}, a6,a4);
+    LOG(ERROR)<<"fetch op b=map(a+2b)";
+    LOG(ERROR)<<a6.ToString();
+    LOG(ERROR)<<a4.ToString();
+    LOG(ERROR)<<a5.ToString();
+  }
+  ARMCI_Barrier();
+  a3.Map([](float a, float b) {return a+2*b;}, a1,a2);
+  // NOTE(review): every other section has an ARMCI_Barrier() between the
+  // distributed op and rank 0's Fetch; it is missing here — possible race,
+  // TODO confirm whether Map is internally synchronized.
+  if(rank==0){
+    lapis::DAry a5;
+    a5=a3.Fetch(slice);
+    LOG(ERROR)<<"op fetch b=map(a+2b)";
+    LOG(ERROR)<<a5.ToString();
+  }
+  LOG(ERROR)<<"finish elementwise ops";
+}
+
+
+// Time a large distributed matrix multiply c = a * b with shapes
+// (256x9216) x (9216x4096), under the given partition dims for a, b, c.
+// Logs setup, dot, and post-dot wait times; no correctness check.
+void TestLargeDot(int pa, int pb, int pc, int rank){
+  if(rank==0){
+    LOG(ERROR)<<"test Dot, partition for a, b, c : "
+      << pa<<" "<<pb<<" "<<pc<<" dim";
+  }
+
+  double t1, t2, t3;
+  t1=MPI_Wtime();
+  lapis::DAry a,b,c;
+  a.SetShape({256,9216});
+  b.SetShape({9216,4096});
+  c.SetShape({256,4096});
+  a.Setup(pa);
+  b.Setup(pb);
+  c.Setup(pc);
+  a.Random();
+  b.Random();
+  c.Random();
+  ARMCI_Barrier();
+  t2=MPI_Wtime();
+  c.Dot(a,b);
+  t3=MPI_Wtime();
+  // wait time = how long this rank sits in the barrier after its Dot
+  ARMCI_Barrier();
+  LOG(ERROR)<<"setup time: "<<t2-t1<<" dot time: "
+    <<t3-t2<<" wait time:"<<MPI_Wtime()-t3;
+}
+
+// Check the four transpose variants of distributed Dot on small
+// matrices a(4x8), b(8x4), c(4x4). For each variant, rank 0 computes a
+// local fetch-then-dot reference, then all ranks run the distributed
+// Dot and rank 0 fetches the result for comparison by inspection.
+void TestDot(int pa, int pb, int pc, int rank){
+  vector<lapis::Range> slicea{make_pair(0,4), make_pair(0,8)};
+  vector<lapis::Range> sliceb{make_pair(0,8), make_pair(0,4)};
+  vector<lapis::Range> slicec{make_pair(0,4), make_pair(0,4)};
+  lapis::DAry a,b,c;
+  a.SetShape({4,8});
+  b.SetShape({8,4});
+  c.SetShape({4,4});
+  a.Setup(pa);
+  b.Setup(pb);
+  c.Setup(pc);
+  a.Random();
+  b.Random();
+  c.Random();
+  //////////////////////
+  // c = a * b
+  ARMCI_Barrier();
+  if(rank==0){
+    LOG(ERROR)<<"test Dot, partition for a, b, c : "
+      << pa<<" "<<pb<<" "<<pc<<" dim";
+    LOG(ERROR)<<"c=a*b";
+    lapis::DAry x,y,z;
+    x=a.Fetch(slicea);
+    y=b.Fetch(sliceb);
+    z=c.Fetch(slicec);
+    z.Dot(x,y);
+    LOG(ERROR)<<"fetch dot ";
+    LOG(ERROR)<<z.ToString();
+  }
+  ARMCI_Barrier();
+  //Debug();
+  c.Dot(a,b);
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry z;
+    z=c.Fetch(slicec);
+    LOG(ERROR)<<"dot fetch";
+    LOG(ERROR)<<z.ToString();
+  }
+  /////////////////////////////
+  // a = c * b^T
+  ARMCI_Barrier();
+
+  if(rank==0){
+    LOG(ERROR)<<"a=c*b^T";
+    lapis::DAry x,y,z;
+    x=a.Fetch(slicea);
+    y=b.Fetch(sliceb);
+    z=c.Fetch(slicec);
+    x.Dot(z,y, false, true);
+    LOG(ERROR)<<"fetch dot ";
+    LOG(ERROR)<<x.ToString();
+  }
+  ARMCI_Barrier();
+  //Debug();
+  a.Dot(c,b, false, true);
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry z;
+    z=a.Fetch(slicea);
+    LOG(ERROR)<<"dot fetch";
+    LOG(ERROR)<<z.ToString();
+  }
+
+  /////////////////////////////
+  // b = a^T * c
+  ARMCI_Barrier();
+  if(rank==0){
+    LOG(ERROR)<<"b=a^T*c";
+    lapis::DAry x,y,z;
+    x=a.Fetch(slicea);
+    y=b.Fetch(sliceb);
+    z=c.Fetch(slicec);
+    y.Dot(x,z, true, false);
+    LOG(ERROR)<<"fetch dot ";
+    LOG(ERROR)<<y.ToString();
+  }
+  ARMCI_Barrier();
+  //Debug();
+  b.Dot(a,c, true, false);
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry z;
+    z=b.Fetch(sliceb);
+    LOG(ERROR)<<"dot fetch";
+    LOG(ERROR)<<z.ToString();
+  }
+  ARMCI_Barrier();
+  /////////////////////////////
+  // b = a^T * c^T
+  ARMCI_Barrier();
+  if(rank==0){
+    LOG(ERROR)<<"b=a^T*c^T";
+    lapis::DAry x,y,z;
+    x=a.Fetch(slicea);
+    y=b.Fetch(sliceb);
+    z=c.Fetch(slicec);
+    y.Dot(x,z, true, true);
+    LOG(ERROR)<<"fetch dot ";
+    LOG(ERROR)<<y.ToString();
+  }
+  ARMCI_Barrier();
+  //Debug();
+  b.Dot(a,c, true, true);
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry z;
+    z=b.Fetch(sliceb);
+    LOG(ERROR)<<"dot fetch";
+    LOG(ERROR)<<z.ToString();
+  }
+  ARMCI_Barrier();
+}
+
+
+// Check sub-array views: sb = b[2] (row of 8x4) and sc = c[3] (row of
+// 4x4) are 1-D views of length 4; a.Add(sb, sc) must equal the sum of
+// the corresponding fetched rows, compared by inspection of the logs.
+void TestSubarray(int pa, int pb, int pc, int rank){
+  vector<lapis::Range> slicea{make_pair(0,4), make_pair(0,8)};
+  vector<lapis::Range> sliceb{make_pair(0,8), make_pair(0,4)};
+  vector<lapis::Range> slicec{make_pair(0,4), make_pair(0,4)};
+  vector<lapis::Range> slice{make_pair(0,4)};
+  lapis::DAry a,b,c;
+  a.SetShape({4});
+  b.SetShape({8,4});
+  c.SetShape({4,4});
+  a.Setup(pa);
+  b.Setup(pb);
+  c.Setup(pc);
+  b.Random();
+  c.Random();
+
+  //Debug();
+  lapis::DAry sb=b[2];
+  lapis::DAry sc=c[3];
+
+  ARMCI_Barrier();
+  if(rank==0){
+    LOG(ERROR)<<"test subary, partition for a, b, c : "
+      << pa<<" "<<pb<<" "<<pc<<" dim";
+    lapis::DAry y,z, x({4});
+    LOG(ERROR)<<"fetch full b, c";
+    y=b.Fetch(sliceb);
+    z=c.Fetch(slicec);
+    LOG(ERROR)<<y.ToString();
+    LOG(ERROR)<<z.ToString();
+    LOG(ERROR)<<"fetch sub, sb[2], sc[3]";
+    y=sb.Fetch(slice);
+    z=sc.Fetch(slice);
+    LOG(ERROR)<<y.ToString();
+    LOG(ERROR)<<z.ToString();
+  }
+  ARMCI_Barrier();
+  // distributed add over the two sub-array views
+  a.Add(sb,sc);
+  ARMCI_Barrier();
+  //Debug();
+  if(rank==0){
+    lapis::DAry z;
+    z=a.Fetch(slice);
+    LOG(ERROR)<<"sub add fetch, sb[2]+sc[3]";
+    LOG(ERROR)<<z.ToString();
+  }
+}
+
+// Check Reshape combined with sub-array indexing: b(8x4) reshaped to
+// (2,4,4) and (4,2,4), slices of the reshaped views used as operands of
+// Add and Dot (including a transposed Dot), each compared against a
+// rank-0 fetch-then-compute reference in the logs.
+void TestReshape(int pa, int pb, int pc, int rank){
+  vector<lapis::Range> sliceb3{make_pair(0,2),make_pair(0,4), make_pair(0,4)};
+  vector<lapis::Range> sliceb{make_pair(0,8), make_pair(0,4)};
+  vector<lapis::Range> slicec{make_pair(0,4), make_pair(0,4)};
+  vector<lapis::Range> slicea{make_pair(0,4)};
+  lapis::DAry a,b,c,b3,b2,b1;
+  a.SetShape({4});
+  b.SetShape({8,4});
+  c.SetShape({4,4});
+  a.Setup(pa);
+  b.Setup(pb);
+  c.Setup(pc);
+  b.Random();
+  c.Random();
+
+  // b viewed as (2,4,4); b2 = second 4x4 slab
+  b3=b.Reshape({2,4,4});
+  //Debug() ;
+  b2=b3[1];
+  if(rank==0){
+    LOG(ERROR)<<"test reshape+subary, partition for a, b, c : "
+      << pa<<" "<<pb<<" "<<pc<<" dim";
+    lapis::DAry y,z,x;
+    LOG(ERROR)<<"fetch b, b2, c";
+    y=b.Fetch(sliceb);
+    z=b2.Fetch(slicec);
+    x=c.Fetch(slicec);
+    LOG(ERROR)<<y.ToString();
+    LOG(ERROR)<<z.ToString();
+    LOG(ERROR)<<x.ToString();
+    LOG(ERROR)<<"fetch sub, b2+c";
+    z.Add(x);
+    LOG(ERROR)<<z.ToString();
+  }
+
+  ARMCI_Barrier();
+  c.Add(b2);
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry y,z,x;
+    x=c.Fetch(slicec);
+    LOG(ERROR)<<"sub add,fetch c+b2";
+    LOG(ERROR)<<x.ToString();
+  }
+  ARMCI_Barrier();
+  b2.Add(c);
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry y,z,x;
+    x=b2.Fetch(slicec);
+    LOG(ERROR)<<"sub add,fetch b2+c";
+    LOG(ERROR)<<x.ToString();
+  }
+  ARMCI_Barrier();
+  // b1 = row 2 of the (4x4) view b2, a 1-D array of length 4
+  b1=b2[2];
+  if(rank==0){
+    lapis::DAry y,z,x;
+    x=b1.Fetch(slicea);
+    LOG(ERROR)<<"fetch b1";
+    LOG(ERROR)<<x.ToString();
+  }
+
+  a.Add(b1);
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry y,z,x;
+    x=a.Fetch(slicea);
+    LOG(ERROR)<<"add fetch a+b1";
+    LOG(ERROR)<<x.ToString();
+  }
+  ARMCI_Barrier();
+  b1.Add(a);
+  ARMCI_Barrier();
+  if(rank==0){
+    lapis::DAry y,z,x;
+    x=b1.Fetch(slicea);
+    LOG(ERROR)<<"add fetch b1+a";
+    LOG(ERROR)<<x.ToString();
+  }
+
+  ARMCI_Barrier();
+  // Dot against a slab of a different reshape of b: c(2x2) = a(2x4) * b3[3]^T
+  {
+    lapis::DAry b3=b.Reshape({4,2,4});
+    lapis::DAry a;
+    a.SetShape({2,4});
+    a.Setup(pa);
+    a.Random();
+    lapis::DAry b1=b3[1];
+    lapis::DAry b2=b3[3];
+    lapis::DAry c;
+    c.SetShape({2,2});
+    c.Setup(pc);
+    ARMCI_Barrier();
+    c.Dot(a,b2,false, true);
+    ARMCI_Barrier();
+    if(rank==0){
+      lapis::DAry x,y,z,zz({2,2});
+      y=b3.Fetch({make_pair(0,4), make_pair(0,2), make_pair(0,4)});
+      x=a.Fetch({make_pair(0,2), make_pair(0,4)});
+      LOG(ERROR)<<"fetch b,a";
+      LOG(ERROR)<<y.ToString();
+      LOG(ERROR)<<x.ToString();
+      z=y[3];
+      zz.Dot(x,z,false, true);
+      LOG(ERROR)<<"fetch dot c=a*b[3]^T";
+      LOG(ERROR)<<zz.ToString();
+
+      x=a.Fetch({make_pair(0,2), make_pair(0,4)});
+      y=b2.Fetch({make_pair(0,2), make_pair(0,4)});
+      z=c.Fetch({make_pair(0,2), make_pair(0,2)});
+      LOG(ERROR)<<"op fetch c=a*b[3]^T";
+      LOG(ERROR)<<x.ToString();
+      LOG(ERROR)<<y.ToString();
+      LOG(ERROR)<<z.ToString();
+
+    }
+    ARMCI_Barrier();
+  }
+}
+
+
+
+// Driver for the distributed-array tests: initialize MPI and GAry over
+// all ranks, then (currently) benchmark TestLargeDot under all eight
+// partition combinations; the other tests are commented out.
+int main(int argc, char**argv){
+ // MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
+  MPI_Init(&argc, &argv);
+  int rank, nprocs;
+  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
+  vector<int> procs;
+  for (int i = 0; i < nprocs; i++) {
+    procs.push_back(i);
+  }
+  //Debug();
+  lapis::GAry::Init(rank,procs);
+  google::InitGoogleLogging(argv[0]);
+  /*
+  if(nprocs%3==0){
+    TestMixedParElt(0,0,0,rank);
+    TestMixedParElt(0,0,1,rank);
+    TestMixedParElt(0,1,0,rank);
+    TestMixedParElt(1,0,0,rank);
+    TestMixedParElt(1,1,0,rank);
+    TestMixedParElt(1,1,1,rank);
+    TestMixedParElt(0,1,1,rank);
+  }
+  if(nprocs%2==0){
+    TestMixedParElt(1,1,1,rank);
+    TestMixedParElt(1,2,1,rank);
+    TestMixedParElt(2,1,1,rank);
+    TestMixedParElt(1,1,2,rank);
+    TestMixedParElt(2,2,2,rank);
+  }
+  TestDot(0,0,0,rank);
+  TestDot(0,0,1,rank);
+  TestDot(0,1,0,rank);
+  TestDot(0,1,1,rank);
+  TestDot(1,0,0,rank);
+  TestDot(1,0,1,rank);
+  TestDot(1,1,0,rank);
+  TestDot(1,1,1,rank);
+
+  TestPar(0, rank);
+  TestPar(1, rank);
+  */
+  // benchmark Dot over every (pa, pb, pc) partition-dim combination
+  double start, end;
+  start=MPI_Wtime();
+  TestLargeDot(0,0,0,rank);
+  TestLargeDot(0,0,1,rank);
+  TestLargeDot(0,1,0,rank);
+  TestLargeDot(0,1,1,rank);
+  TestLargeDot(1,0,0,rank);
+  TestLargeDot(1,0,1,rank);
+  TestLargeDot(1,1,0,rank);
+  TestLargeDot(1,1,1,rank);
+  end=MPI_Wtime();
+  if(rank==0)
+    LOG(ERROR)<<"dot time for 256*4k 4k*4k matrix, "<<end-start;
+  /*
+  TestSubarray(0,0,0,rank);
+  TestSubarray(0,0,1,rank);
+  TestSubarray(0,1,0,rank);
+  TestSubarray(0,1,1,rank);
+  TestReshape(0,0,0,rank);
+  TestReshape(0,0,1,rank);
+  TestReshape(0,1,0,rank);
+  TestReshape(0,1,1,rank);
+  */
+
+  LOG(ERROR)<<"finish";
+  lapis::GAry::Finalize();
+  MPI_Finalize();
+  return 0;
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/test/dist_test/test_dary.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_dary.cc b/src/test/dist_test/test_dary.cc
new file mode 100644
index 0000000..ce605e6
--- /dev/null
+++ b/src/test/dist_test/test_dary.cc
@@ -0,0 +1,85 @@
+#include <iostream>
+#include "darray/dary.h"
+#include "utils/timer.h"
+
+
+// Smoke-test and micro-benchmark for the local DAry class: compares a
+// raw-pointer multiply loop against DAry::Mult, then exercises most
+// element-wise, matrix, row/column, and reduction operations, printing
+// each result for manual inspection.
+int main() {
+  lapis::DAry x({1000000});
+  lapis::DAry y({1000000});
+  x.Random();
+  y.Random();
+  lapis::Timer t;
+  // NOTE(review): arrays hold 1e6 floats but only the first 10000 are
+  // touched here, and elapsed() is divided by 10 while the loop runs 100
+  // iterations — presumably intentional benchmark scaling; TODO confirm.
+  for(int i=0;i<100;i++){
+    float *dptrx=x.dptr();
+    float *dptry=y.dptr();
+    for(int k=0;k<10000;k++)
+      dptrx[k]*=dptry[k];
+  }
+  std::cout<<"arymath: "<<t.elapsed()/10<<std::endl;
+  lapis::DAry m({1000000});
+  lapis::DAry n({1000000});
+  m.Random();
+  n.Random();
+  t.Reset();
+  for(int i=0;i<100;i++)
+    m.Mult(m,n);
+  std::cout<<"arymath: "<<t.elapsed()/10<<std::endl;
+
+
+  // small 2x2 arrays for checking operator results by eye
+  lapis::DAry a({2,2});
+  lapis::DAry b,c;
+  b.InitLike(a);
+  c.InitLike(a);
+  a.Random();
+  b.Random();
+  std::cout<<a.ToString()<<std::endl;
+  std::cout<<b.ToString()<<std::endl;
+  c.Dot(a,b);
+  std::cout<<"c=a.b"<<c.ToString()<<std::endl;
+  a.Add(b);
+  std::cout<<"a=a+b"<<a.ToString()<<std::endl;
+  a.Mult(a,b);
+  std::cout<<"a=a*b"<<a.ToString()<<std::endl;
+  a.Minus(a,b);
+  std::cout<<"a=a-b"<<a.ToString()<<std::endl;
+
+  c.Random();
+  std::cout<<"random c "<<c.ToString()<<std::endl;
+  a.Threshold(c, 0.3);
+  std::cout<<"a=threshold(c,0.3) "<<a.ToString()<<std::endl;
+
+  a.Pow(c, 0.4);
+  std::cout<<"a=Pow(c,0.4) "<<a.ToString()<<std::endl;
+
+  c.Set(0.5);
+  std::cout<<"c=set(0.5) "<<c.ToString()<<std::endl;
+  a.Square(c);
+  std::cout<<"a=square(c) "<<a.ToString()<<std::endl;
+
+  c.Copy(a);
+  std::cout<<"c=Copy(a) "<<c.ToString()<<std::endl;
+
+  // row/column reductions and broadcasts against a length-2 vector
+  lapis::DAry d({2});
+  d.SumRow(b);
+  std::cout<<"d=SumRow(b) "<<d.ToString()<<std::endl;
+  d.SumCol(b);
+  std::cout<<"d=SumCol(b) "<<d.ToString()<<std::endl;
+  b.AddRow(d);
+  std::cout<<"b=AddRow(d) "<<b.ToString()<<std::endl;
+  b.AddCol(d);
+  std::cout<<"b=AddCol(d) "<<b.ToString()<<std::endl;
+
+  std::cout<<"max(b) "<<b.Max()<<std::endl;
+  std::cout<<"Sum(b) "<<b.Sum()<<std::endl;
+
+  lapis::DAry e({3,3,3});
+  e.SampleGaussian(0.0f,1.0f);
+  std::cout<<"Gaussain e "<<e.ToString()<<std::endl;
+
+  // partial-sum of e along dim 0 over index range {0,2} into f
+  lapis::DAry f({9});
+  f.Sum(e, 0, {0,2});
+  std::cout<<"f.sum  "<<f.ToString()<<std::endl;
+
+  return 0;
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/test/dist_test/test_disk_table.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_disk_table.cc 
b/src/test/dist_test/test_disk_table.cc
new file mode 100644
index 0000000..99987bb
--- /dev/null
+++ b/src/test/dist_test/test_disk_table.cc
@@ -0,0 +1,188 @@
+//  Copyright © 2014 Anh Dinh. All Rights Reserved.
+//  main class for testing distributed memory layer
+//
+//  the command to run this should be:
+//             mpirun -hostfile <host> -bycore -nooversubscribe
+//                             -n <num_servers> test -sync_update
+
+
+#include "core/global-table.h"
+#include "core/common.h"
+#include "core/disk-table.h"
+#include "core/table.h"
+#include "core/table_server.h"
+#include "utils/global_context.h"
+#include <gflags/gflags.h>
+#include "proto/model.pb.h"
+#include "worker.h"
+#include <cmath>
+
+DEFINE_int32(record_size,100, "# elements per float vector");
+DECLARE_int32(block_size);
+DEFINE_int32(table_size, 1000, "# records per table");
+DEFINE_string(system_conf, "examples/imagenet12/system.conf", "configuration 
file for node roles");
+DEFINE_string(model_conf, "examples/imagenet12/model.conf", "DL model 
configuration file");
+DEFINE_bool(is_testing_put,true, "data put vs. data get");
+DECLARE_int32(debug_index);
+DECLARE_int32(table_buffer);
+using namespace lapis;
+
+typedef map<int, GlobalTable*> Map;
+Map tables;
+
+//  put random message to the pointers
+// Fill `message` with FLAGS_record_size deterministic values derived from
+// `count`, so each record's contents can be identified after a round trip.
+void create_random_message(FloatVector* message, const int count){
+       const int base = count * FLAGS_record_size;
+       for (int offset = 0; offset < FLAGS_record_size; offset++)
+               message->add_data(base + offset);
+}
+
+// Register a disk-backed <int, FloatVector> table under `id` in the
+// global `tables` map, with block size FLAGS_block_size.
+// NOTE(review): the descriptor and marshals are heap-allocated here and
+// never freed in this test — presumably owned by the table; TODO confirm.
+void create_disk_table(int id){
+       DiskTableDescriptor *info = new DiskTableDescriptor(id, "disk_test",
+                       FLAGS_block_size);
+       info->key_marshal = new Marshal<int>();
+       info->value_marshal = new Marshal<FloatVector>();
+       tables[id] = new TypedDiskTable<int,FloatVector>(info);
+}
+
+
+//  if testing put, write and send data. Else do nothing
+// Coordinator role: wait for all workers to register; if testing put(),
+// write FLAGS_table_size records into the disk table; then collect
+// MTYPE_WORKER_END from every worker, broadcast MTYPE_WORKER_SHUTDOWN,
+// shut down the network, and verify the send-side statistics.
+void run_coordinator(shared_ptr<NetworkThread> network, int tid){
+       // wait for workers to be up
+       RegisterWorkerRequest req;
+       for (int i=0; i<network->size()-1; i++)
+               network->Read(MPI::ANY_SOURCE, MTYPE_REGISTER_WORKER, &req);
+
+       // put data in
+       TypedDiskTable<int, FloatVector>* table = 
static_cast<TypedDiskTable<int,
+                       FloatVector>*>(tables[tid]);
+
+       //  if testing put()
+       if (FLAGS_is_testing_put) {
+               int count = 0;
+               for (int i = 0; i < FLAGS_table_size; i++) {
+                       FloatVector message;
+                       create_random_message(&message, i);
+                       table->put(i, message);
+                       // track total bytes written (logged via stats below)
+                       count += message.ByteSize();
+               }
+               table->finish_put();
+       }
+
+       VLOG(3) << "Coordinator about to shut down";
+       for (int i=0; i<network->size()-1; i++){
+               EmptyMessage end_msg;
+               network->Read(i,MTYPE_WORKER_END, &end_msg);
+       }
+
+       EmptyMessage shutdown_msg;
+       for (int i = 0; i < network->size() - 1; i++) {
+               network->Send(i, MTYPE_WORKER_SHUTDOWN, shutdown_msg);
+       }
+       network->Flush();
+       network->Shutdown();
+       table->PrintStats();
+
+       // sanity-check how many sub-blocks/records this side sent
+       if (FLAGS_is_testing_put) {
+               int sub_blocks = ceil(((double) FLAGS_table_size / 
FLAGS_table_buffer));
+               CHECK_EQ(table->stats()["total sub block sent"], sub_blocks);
+               CHECK_EQ(table->stats()["total record sent"], FLAGS_table_size);
+               VLOG(3) << "test coordinator sending: successful";
+       }
+
+}
+
+//  if testing put(), do nothing. Else read() until done()
+// Worker role: start a table server; if testing get(), stream every
+// record back out of the disk table; then perform the END/SHUTDOWN
+// handshake with the coordinator and verify the receive-side statistics.
+void run_worker(shared_ptr<NetworkThread> network, int tid){
+       TableServer* ts = new TableServer();
+       ts->StartTableServer(tables);
+
+       // put data in
+       TypedDiskTable<int, FloatVector>* table = 
static_cast<TypedDiskTable<int,
+                       FloatVector>*>(tables[tid]);
+       double total_read = 0;
+       if (!FLAGS_is_testing_put){
+               VLOG(3) << "testing read from table ...";
+               table->Load();
+               while (!table->done()){
+                       int k;
+                       FloatVector v;
+                       table->get(&k,&v);
+                       table->Next();
+                       total_read++;
+               }
+
+               // done() became true with one record still pending; read it
+               int k;
+               FloatVector v;
+               table->get(&k, &v);
+               total_read++;
+       }
+
+       int size = network->size();
+
+       network->Flush();
+       network->Send(GlobalContext::kCoordinatorRank, MTYPE_WORKER_END,
+                       EmptyMessage());
+       EmptyMessage msg;
+
+       int src = 0;
+       network->Read(GlobalContext::kCoordinatorRank, MTYPE_WORKER_SHUTDOWN, 
&msg,
+                       &src);
+       network->Flush();
+       network->Shutdown();
+
+       Stats stats =
+                       (static_cast<TypedDiskTable<int, 
FloatVector>*>(tables[0]))->stats();
+
+       // with exactly 2 processes (1 worker) this worker must have
+       // received/read the whole table, so the counters are exact
+       if (FLAGS_is_testing_put) {
+               int sub_blocks = ceil(((double) FLAGS_table_size / 
FLAGS_table_buffer));
+               if (size == 2) {
+                       CHECK_EQ(stats["total sub block received"], sub_blocks);
+                       CHECK_EQ(stats["total record stored"], 
FLAGS_table_size);
+               }
+               VLOG(3) << "test table-server writing: successful";
+               VLOG(3) << "number of sub blocks = " << sub_blocks;
+               VLOG(3) << "total data stored = " << stats["total byte stored"];
+       }
+       else{
+               if (size==2)
+                       CHECK_EQ(stats["total record read"], FLAGS_table_size);
+               VLOG(3) << "test table-server reading: successful";
+               VLOG(3) << "read bandwidth = "
+                               << (stats["total byte read"]
+                                               / (stats["last byte read"] - 
stats["first byte read"]));
+               //VLOG(3) << "total number of record read = " << stats["total 
record read"];
+       }
+
+       network->PrintStats();
+       static_cast<TypedDiskTable<int, FloatVector>*>(tables[0])->PrintStats();
+}
+
+//  check all the records have been stored to disk
+// Dispatch this process to the coordinator or worker role: the highest
+// rank acts as coordinator, every other rank runs a worker.
+int test_disk(int tid) {
+       // Init GlobalContext
+       auto gc = lapis::GlobalContext::Get(FLAGS_system_conf, 
FLAGS_model_conf);
+       //start network thread
+       shared_ptr<NetworkThread> network = NetworkThread::Get();
+
+       if (network->id() == network->size() - 1)
+               run_coordinator(network, tid);
+       else
+               run_worker(network,tid);
+       return 0;
+}
+
+// for debugging use
+//#ifndef FLAGS_v
+//  DEFINE_int32(v, 3, "vlog controller");
+//#endif
+
+// Entry point: set up logging and flags, create disk table 0, and run
+// the coordinator/worker disk-table test.
+int main(int argc, char **argv) {
+       FLAGS_logtostderr = 1;
+       google::InitGoogleLogging(argv[0]);
+       gflags::ParseCommandLineFlags(&argc, &argv, true);
+       create_disk_table(0);
+       return test_disk(0);
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/test/dist_test/test_mnistlayer.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_mnistlayer.cc 
b/src/test/dist_test/test_mnistlayer.cc
new file mode 100644
index 0000000..882e121
--- /dev/null
+++ b/src/test/dist_test/test_mnistlayer.cc
@@ -0,0 +1,165 @@
+#include <gtest/gtest.h>
+#include <sys/stat.h>
+#include <cstdint>
+#include "opencv2/highgui/highgui.hpp"
+#include "opencv2/imgproc/imgproc.hpp"
+
+#include "model/layer.h"
+#include "proto/model.pb.h"
+#include "utils/shard.h"
+using namespace singa;
+// Scale a single MNIST digit to 55x55 via MnistImageLayer and write the
+// result image for visual inspection (no automated assertion).
+TEST(MnistLayerTest, SingleScale){
+  LayerProto proto;
+  MnistProto *mnist=proto.mutable_mnist_param();
+  mnist->set_size(55);
+  MnistImageLayer layer;
+  layer.FromProto(proto);
+  cv::Mat image;
+  // flag 0 = load as single-channel grayscale
+  image=cv::imread("src/test/data/mnist.png", 0);
+  string pixel;
+  pixel.resize(image.rows*image.cols);
+  for(int i=0,k=0;i<image.rows;i++)
+    for(int j=0; j<image.cols;j++)
+      pixel[k++]=static_cast<char>(image.at<uint8_t>(i,j));
+  Record rec;
+  rec.set_type(Record_Type_kMnist);
+  MnistRecord *mrec=rec.mutable_mnist();
+  mrec->set_pixel(pixel);
+  layer.Setup(1, rec, kNone);
+  layer.AddInputRecord(rec);
+
+  // Convert2Image returns a flattened square image; recover its side length
+  const vector<uint8_t>& dat=layer.Convert2Image(0);
+  int s=static_cast<int>(sqrt(dat.size()));
+  cv::Mat newimg(s,s,CV_8UC1);
+  int count=0;
+  for(int i=0,k=0;i<newimg.rows;i++)
+    for(int j=0; j<newimg.cols;j++){
+      count+=dat[k]>0;
+      newimg.at<uint8_t>(i,j)=dat[k++];
+    }
+  //LOG(ERROR)<<"image positive "<<count<<" size "<<s;
+  cv::imwrite("src/test/data/mnist_scale.png", newimg);
+}
+
+// Apply an affine transform (beta/gamma parameters) plus scaling to a
+// single MNIST digit and write the result image for visual inspection.
+TEST(MnistLayerTest, SingleAffineTransform){
+  LayerProto proto;
+  MnistProto *mnist=proto.mutable_mnist_param();
+  mnist->set_beta(15);
+  mnist->set_gamma(16);
+  mnist->set_size(55);
+  MnistImageLayer layer;
+  layer.FromProto(proto);
+  cv::Mat image;
+  // flag 0 = load as single-channel grayscale
+  image=cv::imread("src/test/data/mnist.png", 0);
+  string pixel;
+  pixel.resize(image.rows*image.cols);
+  for(int i=0,k=0;i<image.rows;i++)
+    for(int j=0; j<image.cols;j++)
+      pixel[k++]=static_cast<char>(image.at<uint8_t>(i,j));
+  Record rec;
+  rec.set_type(Record_Type_kMnist);
+  MnistRecord *mrec=rec.mutable_mnist();
+  mrec->set_pixel(pixel);
+  layer.Setup(1, rec, kNone);
+  layer.AddInputRecord(rec);
+
+  const vector<uint8_t>& dat=layer.Convert2Image(0);
+  int s=static_cast<int>(sqrt(dat.size()));
+  cv::Mat newimg(s,s,CV_8UC1);
+  int count=0;
+  for(int i=0,k=0;i<newimg.rows;i++)
+    for(int j=0; j<newimg.cols;j++){
+      count+=dat[k]>0;
+      newimg.at<uint8_t>(i,j)=dat[k++];
+    }
+  //LOG(ERROR)<<"image positive "<<count<<" size "<<s;
+
+  cv::imwrite("src/test/data/mnist_affine.png", newimg);
+}
+// Apply elastic distortion (sigma/alpha/kernel) on top of the affine
+// transform to a single MNIST digit and write the result image.
+TEST(MnistLayerTest, SingleElasticDistortion){
+  LayerProto proto;
+  MnistProto *mnist=proto.mutable_mnist_param();
+  mnist->set_elastic_freq(1);
+  mnist->set_sigma(6);
+  mnist->set_alpha(36);
+  mnist->set_beta(15);
+  mnist->set_gamma(16);
+  mnist->set_size(55);
+  mnist->set_kernel(21);
+  MnistImageLayer layer;
+  layer.FromProto(proto);
+  cv::Mat image;
+  // flag 0 = load as single-channel grayscale
+  image=cv::imread("src/test/data/mnist.png", 0);
+  string pixel;
+  pixel.resize(image.rows*image.cols);
+  for(int i=0,k=0;i<image.rows;i++)
+    for(int j=0; j<image.cols;j++)
+      pixel[k++]=static_cast<char>(image.at<uint8_t>(i,j));
+  Record rec;
+  rec.set_type(Record_Type_kMnist);
+  MnistRecord *mrec=rec.mutable_mnist();
+  mrec->set_pixel(pixel);
+  layer.Setup(1, rec, kNone);
+  layer.AddInputRecord(rec);
+
+  const vector<uint8_t>& dat=layer.Convert2Image(0);
+  int s=static_cast<int>(sqrt(dat.size()));
+  cv::Mat newimg(s,s,CV_8UC1);
+  int count=0;
+  for(int i=0,k=0;i<newimg.rows;i++)
+    for(int j=0; j<newimg.cols;j++){
+      count+=dat[k]>0;
+      newimg.at<uint8_t>(i,j)=dat[k++];
+    }
+  cv::imwrite("src/test/data/mnist_elastic.png", newimg);
+}
+// Render a 10x10 grid of elastically-distorted MNIST digits next to
+// their originals: read kTotal records from a local shard, log their
+// labels, and write before/after montage images under src/test/data/.
+TEST(MnistLayerTest, MultElasticDistortion){
+  LayerProto proto;
+  MnistProto *mnist=proto.mutable_mnist_param();
+  int kTotal=100;
+  int kSize=29;
+  mnist->set_elastic_freq(kTotal);
+  mnist->set_sigma(6);
+  mnist->set_alpha(36);
+  mnist->set_beta(15);
+  mnist->set_gamma(16);
+  mnist->set_size(kSize);
+  mnist->set_kernel(21);
+  MnistImageLayer layer;
+  layer.FromProto(proto);
+  vector<vector<int>> shapes{{kTotal, kSize,kSize}};
+  layer.Setup(shapes, kNone);
+  shard::Shard source("/data1/wangwei/singa/data/mnist/test/",shard::Shard::kRead);
+  int n=static_cast<int>(sqrt(kTotal));
+  cv::Mat origin(n*28,n*28, CV_8UC1);
+  // BUGFIX: disp was left uninitialized, yet the first write below is
+  // sprintf(disp+strlen(disp), ...) — strlen on indeterminate bytes is
+  // undefined behavior. Zero-initialize so strlen(disp) starts at 0.
+  char disp[1024] = "";
+  for(int x=0;x<n;x++){
+    sprintf(disp+strlen(disp), "\n");
+    for(int y=0;y<n;y++){
+      Record rec;
+      string key;
+      CHECK(source.Next(&key, &rec));
+      const string pixel=rec.mnist().pixel();
+      // 28x28 tile of the montage for the original digit at cell (x, y)
+      cv::Mat img=origin(cv::Rect(y*28, x*28, 28, 28));
+      for(int i=0,k=0;i<28;i++)
+        for(int j=0;j<28;j++)
+          img.at<uint8_t>(i,j)=static_cast<uint8_t>(pixel[k++]);
+      layer.AddInputRecord(rec);
+      sprintf(disp+strlen(disp), "%d ", rec.mnist().label());
+    }
+  }
+  LOG(ERROR)<<disp;
+  cv::imwrite("src/test/data/mnist_big.png", origin);
+
+  cv::Mat output(n*kSize,n*kSize, CV_8UC1);
+  for(int i=0;i<kTotal;i++){
+    const vector<uint8_t>& dat=layer.Convert2Image(i);
+    int x=(i/n);
+    int y=i%n;
+    cv::Mat img=output(cv::Rect(y*kSize, x*kSize, kSize, kSize));
+    // r/c instead of i/j, so the record index i above is not shadowed
+    for(int r=0,k=0;r<kSize;r++)
+      for(int c=0;c<kSize;c++)
+        img.at<uint8_t>(r,c)=dat[k++];
+  }
+  cv::imwrite("src/test/data/mnist_bigout.png", output);
+}

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/test/dist_test/test_model.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_model.cc b/src/test/dist_test/test_model.cc
new file mode 100644
index 0000000..c3f98b9
--- /dev/null
+++ b/src/test/dist_test/test_model.cc
@@ -0,0 +1,25 @@
+// Copyright © 2014 Wei Wang. All Rights Reserved.
+// 2014-08-02 14:13
+#include <glog/logging.h>
+#include <gflags/gflags.h>
+
+
+#include "model/sgd_trainer.h"
+#include "model/net.h"
+#include "proto/model.pb.h"
+#include "utils/proto_helper.h"
+
+DEFINE_int32(v, 1, "vlog");
+
+int main(int argc, char** argv) {
+  FLAGS_logtostderr=1;
+  google::InitGoogleLogging(argv[0]);
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  lapis::ModelProto model_proto;
+  lapis::ReadProtoFromTextFile("examples/imagenet12/model.conf", &model_proto);
+  lapis::SGDTrainer trainer;
+  trainer.Init(model_proto.trainer());
+  lapis::Net net;
+  net.Init(model_proto.net());
+  trainer.Run(&net);
+}

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/test/dist_test/test_neuralnet.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_neuralnet.cc 
b/src/test/dist_test/test_neuralnet.cc
new file mode 100644
index 0000000..a857124
--- /dev/null
+++ b/src/test/dist_test/test_neuralnet.cc
@@ -0,0 +1,141 @@
+#include <gtest/gtest.h>
+#include <model/neuralnet.h>
+#include "proto/model.pb.h"
+#include "utils/common.h"
+#include "utils/param_updater.h"
+
+using namespace singa;
+NetProto CreateMLPProto(){
+  ModelProto model;
+  ReadProtoFromTextFile("examples/mnist/mlp.conf", &model);
+  return model.neuralnet();
+}
+TEST(NeuralnetTest, BP){
+  ModelProto model;
+  ReadProtoFromTextFile("examples/mnist/mlp.conf", &model);
+
+  AdaGradUpdater updater;
+  updater.Init(model.solver().updater());
+
+  NeuralNet net(model.neuralnet());
+  auto layers=net.layers();
+  for(int i=0;i<3;i++){
+    bool firstlayer=true;
+    for(auto& layer: layers){
+      layer->ComputeFeature();
+      if(firstlayer){
+        DataLayer* dl=static_cast<DataLayer*>(layer.get());
+        dl->CompletePrefetch();
+        firstlayer=false;
+      }
+    }
+
+    for(int k=layers.size()-1;k>=0;k--){
+      layers[k]->ComputeGradient();
+      for(Param* param: layers[k]->GetParams())
+        updater.Update(i, param);
+    }
+  }
+}
+NetProto CreateConvNetProto(){
+  NetProto proto;
+  LayerProto *layer;
+
+  layer=proto.add_layer();
+  layer->set_name("data");
+  layer->set_type("kShardData");
+  DataProto *data=layer->mutable_data_param();
+  data->set_batchsize(8);
+  data->set_path("/data1/wangwei/singa/data/mnist/train/");
+
+  // 4x3x10x10
+  layer=proto.add_layer();
+  layer->set_name("mnist");
+  layer->set_type("kMnistImage");
+  layer->add_srclayers("data");
+
+  // 4x1
+  layer=proto.add_layer();
+  layer->set_name("label");
+  layer->set_type("kLabel");
+  layer->add_srclayers("data");
+
+  // 4x8x9x9
+  layer=proto.add_layer();
+  layer->set_name("conv1");
+  layer->set_type("kConvolution");
+  layer->add_srclayers("mnist");
+  layer->add_param();
+  layer->add_param();
+  ConvolutionProto *conv=layer->mutable_convolution_param();
+  conv->set_num_filters(8);
+  conv->set_kernel(2);
+
+  // 4x8x9x9
+  layer=proto.add_layer();
+  layer->set_name("relu1");
+  layer->set_type("kReLU");
+  layer->add_srclayers("conv1");
+
+  // 4x8x4x4
+  layer=proto.add_layer();
+  layer->set_name("pool1");
+  layer->set_type("kPooling");
+  layer->add_srclayers("relu1");
+  PoolingProto *pool=layer->mutable_pooling_param();
+  pool->set_kernel(4);
+  pool->set_stride(2);
+
+  // 4x10
+  layer=proto.add_layer();
+  layer->set_name("fc1");
+  layer->set_type("kInnerProduct");
+  layer->add_srclayers("pool1");
+  layer->add_param();
+  layer->add_param();
+  InnerProductProto *inner=layer->mutable_inner_product_param();
+  inner->set_num_output(10);
+
+  // 4x10
+  layer=proto.add_layer();
+  layer->set_name("loss");
+  layer->set_type("kSoftmaxLoss");
+  layer->add_srclayers("fc1");
+  layer->add_srclayers("label");
+
+  return proto;
+}
+
+TEST(NeuralNetTest, NoPartition){
+  NetProto proto=CreateConvNetProto();
+  NeuralNet net(proto);
+  const auto& layers=net.layers();
+  ASSERT_EQ(8, layers.size());
+  ASSERT_EQ("data", layers.at(0)->name());
+  ASSERT_EQ("loss", layers.at(7)->name());
+}
+
+TEST(NeuralNetTest, DataPartition){
+  NetProto proto=CreateConvNetProto();
+  proto.set_partition_type(kDataPartition);
+  NeuralNet net(proto, 3);
+  const auto& layers=net.layers();
+  ASSERT_EQ(28, layers.size());
+  ASSERT_EQ("data", layers.at(0)->name());
+}
+TEST(NeuralNetTest, LayerPartition){
+  NetProto proto=CreateConvNetProto();
+  proto.set_partition_type(kLayerPartition);
+  NeuralNet net(proto, 2);
+ // const auto& layers=net.layers();
+}
+TEST(NeuralNetTest, HyridPartition){
+  NetProto proto=CreateConvNetProto();
+  int num_layers=proto.layer_size();
+  proto.mutable_layer(num_layers-2)->set_partition_type(kDataPartition);
+  proto.mutable_layer(num_layers-1)->set_partition_type(kDataPartition);
+  proto.set_partition_type(kLayerPartition);
+  NeuralNet net(proto, 2);
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/test/dist_test/test_pm.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_pm.cc b/src/test/dist_test/test_pm.cc
new file mode 100644
index 0000000..67c210a
--- /dev/null
+++ b/src/test/dist_test/test_pm.cc
@@ -0,0 +1,88 @@
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include <iostream>
+#include <fstream>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include "utils/cluster.h"
+#include "utils/common.h"
+#include "proto/model.pb.h"
+#include "proto/cluster.pb.h"
+#include "server/server.h"
+#include "server/pm_server.h"
+#include "worker/pm_client.h"
+#include "worker/worker.h"
+#include "proto/topology.pb.h"
+#include <string.h>
+#include <google/protobuf/text_format.h>
+#include <google/protobuf/io/zero_copy_stream_impl.h>
+
+using namespace google::protobuf::io;
+using google::protobuf::TextFormat;
+
+using std::ifstream;
+
+/**
+ * Testing put/get/update performance of the new zeromq-based parameter
+ * servers.
+ */
+DEFINE_int32(procsID, 0, "global process ID");
+DEFINE_string(hostfile, "examples/imagenet12/hostfile", "hostfile");
+DEFINE_string(cluster_conf, "examples/imagenet12/cluster.conf",
+    "configuration file for the cluster");
+DEFINE_string(model_conf, "examples/imagenet12/model.conf",
+    "Deep learning model configuration file");
+
+DEFINE_string(topology_config,"examples/imagenet12/topology.conf", "Network of 
servers");
+DEFINE_int32(server_threads,1,"Number of server's worker threads per process");
+DEFINE_int32(client_threads,1,"Number of client's worker threads per process");
+
+DEFINE_string(mode, "client", "client or server mode");
+DEFINE_int32(node_id, 0, "ID of the node, client or server");
+DEFINE_int32(primary_set, 0, "ID of the primary server set (for client mode 
only)");
+
+/**
+ *
+ * Read the topology file in, and start the Client or server respectively.
+ *
+ * test_pm --node_id <id>
+ */
+
+
+#ifndef FLAGS_v
+  DEFINE_int32(v, 3, "vlog controller");
+#endif
+
+int main(int argc, char **argv) {
+       google::InitGoogleLogging(argv[0]);
+       gflags::ParseCommandLineFlags(&argc, &argv, true);
+       FLAGS_logtostderr = 1;
+
+
+       //Read in the topology file
+       int fd = open(FLAGS_topology_config.c_str(), O_RDONLY);
+       assert(fd != -1);
+       singa::Topology topology;
+       TextFormat::Parse(new FileInputStream(fd), &topology);
+
+
+       //read host file
+       ifstream hostfile(FLAGS_hostfile.c_str());
+       string host;
+       vector<string> hosts;
+       while (getline(hostfile, host))
+               hosts.push_back(host);
+       
+       if (FLAGS_node_id < topology.nservers()) {
+               singa::SingaServer *server = new 
singa::SingaServer(FLAGS_node_id, topology, hosts);
+               server->StartServer();
+       } else {
+               singa::SingaClient *client = new 
singa::SingaClient(FLAGS_node_id, topology, hosts);
+               client->StartClient();
+       }
+       
+       return 0;
+}

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/test/dist_test/test_router.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_router.cc 
b/src/test/dist_test/test_router.cc
new file mode 100644
index 0000000..bed3d99
--- /dev/null
+++ b/src/test/dist_test/test_router.cc
@@ -0,0 +1,27 @@
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+#include "utils/router.h"
+#include "utils/common.h"
+#include "utils/cluster.h"
+DEFINE_string(hostfile, "examples/imagenet12/hostfile", "hostfile");
+DEFINE_string(cluster_conf, "examples/imagenet12/cluster.conf",
+    "configuration file for the cluster");
+DEFINE_int32(procsID, 0, "global process ID");
+
+int main(int argc, char** argv){
+  google::InitGoogleLogging(argv[0]);
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  // Init Cluster
+  singa::ClusterProto pcluster;
+  singa::ReadProtoFromTextFile(FLAGS_cluster_conf.c_str(), &pcluster);
+  auto cluster=singa::Cluster::Get(pcluster, FLAGS_hostfile, FLAGS_procsID);
+  if(cluster->AmIServer()){
+    singa::Router server(5732);
+    CHECK(server.Bind(cluster->server_addr(0), cluster->nworkers()));
+  }else{
+    singa::Router worker(5732);
+    CHECK(worker.Connect(cluster->server_addr(0)));
+  }
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/src/test/dist_test/test_split.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_split.cc b/src/test/dist_test/test_split.cc
new file mode 100644
index 0000000..674d546
--- /dev/null
+++ b/src/test/dist_test/test_split.cc
@@ -0,0 +1,304 @@
+//  Copyright © 2014 Anh Dinh. All Rights Reserved.
+
+
+//  Testing the unbalance in spliting parameter vectors.
+
+#include "core/global-table.h"
+#include "core/common.h"
+#include "core/disk-table.h"
+#include "core/table.h"
+#include "core/table_server.h"
+#include "utils/global_context.h"
+#include <gflags/gflags.h>
+#include "proto/model.pb.h"
+#include "worker.h"
+#include "coordinator.h"
+//#include "model_controller/myacc.h"
+#include "utils/common.h"
+
+#include <cmath>
+#include <stdlib.h>
+#include <vector>
+#include <iostream>
+#include <fstream>
+
+using namespace lapis;
+using std::vector;
+
+//DEFINE_bool(sync_update, false, "Synchronous put/update queue");
+DEFINE_string(system_conf, "examples/imagenet12/system.conf", "configuration 
file for node roles");
+DEFINE_string(model_conf, "examples/imagenet12/model.conf", "DL model 
configuration file");
+DEFINE_int64(threshold,1000000, "max # of parameters in a vector");
+DEFINE_int32(iterations,5,"numer of get/put iterations");
+DEFINE_int32(workers,2,"numer of workers doing get/put");
+#ifndef FLAGS_v
+  DEFINE_int32(v, 3, "vlog controller");
+#endif
+
+typedef map<int, GlobalTable*> Map;
+Map tables;
+shared_ptr<NetworkThread> network;
+shared_ptr<GlobalContext> context;
+std::vector<ServerState*> server_states;
+TableServer *table_server;
+
+FloatVector large_msg, small_msg;
+const int SIZE=16;
+
+long sizes[] = { 37448736, 16777216, 4096000, 1327104, 884736, 884736, 614400,
+               14112, 4096, 4096, 1000, 384, 384, 256, 256, 96 };
+
+vector<FloatVector*> value_msg;
+
+int num_keys;
+
+// create large and small messages
+void init_messages(){
+       num_keys = 0;
+  long nservers=context->num_table_servers();
+       for (int i=0; i<SIZE; i++){
+               int total=0;
+    int threshold=std::max(FLAGS_threshold,0l);//, sizes[i]/nservers);
+    VLOG(3)<<"worker: "<<threshold;
+               while (total<sizes[i]){
+                       FloatVector* fv = new FloatVector();
+                       for (int j=0; j+total<sizes[i] && j<threshold; j++)
+                               
fv->add_data(static_cast<float>(rand())/static_cast<float>(RAND_MAX));
+                       value_msg.push_back(fv);
+                       total+=threshold;
+                       num_keys++;
+               }
+       }
+}
+
+void create_mem_table(int id, int num_shards){
+
+       TableDescriptor *info = new TableDescriptor(id, num_shards);
+         info->key_marshal = new Marshal<int>();
+         info->value_marshal = new Marshal<FloatVector>();
+         info->sharder = new Sharding::Mod;
+         info->accum = new MyAcc();
+         info->partition_factory = new typename SparseTable<int, 
FloatVector>::Factory;
+         auto table=new TypedGlobalTable<int, FloatVector>();
+         table->Init(info);
+         tables[id] = table;
+}
+
+void coordinator_assign_tables(int id){
+       for (int i = 0; i < context->num_processes()-1; ++i) {
+           RegisterWorkerRequest req;
+           int src = 0;
+           network->Read(MPI::ANY_SOURCE, MTYPE_REGISTER_WORKER, &req, &src);
+           //  adding memory server.
+           if (context->IsTableServer(i)) {
+             server_states.push_back(new ServerState(i));
+           }
+         }
+         LOG(INFO) << " All servers registered and started up. Ready to go";
+         //  set itself as the current worker for the table
+         tables[id]->worker_id_ = network->id();
+
+         // memory servers are specified in global context. Round-robin 
assignment
+
+           VLOG(3)<<"num of shards"<<tables[id]->num_shards()<<" for table"<< 
id;
+
+           int server_idx = 0;
+           for (int shard = 0; shard < tables[id]->num_shards(); ++shard) {
+             ServerState &server = *server_states[server_idx];
+             LOG(INFO) << "Assigning table ("<<id<<","<<shard<<") to server "
+                       <<server_states[server_idx]->server_id;
+
+             // TODO(Anh) may overwrite this field if #shards>#table_servers
+             server.shard_id = shard;
+             server.local_shards.insert(new TaskId(id, shard));
+             server_idx = (server_idx + 1) % server_states.size();
+           }
+
+         VLOG(3)<<"table assignment";
+         //  then send table assignment
+         ShardAssignmentRequest req;
+         for (size_t i = 0; i < server_states.size(); ++i) {
+           ServerState &server = *server_states[i];
+           for (auto * task: server.local_shards) {
+             ShardAssignment *s  = req.add_assign();
+             s->set_new_worker(server.server_id);
+             s->set_table(task->table);
+             s->set_shard(task->shard);
+             //  update local tables
+             CHECK(tables.find(task->table)!=tables.end());
+             GlobalTable *t = tables.at(task->table);
+             t->get_partition_info(task->shard)->owner = server.server_id;
+             delete task;
+           }
+         }
+         VLOG(3)<<"finish table assignment, req size "<<req.assign_size();
+         network->SyncBroadcast(MTYPE_SHARD_ASSIGNMENT, 
MTYPE_SHARD_ASSIGNMENT_DONE, req);
+         VLOG(3)<<"finish table server init";
+}
+
+void worker_table_init(){
+       table_server = new TableServer();
+       table_server->StartTableServer(tables);
+       VLOG(3) << "done starting table server";
+}
+
+double random_double(){
+       return static_cast<double>(rand())/static_cast<double>(RAND_MAX);
+}
+
+// popular table with random large or small messages.
+// the message distribution specified in FLAGS_large_precentage
+void coordinator_load_data(){
+       auto table = static_cast<TypedGlobalTable<int,FloatVector>*>(tables[0]);
+
+       num_keys = 0;
+  int nservers=context->num_table_servers();
+       for (int i = 0; i < SIZE; i++) {
+               int total = 0;
+    int threshold=std::max(FLAGS_threshold,0l);//  sizes[i]/nservers);
+    while (total < sizes[i]) {
+      FloatVector* fv = new FloatVector();
+      for (int j = 0; j + total < sizes[i] && j < threshold; j++)
+        fv->add_data(
+            static_cast<float>(rand())
+            / static_cast<float>(RAND_MAX));
+      table->put(num_keys,*fv);
+      total += threshold;
+      num_keys++;
+    }
+       }
+       VLOG(3) << "Loaded data successfully ... " << num_keys << " messages";
+}
+
+void get(TypedGlobalTable<int,FloatVector>* table, ofstream &latency){
+       double start , end;
+  StateQueue<int> state(num_keys);
+  FloatVector v;
+  /*
+       for (int i=0; i<num_keys; i++){
+    start = Now();
+    table->get(i);
+    end=Now();
+    latency << "get: " << (end - start) << endl;
+  }
+  */
+  start=Now();
+       for (int i=0; i<num_keys; i++){
+    if(table->async_get(i, &v))
+      state.Invalid(i);
+       }
+  latency << "send get: " << (Now() - start) << endl;
+  start=Now();
+  while(state.HasValid()){
+    int key=state.Next();
+    if(table->async_get_collect(&key, &v))
+      state.Invalid(key);
+    sleep(0.001);
+  }
+  latency << "collect get: " << (Now() - start) << endl;
+}
+
+void update(TypedGlobalTable<int,FloatVector>* table, ofstream &latency){
+       double start, end;
+       for (int i=0; i<num_keys; i++){
+               start = Now();
+               table->update(i,*value_msg[i]);
+    end=Now();
+               latency << "update: " << (end - start) << endl;
+       }
+}
+
+void worker_test_data(){
+       init_messages();
+       auto table = static_cast<TypedGlobalTable<int,FloatVector>*>(tables[0]);
+
+       ofstream latency(StringPrintf("latency_%d",NetworkThread::Get()->id()));
+       ofstream throughput(StringPrintf("throughput_%d", 
NetworkThread::Get()->id()));
+       double start, end;
+       for (int i=0; i<FLAGS_iterations; i++){
+               start = Now();
+               get(table, latency);
+    end=Now();
+               throughput << "get: " << (end - start) << " over " << num_keys 
<< " ops " << endl;
+               start = Now();
+               update(table, latency);
+    end=Now();
+               throughput << "update: " << (end - start) << " over " << 
num_keys << " ops " << endl;
+    sleep(10);
+       }
+       latency.close();
+       throughput.close();
+
+}
+
+void print_table_stats(){
+       auto table = static_cast<TypedGlobalTable<int,FloatVector>*>(tables[0]);
+       ofstream log_file(StringPrintf("log_variance_%d", 
NetworkThread::Get()->id()));
+       log_file << "table size at process "<< NetworkThread::Get()->id()<<" = 
" << table->stats()["TABLE_SIZE"] << endl;
+       log_file.close();
+}
+
+void shutdown(){
+       if (context->AmICoordinator()){
+               VLOG(3) << "Coordinator is shutting down ...";
+               EmptyMessage msg;
+               for (int i=0; i<context->num_processes()-1; i++)
+                       network->Read(MPI::ANY_SOURCE, MTYPE_WORKER_END, &msg);
+                EmptyMessage shutdown_msg;
+                 for (int i = 0; i < network->size() - 1; i++) {
+                   network->Send(i, MTYPE_WORKER_SHUTDOWN, shutdown_msg);
+                 }
+                 network->Flush();
+                 network->Shutdown();
+       }
+       else{
+               VLOG(3) << "Worker " << network->id() << " is shutting down 
...";
+         network->Flush();
+         VLOG(3) << "Done flushing the network thread";
+         network->Send(GlobalContext::kCoordinatorRank, MTYPE_WORKER_END, 
EmptyMessage());
+         EmptyMessage msg;
+         network->Read(GlobalContext::kCoordinatorRank, MTYPE_WORKER_SHUTDOWN, 
&msg);
+         VLOG(3) << "Worker received MTYPE_WORKER_SHUTDOWN";
+
+         table_server->ShutdownTableServer();
+         VLOG(3) << "Flushing node " << network->id();
+         network->Shutdown();
+       }
+}
+
+
+int main(int argc, char **argv) {
+       FLAGS_logtostderr = 1;
+       google::InitGoogleLogging(argv[0]);
+       gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+       context = GlobalContext::Get(FLAGS_system_conf, FLAGS_model_conf);
+       network = NetworkThread::Get();
+       VLOG(3) << "*** testing memory servers, with "
+                       << context->num_table_servers() << " servers";
+
+
+       create_mem_table(0,context->num_table_servers());
+
+  LOG(INFO)<<"threshold: "<<FLAGS_threshold<<" nworkers: "<<FLAGS_workers;
+       if (context->AmICoordinator()){
+               coordinator_assign_tables(0);
+               coordinator_load_data();
+               network->barrier();
+       }
+       else{
+               worker_table_init();
+               network->barrier();
+               VLOG(3) << "passed the barrier";
+               print_table_stats();
+
+               //Sleep(1);
+    if(network->id()<FLAGS_workers)
+      worker_test_data();
+       }
+
+       shutdown();
+       return 0;
+}
+
+

Reply via email to