add ClusterRuntime component definition
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/6b1e65ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/6b1e65ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/6b1e65ee

Branch: refs/heads/master
Commit: 6b1e65ee05ef55c7152256a71d696d09e6310b5b
Parents: 679573a
Author: wangsheng1001 <[email protected]>
Authored: Mon May 25 14:27:40 2015 +0800
Committer: wangsheng1001 <[email protected]>
Committed: Mon May 25 14:27:40 2015 +0800

----------------------------------------------------------------------
 Makefile.example | 91 ++++
 include/utils/cluster.h | 15 +
 include/utils/cluster_rt.h | 59 +++
 src/test/dist_test/test_consistency.cc | 406 ----
 src/test/dist_test/test_core.cc | 192 --------
 src/test/dist_test/test_da.cc | 700 ---------
 src/test/dist_test/test_dary.cc | 85 ----
 src/test/dist_test/test_disk_table.cc | 188 -------
 src/test/dist_test/test_mnistlayer.cc | 165 -------
 src/test/dist_test/test_model.cc | 25 -
 src/test/dist_test/test_neuralnet.cc | 141 ------
 src/test/dist_test/test_pm.cc | 88 ----
 src/test/dist_test/test_router.cc | 27 --
 src/test/dist_test/test_split.cc | 304 ------
 src/test/dist_test/test_table_server.cc | 357 --------
 src/test/dist_test/test_tuple.cc | 258 ----
 src/test/model/test_blob.cc | 58 ---
 src/test/model/test_data_layer.cc | 178 -------
 src/test/model/test_label_source.cc | 59 ---
 src/test/model/test_param.cc | 138 ------
 src/test/model/test_proto.cc | 67 ---
 src/test/model/test_rgb_dir_source.cc | 63 ---
 src/test/test_cluster.cc | 10 +-
 src/test/test_communication.cc | 158 ------
 src/test/test_shard.cc | 56 ---
 src/utils/cluster_rt.cc | 32 ++
 26 files changed, 205 insertions(+), 3715 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/Makefile.example
----------------------------------------------------------------------
diff --git a/Makefile.example b/Makefile.example
new file mode 100644
index 0000000..80dfc26
--- /dev/null
+++ b/Makefile.example
@@ -0,0 +1,91 @@
+###################User Config Variables #############################
+# third-party library installation folder
+HOME_DIR := /usr/
+# Lib folder for system and external libs. You may need to change it.
+LIBRARY_DIRS := $(HOME_DIR)/lib64 $(HOME_DIR)/lib $(HOME_DIR)/local/lib
+# Header folder for system and external libs. You may need to change it.
+INCLUDE_DIRS := $(HOME_DIR)/include ./include
+# g++ location, should support c++11, tested with 4.8.1
+CXX := g++
+
+######################Setting Variables#######################################
+LIBRARIES := glog gflags protobuf rt opencv_highgui opencv_imgproc opencv_core\
+	lmdb openblas zmq czmq
+
+LDFLAGS := $(foreach librarydir, $(LIBRARY_DIRS), -L$(librarydir))\
+	$(foreach library, $(LIBRARIES), -l$(library))
+# Folder to store compiled files
+BUILD_DIR := build
+MSHADOW_FLAGS :=-DMSHADOW_USE_CUDA=0 -DMSHADOW_USE_CBLAS=1 -DMSHADOW_USE_MKL=0
+CXXFLAGS := -O3 -Wall -pthread -fPIC -std=c++11 -Wno-unknown-pragmas \
+	$(MSHADOW_FLAGS) -DCPU_ONLY=1 \
+	-funroll-loops $(foreach includedir, $(INCLUDE_DIRS), -I$(includedir))
+
+# find user defined .proto file, and then compute the corresponding .h, .cc
+# files, which cannot be found by shell find, because they haven't been
+# generated currently
+PROTOS := $(shell find src/proto/ -name "*.proto")
+PROTO_SRCS :=$(PROTOS:.proto=.pb.cc)
+PROTO_HDRS :=$(patsubst src%, include%, $(PROTOS:.proto=.pb.h))
+PROTO_OBJS :=$(addprefix $(BUILD_DIR)/, $(PROTO_SRCS:.cc=.o))
+
+# each singa src file will generate a .o file
+SINGA_SRCS := $(shell find src/ \( -path "src/test" -o -path "src/main.cc" \) \
+	-prune -o \( -name "*.cc" -type f \) -print )
+SINGA_OBJS := $(sort $(addprefix $(BUILD_DIR)/, $(SINGA_SRCS:.cc=.o)) \
+	$(PROTO_OBJS) )
+-include $(SINGA_OBJS:%.o=%.P)
+
+TEST_SRCS :=$(shell find src/test/ -maxdepth 1 -name "*.cc")
+TEST_OBJS := $(sort $(addprefix $(BUILD_DIR)/, $(TEST_SRCS:.cc=.o)))
+-include $(TEST_OBJS:%.o=%.P)
+
+GTEST_SRC := include/gtest/gtest-all.cc
+GTEST_HDR := include/gtest/gtest.h
+GTEST_LIB := $(BUILD_DIR)/libgtest.a
+
+OBJS := $(sort $(SINGA_OBJS) $(TEST_OBJS) )
+
+########################Compilation Section###################################
+.PHONY: singa test
+
+singa: $(PROTO_OBJS) $(SINGA_OBJS)
+	$(CXX) $(SINGA_OBJS) src/main.cc -o $(BUILD_DIR)/singa $(CXXFLAGS) $(LDFLAGS)
+	@echo
+
+loader: proto $(LOADER_OBJS)
+	$(CXX) $(LOADER_OBJS) -o $(BUILD_DIR)/loader $(CXXFLAGS) $(LDFLAGS)
+	@echo
+
+test: proto $(GTEST_LIB) $(TEST_OBJS) $(SINGA_OBJS)
+	$(CXX) $(TEST_OBJS) include/gtest/gtest_main.cc $(GTEST_LIB) \
+	$(SINGA_OBJS) -o $(BUILD_DIR)/test $(CXXFLAGS) $(LDFLAGS)
+	@echo
+
+$(GTEST_LIB): $(GTEST_HDR) $(GTEST_SRC)
+	$(CXX) $(GTEST_SRC) -c -o $(BUILD_DIR)/gtest-all.o $(CXXFLAGS)
+	ar -rv $(GTEST_LIB) $(BUILD_DIR)/gtest-all.o
+
+# compile all files
+$(OBJS):$(BUILD_DIR)/%.o : %.cc
+	@mkdir -p $(dir $@)
+	$(CXX) $< $(CXXFLAGS) -MMD -c -o $@
+	cp $(BUILD_DIR)/$*.d $(BUILD_DIR)/$*.P; \
+	sed -e 's/#.*//' -e 's/^[^:]*: *//' -e 's/ *\\$$//' \
+	-e '/^$$/ d' -e 's/$$/ :/' < $(BUILD_DIR)/$*.d >> $(BUILD_DIR)/$*.P; \
+	rm -f $*.d
+
+proto: $(PROTO_OBJS)
+
+$(PROTO_SRCS): $(PROTOS)
+	protoc --proto_path=src/proto --cpp_out=src/proto $(PROTOS)
+	mkdir -p include/proto/
+	cp src/proto/*.pb.h include/proto/
+	@echo
+
+clean:
+	rm -rf *.a *.so
+	rm -rf include/proto/*
+	rm -rf src/proto/*.pb.h src/proto/*.pb.cc
+	rm -rf $(BUILD_DIR)
+	@echo

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/include/utils/cluster.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster.h b/include/utils/cluster.h
index 4812987..cd1ca76 100644
--- a/include/utils/cluster.h
+++ b/include/utils/cluster.h
@@ -6,6 +6,7 @@
 #include <memory>
 #include <vector>
 #include "proto/cluster.pb.h"
+#include "cluster_rt.h"
 
 using std::shared_ptr;
 using std::string;
@@ -108,6 +109,19 @@ class Cluster {
   }
   */
 
+  //ClusterRuntime functions
+  bool server_watch(int gid, int sid) const {
+    return false;
+  }
+
+  bool worker_join_sgroup(int gid, int wid, int server_group) const {
+    return false;
+  }
+
+  bool worker_leave_sgroup(int gid, int wid, int s_group) const {
+    return false;
+  }
+
  private:
   Cluster(const ClusterProto &cluster, int procs_id) ;
   void SetupFolders(const ClusterProto &cluster);
@@ -120,6 +134,7 @@ class Cluster {
   // make this class a singleton
   static shared_ptr<Cluster> instance_;
 };
+
 }  // namespace singa
 
 #endif  // INCLUDE_UTILS_CLUSTER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/include/utils/cluster_rt.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster_rt.h b/include/utils/cluster_rt.h
new file mode 100644
index 0000000..d9587ce
--- /dev/null
+++ b/include/utils/cluster_rt.h
@@ -0,0 +1,59 @@
+#ifndef INCLUDE_UTILS_CLUSTER_RT_H_
+#define INCLUDE_UTILS_CLUSTER_RT_H_
+#include <glog/logging.h>
+#include <string>
+#include <utility>
+
+using std::string;
+
+namespace singa {
+
+/**
+ * ClusterRuntime is a runtime service that manages dynamic configuration and status
+ * of the whole cluster. It mainly provides the following services:
+ *    1)  Provide running status of each server/worker
+ *    2)  Translate process id to (hostname:port)
+ */
+class ClusterRuntime{
+ public:
+  ClusterRuntime(){}
+  virtual ~ClusterRuntime(){}
+
+  /**
+   * Initialize the runtime instance
+   */
+  virtual bool Init(){return false;}
+
+  /**
+   * Server: watch all workers in a server group, will be notified when all workers have left
+   */
+  virtual bool sWatchSGroup(int gid, int sid){ return false;}
+
+  /**
+   * Worker: join a server group (i.e. start to read/update these servers)
+   */
+  virtual bool wJoinSGroup(int gid, int wid, int s_group){ return false;}
+
+  /**
+   * Worker: leave a server group (i.e. finish all its work)
+   */
+  virtual bool wLeaveSGroup(int gid, int wid, int s_group){ return false;}
+};
+
+
+class ZKClusterRT : public ClusterRuntime{
+ public:
+  ZKClusterRT(string host);
+  ~ZKClusterRT();
+  bool Init();
+  bool sWatchSGroup(int gid, int sid);
+  bool wJoinSGroup(int gid, int wid, int s_group);
+  bool wLeaveSGroup(int gid, int wid, int s_group);
+
+ private:
+  string host_;
+};
+
+} // namespace singa
+
+#endif  // INCLUDE_UTILS_CLUSTER_RT_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_consistency.cc
----------------------------------------------------------------------
diff --git a/src/test/dist_test/test_consistency.cc b/src/test/dist_test/test_consistency.cc
deleted file mode 100644
index a4ed9b2..0000000
--- a/src/test/dist_test/test_consistency.cc
+++ /dev/null
@@ -1,406 +0,0 @@
-// Copyright © 2014 Anh Dinh. All Rights Reserved.
-
-// Testing the unbalance in spliting parameter vectors.
- -#include "core/global-table.h" -#include "core/common.h" -#include "core/disk-table.h" -#include "core/table.h" -#include "core/table_server.h" -#include "utils/global_context.h" -#include <gflags/gflags.h> -#include "proto/model.pb.h" -#include "proto/common.pb.h" -#include "worker.h" -#include "coordinator.h" -#include "utils/common.h" -#include "utils/proto_helper.h" - -#include <cmath> -#include <stdlib.h> -#include <vector> -#include <iostream> -#include <fstream> - - -DEFINE_bool(restore_mode, false, "restore from checkpoint file"); -using namespace lapis; -using std::vector; - -//DEFINE_bool(sync_update, false, "Synchronous put/update queue"); -DEFINE_int32(checkpoint_frequency, 5000, "frequency for cp"); -DEFINE_int32(checkpoint_after, 1, "cp after this steps"); -DEFINE_string(par_mode, "hybrid", "time training algorithm"); -DEFINE_bool(restore, false, "restore from checkpoint file"); - -DEFINE_string(db_backend, "lmdb", "backend db"); -DEFINE_string(system_conf, "examples/imagenet12/system.conf", "configuration file for node roles"); -DEFINE_string(model_conf, "examples/imagenet12/model.conf", "DL model configuration file"); -DEFINE_string(checkpoint_dir,"/data1/wangwei/lapis/","check point dir"); -DEFINE_int32(threshold,1000000, "max # of parameters in a vector"); -DEFINE_int32(iterations,5,"numer of get/put iterations"); -DEFINE_int32(workers,2,"numer of workers doing get/put"); -DECLARE_bool(checkpoint_enabled); - -#ifndef FLAGS_v - DEFINE_int32(v, 3, "vlog controller"); -#endif - - -struct AnhUpdateHandler: BaseUpdateHandler<VKey,SGDValue>{ - bool Update(SGDValue *a, const SGDValue &b){ - float * adptr=a->mutable_data()->mutable_value()->mutable_data(); - const float*bdptr=b.grad(0).value().data(); - for(int i=0;i<b.grad(0).value_size();i++) - adptr[i]+=bdptr[i]; - return true; - } - - bool Get(const VKey k, const SGDValue &val, SGDValue *ret){ - *ret = val; - return true; - } - - bool is_checkpointable(const VKey k, const SGDValue v){ - return true; //always checkpoint - } -}; - -typedef map<int, GlobalTable*> Map; -Map tables; -shared_ptr<NetworkThread> network; -shared_ptr<GlobalContext> context; -std::vector<ServerState*> server_states; -TableServer *table_server; -TableDelegate *delegate; -void create_mem_table(int id, int num_shards){ - - TableDescriptor *info = new TableDescriptor(id, num_shards); - info->key_marshal = new Marshal<VKey>(); - info->value_marshal = new Marshal<SGDValue>(); - info->sharder = new VKeySharder; - info->accum = new AnhUpdateHandler; - info->partition_factory = new typename SparseTable<VKey, SGDValue>::Factory; - auto table=new TypedGlobalTable<VKey, SGDValue>(); - table->Init(info); - tables[id] = table; -} - -void coordinator_assign_tables(int id){ - for (int i = 0; i < context->num_procs() ; ++i) { - RegisterWorkerRequest req; - int src = 0; - // adding memory server. - if (context->IsTableServer(i)) { - network->Read(MPI::ANY_SOURCE, MTYPE_REGISTER_WORKER, &req, &src); - server_states.push_back(new ServerState(i)); - } - } - LOG(INFO) << " All servers registered and started up. Ready to go"; - // set itself as the current worker for the table - tables[id]->worker_id_ = network->id(); - - // memory servers are specified in global context. 
Round-robin assignment - - VLOG(3)<<"num of shards"<<tables[id]->num_shards()<<" for table"<< id; - - int server_idx = 0; - for (int shard = 0; shard < tables[id]->num_shards(); ++shard) { - ServerState &server = *server_states[server_idx]; - LOG(INFO) << "Assigning table ("<<id<<","<<shard<<") to server " - <<server_states[server_idx]->server_id; - - // TODO(Anh) may overwrite this field if #shards>#table_servers - server.shard_id = shard; - server.local_shards.insert(new TaskId(id, shard)); - server_idx = (server_idx + 1) % server_states.size(); - } - - VLOG(3)<<"table assignment"; - // then send table assignment - ShardAssignmentRequest req; - for (size_t i = 0; i < server_states.size(); ++i) { - ServerState &server = *server_states[i]; - for (auto * task: server.local_shards) { - ShardAssignment *s = req.add_assign(); - s->set_new_worker(server.server_id); - s->set_table(task->table); - s->set_shard(task->shard); - // update local tables - CHECK(tables.find(task->table)!=tables.end()); - GlobalTable *t = tables.at(task->table); - t->get_partition_info(task->shard)->owner = server.server_id; - delete task; - } - } - VLOG(3)<<"finish table assignment, req size "<<req.assign_size(); - network->SyncBroadcast(MTYPE_SHARD_ASSIGNMENT, MTYPE_SHARD_ASSIGNMENT_DONE, req); - VLOG(3)<<"finish table server init"; -} - - -void worker_table_init(){ - table_server = new TableServer(); - table_server->StartTableServer(tables); - VLOG(3) << "done starting table server"; -} - -double random_double(){ - return static_cast<double>(rand())/static_cast<double>(RAND_MAX); -} - -// popular table with random large or small messages. -// the message distribution specified in FLAGS_large_precentage -void coordinator_load_data(const vector<int>& tuples){ - auto table = static_cast<TypedGlobalTable<VKey,SGDValue>*>(tables[0]); - - int nservers=context->num_table_servers(); - int keyid=0; - if (!FLAGS_restore_mode){ - for(auto tuple: tuples){ - for(int offset=0;offset<tuple;){ - SGDValue x; - DAryProto *data=x.mutable_data(); - DAryProto *grad=x.add_grad(); - for(int i=0;i <std::min(FLAGS_threshold, tuple-offset);i++){ - data->add_value(i*1.0f); - grad->add_value(i*1.0f); - } - offset+=data->value_size(); - VKey key; - key.set_key(keyid++); - table->put(key,x); - } - } - LOG(ERROR)<<"put "<<keyid<<" tuples"; - } - - /* - LogFile *file = new LogFile("/data1/wangwei/lapis/checkpoint_0","rw",0); - VLOG(3) << "Loaded table " << file->file_name(); - string k,v; - int table_size = file->read_latest_table_size(); - VLOG(3) << "table size = " << table_size; - for (int i=0; i<table_size; i++){ - int tmp; - file->previous_entry(&k, &v, &tmp); - int *key = reinterpret_cast<int *>((char*)&k[0]); - int *val = reinterpret_cast<int *>((char*)&v[0]); - VLOG(3) << "k = " << *key << " val = " << *val; - } - delete file; - */ - - /* - for (int i=0; i<num_keys; i++){ - table->put(i,0); //loaded again - }*/ - VLOG(3) << "Coordinator done loading ..., from process "<<NetworkThread::Get()->id(); -} - -void get(TypedGlobalTable<VKey,SGDValue>* table, const vector<int>& tuples){ - SGDValue v; - int num_keys=0; - for(auto tuple: tuples){ - num_keys+=tuple/FLAGS_threshold+(tuple%FLAGS_threshold!=0); - } - LOG(ERROR)<<"getting "<<num_keys<<" tuples"; - - for (int i=0; i<num_keys; i++){ - VKey key; - key.set_key(i); - table->async_get(key, &v); - } - - - int key=0; - SGDValue val; - - LOG(INFO)<<"start collect key"; - for (int i=0; i<num_keys; i++){ - VKey key; - while(!table->async_get_collect(&key, &val)) - Sleep(0.001); - 
//LOG(INFO)<<"collect key "<<key<<" with val "<<val; - } -} - -void update(TypedGlobalTable<VKey,SGDValue>* table, const vector<int>& tuples){ - if(NetworkThread::Get()->id()==0) - sleep(2); - LOG(INFO)<<"start update"; - int keyid=0; - for(auto tuple: tuples){ - for(int offset=0;offset<tuple;){ - SGDValue x; - DAryProto *grad=x.add_grad(); - for(int i=0;i <std::min(FLAGS_threshold, tuple-offset);i++){ - grad->add_value(i*1.0f); - } - offset+=grad->value_size(); - VKey key; - key.set_key(keyid++); - table->update(key,x); - } - } - LOG(ERROR)<<"updated "<<keyid<<" tuples"; -} - -void worker_test_data(const vector<int>& tuples){ - auto table = static_cast<TypedGlobalTable<VKey,SGDValue>*>(tables[0]); - - get(table, tuples); - update(table, tuples); - update(table, tuples); - update(table, tuples); - get(table, tuples); -} - -void shutdown(){ - if (context->AmICoordinator()){ - EmptyMessage msg; - for (int i=0; i<context->num_procs()-1; i++) - network->Read(MPI::ANY_SOURCE, MTYPE_WORKER_END, &msg); - EmptyMessage shutdown_msg; - for (int i = 0; i < network->size() - 1; i++) { - network->Send(i, MTYPE_SHUTDOWN, shutdown_msg); - } - network->Flush(); - network->Shutdown(); - } - else{ - network->Flush(); - - network->Send(context->num_procs()-1, MTYPE_WORKER_END, EmptyMessage()); - - EmptyMessage msg; - - network->Read(context->num_procs()-1, MTYPE_SHUTDOWN, &msg); - - if (context->AmITableServer()) - table_server->ShutdownTableServer(); - - network->Shutdown(); - } -} - -void HandleShardAssignment() { - - ShardAssignmentRequest shard_req; - auto mpi=NetworkThread::Get(); - mpi->Read(GlobalContext::kCoordinator, MTYPE_SHARD_ASSIGNMENT, &shard_req); - // request read from coordinator - for (int i = 0; i < shard_req.assign_size(); i++) { - const ShardAssignment &a = shard_req.assign(i); - GlobalTable *t = tables.at(a.table()); - t->get_partition_info(a.shard())->owner = a.new_worker(); - - - //if local shard, create check-point files - if (FLAGS_checkpoint_enabled && t->is_local_shard(a.shard())){ - string checkpoint_file = StringPrintf("%s/checkpoint_%d",FLAGS_checkpoint_dir.c_str(), a.shard()); - char hostname[256]; - gethostname(hostname, sizeof(hostname)); - VLOG(3) << "try to open for writing *****"<<checkpoint_file<<" "<<string(hostname); - - FILE *tmp_file = fopen(checkpoint_file.c_str(), "r"); - if (tmp_file){//exists -> open to reading and writing - fclose(tmp_file); - auto cp = t->checkpoint_files(); - - if (FLAGS_restore_mode){//open in read mode to restore, then close - LogFile *file = new LogFile(checkpoint_file,"rw",0); - VLOG(3) << "Loaded table " << file->file_name(); - int table_size = file->read_latest_table_size(); - delete file; - - double start=Now(); - VLOG(3) << "Open checkpoint file to restore"; - (*cp)[a.shard()] = new LogFile(checkpoint_file,"r",a.shard()); - t->Restore(a.shard()); - delete (*cp)[a.shard()]; - double end=Now(); - LOG(ERROR)<<"restore time\t"<<end-start<< "\tfor\t" - <<table_size<<"\tthreshold\t"<<FLAGS_threshold; - } - char hostname[256]; - gethostname(hostname, sizeof(hostname)); - VLOG(3) << "open for writing *****"<<checkpoint_file<<" "<<string(hostname); - - - - VLOG(3) << "Open checkpoint file for writing"; - (*cp)[a.shard()] = new LogFile(checkpoint_file,"a",a.shard()); - } - else{// not exist -> open to writing first time - auto cp = t->checkpoint_files(); - (*cp)[a.shard()] = new LogFile(checkpoint_file,"w",a.shard()); - VLOG(3) << "Added to new checkpoint files for shard "<< a.shard(); - } - - } - - - } - EmptyMessage empty; - 
mpi->Send(GlobalContext::kCoordinator, MTYPE_SHARD_ASSIGNMENT_DONE, empty); - VLOG(3)<<"finish handle shard assignment **"; - -} - - -int main(int argc, char **argv) { - FLAGS_logtostderr = 1; - int provided; - MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); - google::InitGoogleLogging(argv[0]); - gflags::ParseCommandLineFlags(&argc, &argv, true); - - context = GlobalContext::Get(FLAGS_system_conf); - network = NetworkThread::Get(); - - ModelProto model; - ReadProtoFromTextFile(FLAGS_model_conf.c_str(), &model); - - create_mem_table(0,context->num_table_servers()); - - vector<int> tuple_size{37448736, 16777216, 4096000, 1327104, 884736, 884736, 614400,14112,4096,4096,1000,384,384,256,256,96}; - /* - vector<int> tuples; - for(int i=0;i<3;i++){ - for(int j=0;j<FLAGS_workers;j++) - tuples.push_back(tuple_size[i]/FLAGS_workers); - } - for(int i=3;i<tuple_size.size();i++) - tuples.push_back(tuple_size[i]); - */ - - if (context->AmICoordinator()){ - VLOG(3) << "Coordinator process rank = " << NetworkThread::Get()->id(); - coordinator_assign_tables(0); - coordinator_load_data(tuple_size); - - network->barrier(); - } - else{ - if (context->AmITableServer()){ - worker_table_init(); - HandleShardAssignment(); - network->barrier(); - } - else{ - VLOG(3) << "Inside worker, waiting for assignemtn"; - HandleShardAssignment(); - network->barrier(); - if(!FLAGS_restore_mode) - worker_test_data(tuple_size); - } - } - shutdown(); - - - VLOG(3) << "Done ..."; - return 0; -} - - http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_core.cc ---------------------------------------------------------------------- diff --git a/src/test/dist_test/test_core.cc b/src/test/dist_test/test_core.cc deleted file mode 100644 index 35d589b..0000000 --- a/src/test/dist_test/test_core.cc +++ /dev/null @@ -1,192 +0,0 @@ -// Copyright © 2014 Anh Dinh. All Rights Reserved. - - - -#include "core/global-table.h" -#include "core/common.h" -#include "core/disk-table.h" -#include "core/table.h" -#include "core/table_server.h" -#include "utils/global_context.h" -#include <gflags/gflags.h> -#include "proto/model.pb.h" -#include "worker.h" -#include "coordinator.h" -#include "model_controller/myacc.h" -#include <cmath> - -using namespace lapis; - -DEFINE_bool(sync_update, false, "Synchronous put/update queue"); -DEFINE_string(system_conf, "examples/imagenet12/system.conf", "configuration file for node roles"); -DEFINE_string(model_conf, "examples/imagenet12/model.conf", "DL model configuration file"); -DEFINE_int32(num_keys,10,""); - -typedef map<int, GlobalTable*> Map; -Map tables; -shared_ptr<NetworkThread> network; -shared_ptr<GlobalContext> context; -std::vector<ServerState*> server_states; -TableServer *table_server; - -void create_mem_table(int id, int num_shards){ - - TableDescriptor *info = new TableDescriptor(id, num_shards); - info->key_marshal = new Marshal<int>(); - info->value_marshal = new Marshal<int>(); - info->sharder = new Sharding::Mod; - info->accum = new TestUpdater(); - info->partition_factory = new typename SparseTable<int, int>::Factory; - auto table=new TypedGlobalTable<int, int>(); - table->Init(info); - tables[id] = table; -} - -void coordinator_assign_tables(int id){ - for (int i = 0; i < context->num_processes()-1; ++i) { - RegisterWorkerRequest req; - int src = 0; - network->Read(MPI::ANY_SOURCE, MTYPE_REGISTER_WORKER, &req, &src); - // adding memory server. 
- if (context->IsTableServer(i)) { - server_states.push_back(new ServerState(i)); - } - } - LOG(INFO) << " All servers registered and started up. Ready to go"; - // set itself as the current worker for the table - tables[id]->worker_id_ = network->id(); - - // memory servers are specified in global context. Round-robin assignment - - VLOG(3)<<"num of shards"<<tables[id]->num_shards()<<" for table"<< id; - - int server_idx = 0; - for (int shard = 0; shard < tables[id]->num_shards(); ++shard) { - ServerState &server = *server_states[server_idx]; - LOG(INFO) << "Assigning table ("<<id<<","<<shard<<") to server " - <<server_states[server_idx]->server_id; - - // TODO(Anh) may overwrite this field if #shards>#table_servers - server.shard_id = shard; - server.local_shards.insert(new TaskId(id, shard)); - server_idx = (server_idx + 1) % server_states.size(); - } - - VLOG(3)<<"table assignment"; - // then send table assignment - ShardAssignmentRequest req; - for (size_t i = 0; i < server_states.size(); ++i) { - ServerState &server = *server_states[i]; - for (auto * task: server.local_shards) { - ShardAssignment *s = req.add_assign(); - s->set_new_worker(server.server_id); - s->set_table(task->table); - s->set_shard(task->shard); - // update local tables - CHECK(tables.find(task->table)!=tables.end()); - GlobalTable *t = tables.at(task->table); - t->get_partition_info(task->shard)->owner = server.server_id; - delete task; - } - } - VLOG(3)<<"finish table assignment, req size "<<req.assign_size(); - network->SyncBroadcast(MTYPE_SHARD_ASSIGNMENT, MTYPE_SHARD_ASSIGNMENT_DONE, req); - VLOG(3)<<"finish table server init"; -} - -void worker_table_init(){ - table_server = new TableServer(); - table_server->StartTableServer(tables); - VLOG(3) << "done starting table server"; -} - - -void coordinator_load_data(){ - auto table = static_cast<TypedGlobalTable<int,int>*>(tables[0]); - for (int i = 1; i<=FLAGS_num_keys; i++){ - table->put(i,i); - } - VLOG(3) << "Loaded data successfully ..."; -} - -void worker_test_data(){ - auto table = static_cast<TypedGlobalTable<int,int>*>(tables[0]); - for (int i=1; i<=FLAGS_num_keys; i++) - VLOG(3) << StringPrintf("Worker %d got (%d,%d)", NetworkThread::Get()->id(), i, table->get(i)); - - - for (int j = 0; j < 2; j++) { - for (int i = 1; i <= FLAGS_num_keys; i++) - table->update(i, i); - - for (int i = 1; i <= FLAGS_num_keys; i++) - VLOG(3) - << StringPrintf("Worker %d got (%d,%d)", - NetworkThread::Get()->id(), i, table->get(i)); - } -/* - for (int i = 1; i <= FLAGS_num_keys; i++) - VLOG(3) - << StringPrintf("Worker %d got (%d,%d)", - - NetworkThread::Get()->id(), i, table->get(i)); -*/ -} - -void shutdown(){ - if (context->AmICoordinator()){ - VLOG(3) << "Coordinator is shutting down ..."; - EmptyMessage msg; - for (int i=0; i<context->num_processes()-1; i++) - network->Read(MPI::ANY_SOURCE, MTYPE_WORKER_END, &msg); - EmptyMessage shutdown_msg; - for (int i = 0; i < network->size() - 1; i++) { - network->Send(i, MTYPE_WORKER_SHUTDOWN, shutdown_msg); - } - network->Flush(); - network->Shutdown(); - } - else{ - VLOG(3) << "Worker " << network->id() << " is shutting down ..."; - network->Flush(); - VLOG(3) << "Done flushing the network thread"; - network->Send(GlobalContext::kCoordinatorRank, MTYPE_WORKER_END, EmptyMessage()); - EmptyMessage msg; - network->Read(GlobalContext::kCoordinatorRank, MTYPE_WORKER_SHUTDOWN, &msg); - VLOG(3) << "Worker received MTYPE_WORKER_SHUTDOWN"; - table_server->ShutdownTableServer(); - VLOG(3) << "Flushing node " << network->id(); - 
network->Shutdown(); - } -} - - -int main(int argc, char **argv) { - FLAGS_logtostderr = 1; - google::InitGoogleLogging(argv[0]); - gflags::ParseCommandLineFlags(&argc, &argv, true); - - context = GlobalContext::Get(FLAGS_system_conf, FLAGS_model_conf); - network = NetworkThread::Get(); - VLOG(3) << "*** testing memory servers, with " - << context->num_table_servers() << " servers"; - create_mem_table(0,context->num_table_servers()); - - if (context->AmICoordinator()){ - coordinator_assign_tables(0); - coordinator_load_data(); - network->barrier(); - } - else{ - worker_table_init(); - network->barrier(); - VLOG(3) << "passed the barrier"; - //Sleep(1); - worker_test_data(); - } - - shutdown(); - return 0; -} - - http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_da.cc ---------------------------------------------------------------------- diff --git a/src/test/dist_test/test_da.cc b/src/test/dist_test/test_da.cc deleted file mode 100644 index 51aa93e..0000000 --- a/src/test/dist_test/test_da.cc +++ /dev/null @@ -1,700 +0,0 @@ -#include <glog/logging.h> -#include <mpi.h> -#include <utility> -#include <vector> - -#include "da/gary.h" -#include "da/dary.h" -#include "da/ary.h" - - -using std::make_pair; -using std::vector; -void Debug() { - int i = 0; - char hostname[256]; - gethostname(hostname, sizeof(hostname)); - printf("PID %d on %s ready for attach\n", getpid(), hostname); - fflush(stdout); - while (0 == i) - sleep(5); -} - - - -void TestPar(int pdim, int rank){ - lapis::DAry a1, a2; - lapis::DAry a3, a4; - vector<lapis::Range> slice{make_pair(0,4), make_pair(0,8)}; - a1.SetShape({4,8}); - a2.SetShape({4,8}); - a1.Setup(pdim); - a2.Setup(pdim); - a1.Random(); - a2.Random(); - ARMCI_Barrier(); - - - if(rank==0){ - //Debug(); - LOG(ERROR)<<"test simple partition along "<< pdim<<" dim"; - a3=a1.Fetch(slice); - a4=a2.Fetch(slice); - LOG(ERROR)<<"fetch a"; - LOG(ERROR)<<a3.ToString(); - LOG(ERROR)<<"fetch b"; - LOG(ERROR)<<a4.ToString(); - a3.Add(a4); - LOG(ERROR)<<"a<- a+b"; - LOG(ERROR)<<a3.ToString(); - } - ARMCI_Barrier(); - a1.Add(a2); - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry a5; - a5=a1.Fetch(slice); - LOG(ERROR)<<"add then fetch"; - LOG(ERROR)<<a5.ToString(); - } -} - - - -void TestMixedParElt(int pa, int pb, int pc, int rank){ - LOG(ERROR)<<" p dim for a,b,c is "<<pa<<" "<<pb<<" "<<pc; - vector<lapis::Range> slice{make_pair(0,3),make_pair(0,6), make_pair(0,2)}; - lapis::DAry a1, a2, a3; - a1.SetShape({3,6,2}); - a2.SetShape({3,6,2}); - a3.SetShape({3,6,2}); - a1.Setup(pa); - a2.Setup(pb); - a3.Setup(pc); - a1.Random(); - a2.Random(); - a3.Random(); - - ARMCI_Barrier(); - if(rank==0){ - LOG(ERROR)<<"test elementwise ops with mixed partition"; - lapis::DAry a5, a4; -// Debug(); - a5=a1.Fetch(slice); - a4=a2.Fetch(slice); - LOG(ERROR)<<"fetch a"; - LOG(ERROR)<<a5.ToString(); - LOG(ERROR)<<"fetch b"; - LOG(ERROR)<<a4.ToString(); - a5.Copy(a4); - LOG(ERROR)<<"fetch op a.Copy(b)"; - LOG(ERROR)<<a5.ToString(); - } - ARMCI_Barrier(); - a1.Copy(a2); - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry a5; - a5=a1.Fetch(slice); - LOG(ERROR)<<"op fetch a.Copy(b)"; - LOG(ERROR)<<a5.ToString(); - } - -////////////////////////////////////////////////// - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry a8, a4, a5({3,6,2}); - //Debug(); - a8=a1.Fetch(slice); - a4=a2.Fetch(slice); - LOG(ERROR)<<"fetch a"; - LOG(ERROR)<<a8.ToString(); - LOG(ERROR)<<"fetch b"; - LOG(ERROR)<<a4.ToString(); - a5.Mult(a8,a4); - LOG(ERROR)<<"fetch op c.mult(a,b)"; - 
LOG(ERROR)<<a5.ToString(); - } - ARMCI_Barrier(); - a3.Mult(a1,a2); - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry a5; - a5=a3.Fetch(slice); - LOG(ERROR)<<"op fetch a.Mult(b,c)"; - LOG(ERROR)<<a5.ToString(); - } -////////////////////////////////////////////////// - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry a8, a4, a5({3,6,2}); - //Debug(); - a8=a1.Fetch(slice); - a4=a2.Fetch(slice); - LOG(ERROR)<<"fetch a"; - LOG(ERROR)<<a8.ToString(); - LOG(ERROR)<<"fetch b"; - LOG(ERROR)<<a4.ToString(); - a5.Div(a8,a4); - LOG(ERROR)<<"fetch op c.div(a,b)"; - LOG(ERROR)<<a5.ToString(); - } - ARMCI_Barrier(); - a3.Div(a1,a2); - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry a5; - a5=a3.Fetch(slice); - LOG(ERROR)<<"op fetch a.div(b,c)"; - LOG(ERROR)<<a5.ToString(); - } -////////////////////////////////////////////////// - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry a8, a4, a5({3,6,2}); - //Debug(); - a8=a1.Fetch(slice); - LOG(ERROR)<<"fetch a"; - LOG(ERROR)<<a8.ToString(); - a5.Mult(a8, 3.0); - LOG(ERROR)<<"fetch op c.mult(a,3)"; - LOG(ERROR)<<a5.ToString(); - } - ARMCI_Barrier(); - a3.Mult(a1,3.0); - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry a5; - a5=a3.Fetch(slice); - LOG(ERROR)<<"op fetch a.mult(b,3)"; - LOG(ERROR)<<a5.ToString(); - } - -////////////////////////////////////////////////// - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry a8, a4, a5({3,6,2}); - //Debug(); - a8=a1.Fetch(slice); - LOG(ERROR)<<"fetch a"; - LOG(ERROR)<<a8.ToString(); - a5.Square(a8); - LOG(ERROR)<<"fetch op c.square(a)"; - LOG(ERROR)<<a5.ToString(); - } - ARMCI_Barrier(); - a3.Square(a1); - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry a5; - a5=a3.Fetch(slice); - LOG(ERROR)<<"op fetch a.sqaure(b)"; - LOG(ERROR)<<a5.ToString(); - } - - -////////////////////////////////////////////////// - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry a8, a4, a5({3,6,2}); - //Debug(); - a8=a1.Fetch(slice); - LOG(ERROR)<<"fetch a"; - LOG(ERROR)<<a8.ToString(); - a5.Pow(a8,3.0); - LOG(ERROR)<<"fetch op c.pow(a, 3)"; - LOG(ERROR)<<a5.ToString(); - } - ARMCI_Barrier(); - a3.Pow(a1,3.0); - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry a5; - a5=a3.Fetch(slice); - LOG(ERROR)<<"op fetch a.pow(b,3)"; - LOG(ERROR)<<a5.ToString(); - } - - -////////////////////////////////////////////////// - ARMCI_Barrier(); - a3.SampleUniform(0.0,3.0); - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry a5; - a5=a3.Fetch(slice); - LOG(ERROR)<<"op fetch a.uniform(0,3)"; - LOG(ERROR)<<a5.ToString(); - } -////////////////////////////////////////////////// - ARMCI_Barrier(); - a3.SampleGaussian(0.0,1.0); - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry a5; - a5=a3.Fetch(slice); - LOG(ERROR)<<"op fetch a.norm(0,1)"; - LOG(ERROR)<<a5.ToString(); - } - -////////////////////////////////////////////////// - ARMCI_Barrier(); - a3.Fill(1.43); - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry a5; - a5=a3.Fetch(slice); - LOG(ERROR)<<"op fetch a.fill(1.43)"; - LOG(ERROR)<<a5.ToString(); - } - - -////////////////////////////////////////////////// - ARMCI_Barrier(); - a1.Random(); - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry a8, a4, a5({3,6,2}); - a4=a1.Fetch(slice); - a5.Threshold(a4,0.3); - LOG(ERROR)<<"fetch op b=threshold(a,0.3)"; - LOG(ERROR)<<a4.ToString(); - LOG(ERROR)<<a5.ToString(); - } - - ARMCI_Barrier(); - a3.Threshold(a1, .30f); - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry a5; - a5=a3.Fetch(slice); - LOG(ERROR)<<"op fetch b=threshold(a,0.3)"; - LOG(ERROR)<<a5.ToString(); - } - -////////////////////////////////////////////////// - ARMCI_Barrier(); - a1.Random(); - 
ARMCI_Barrier(); - if(rank==0){ - lapis::DAry a8, a4, a5({3,6,2}); - a4=a1.Fetch(slice); - a5.Max(a4,0.3); - LOG(ERROR)<<"fetch op b=max(a,0.3)"; - LOG(ERROR)<<a4.ToString(); - LOG(ERROR)<<a5.ToString(); - } - - ARMCI_Barrier(); - a3.Max(a1, .30f); - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry a5; - a5=a3.Fetch(slice); - LOG(ERROR)<<"op fetch b=max(a,0.3)"; - LOG(ERROR)<<a5.ToString(); - } - - -////////////////////////////////////////////////// - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry a6, a4, a5({3,6,2}); - a6=a1.Fetch(slice); - a4=a2.Fetch(slice); - a5.Map([](float a, float b) {return a+2*b;}, a6,a4); - LOG(ERROR)<<"fetch op b=map(a+2b)"; - LOG(ERROR)<<a6.ToString(); - LOG(ERROR)<<a4.ToString(); - LOG(ERROR)<<a5.ToString(); - } - ARMCI_Barrier(); - a3.Map([](float a, float b) {return a+2*b;}, a1,a2); - if(rank==0){ - lapis::DAry a5; - a5=a3.Fetch(slice); - LOG(ERROR)<<"op fetch b=map(a+2b)"; - LOG(ERROR)<<a5.ToString(); - } - LOG(ERROR)<<"finish elementwise ops"; -} - - -void TestLargeDot(int pa, int pb, int pc, int rank){ - if(rank==0){ - LOG(ERROR)<<"test Dot, partition for a, b, c : " - << pa<<" "<<pb<<" "<<pc<<" dim"; - } - - double t1, t2, t3; - t1=MPI_Wtime(); - lapis::DAry a,b,c; - a.SetShape({256,9216}); - b.SetShape({9216,4096}); - c.SetShape({256,4096}); - a.Setup(pa); - b.Setup(pb); - c.Setup(pc); - a.Random(); - b.Random(); - c.Random(); - ARMCI_Barrier(); - t2=MPI_Wtime(); - c.Dot(a,b); - t3=MPI_Wtime(); - ARMCI_Barrier(); - LOG(ERROR)<<"setup time: "<<t2-t1<<" dot time: " - <<t3-t2<<" wait time:"<<MPI_Wtime()-t3; -} - -void TestDot(int pa, int pb, int pc, int rank){ - vector<lapis::Range> slicea{make_pair(0,4), make_pair(0,8)}; - vector<lapis::Range> sliceb{make_pair(0,8), make_pair(0,4)}; - vector<lapis::Range> slicec{make_pair(0,4), make_pair(0,4)}; - lapis::DAry a,b,c; - a.SetShape({4,8}); - b.SetShape({8,4}); - c.SetShape({4,4}); - a.Setup(pa); - b.Setup(pb); - c.Setup(pc); - a.Random(); - b.Random(); - c.Random(); - ////////////////////// - ARMCI_Barrier(); - if(rank==0){ - LOG(ERROR)<<"test Dot, partition for a, b, c : " - << pa<<" "<<pb<<" "<<pc<<" dim"; - LOG(ERROR)<<"c=a*b"; - lapis::DAry x,y,z; - x=a.Fetch(slicea); - y=b.Fetch(sliceb); - z=c.Fetch(slicec); - z.Dot(x,y); - LOG(ERROR)<<"fetch dot "; - LOG(ERROR)<<z.ToString(); - } - ARMCI_Barrier(); - //Debug(); - c.Dot(a,b); - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry z; - z=c.Fetch(slicec); - LOG(ERROR)<<"dot fetch"; - LOG(ERROR)<<z.ToString(); - } - ///////////////////////////// - ARMCI_Barrier(); - - if(rank==0){ - LOG(ERROR)<<"a=c*b^T"; - lapis::DAry x,y,z; - x=a.Fetch(slicea); - y=b.Fetch(sliceb); - z=c.Fetch(slicec); - x.Dot(z,y, false, true); - LOG(ERROR)<<"fetch dot "; - LOG(ERROR)<<x.ToString(); - } - ARMCI_Barrier(); - //Debug(); - a.Dot(c,b, false, true); - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry z; - z=a.Fetch(slicea); - LOG(ERROR)<<"dot fetch"; - LOG(ERROR)<<z.ToString(); - } - - ///////////////////////////// - ARMCI_Barrier(); - if(rank==0){ - LOG(ERROR)<<"b=a^T*c"; - lapis::DAry x,y,z; - x=a.Fetch(slicea); - y=b.Fetch(sliceb); - z=c.Fetch(slicec); - y.Dot(x,z, true, false); - LOG(ERROR)<<"fetch dot "; - LOG(ERROR)<<y.ToString(); - } - ARMCI_Barrier(); - //Debug(); - b.Dot(a,c, true, false); - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry z; - z=b.Fetch(sliceb); - LOG(ERROR)<<"dot fetch"; - LOG(ERROR)<<z.ToString(); - } - ARMCI_Barrier(); - ///////////////////////////// - ARMCI_Barrier(); - if(rank==0){ - LOG(ERROR)<<"b=a^T*c^T"; - lapis::DAry x,y,z; - x=a.Fetch(slicea); - 
y=b.Fetch(sliceb); - z=c.Fetch(slicec); - y.Dot(x,z, true, true); - LOG(ERROR)<<"fetch dot "; - LOG(ERROR)<<y.ToString(); - } - ARMCI_Barrier(); - //Debug(); - b.Dot(a,c, true, true); - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry z; - z=b.Fetch(sliceb); - LOG(ERROR)<<"dot fetch"; - LOG(ERROR)<<z.ToString(); - } - ARMCI_Barrier(); -} - - -void TestSubarray(int pa, int pb, int pc, int rank){ - vector<lapis::Range> slicea{make_pair(0,4), make_pair(0,8)}; - vector<lapis::Range> sliceb{make_pair(0,8), make_pair(0,4)}; - vector<lapis::Range> slicec{make_pair(0,4), make_pair(0,4)}; - vector<lapis::Range> slice{make_pair(0,4)}; - lapis::DAry a,b,c; - a.SetShape({4}); - b.SetShape({8,4}); - c.SetShape({4,4}); - a.Setup(pa); - b.Setup(pb); - c.Setup(pc); - b.Random(); - c.Random(); - - //Debug(); - lapis::DAry sb=b[2]; - lapis::DAry sc=c[3]; - - ARMCI_Barrier(); - if(rank==0){ - LOG(ERROR)<<"test subary, partition for a, b, c : " - << pa<<" "<<pb<<" "<<pc<<" dim"; - lapis::DAry y,z, x({4}); - LOG(ERROR)<<"fetch full b, c"; - y=b.Fetch(sliceb); - z=c.Fetch(slicec); - LOG(ERROR)<<y.ToString(); - LOG(ERROR)<<z.ToString(); - LOG(ERROR)<<"fetch sub, sb[2], sc[3]"; - y=sb.Fetch(slice); - z=sc.Fetch(slice); - LOG(ERROR)<<y.ToString(); - LOG(ERROR)<<z.ToString(); - } - ARMCI_Barrier(); - a.Add(sb,sc); - ARMCI_Barrier(); - //Debug(); - if(rank==0){ - lapis::DAry z; - z=a.Fetch(slice); - LOG(ERROR)<<"sub add fetch, sb[2]+sc[3]"; - LOG(ERROR)<<z.ToString(); - } -} - -void TestReshape(int pa, int pb, int pc, int rank){ - vector<lapis::Range> sliceb3{make_pair(0,2),make_pair(0,4), make_pair(0,4)}; - vector<lapis::Range> sliceb{make_pair(0,8), make_pair(0,4)}; - vector<lapis::Range> slicec{make_pair(0,4), make_pair(0,4)}; - vector<lapis::Range> slicea{make_pair(0,4)}; - lapis::DAry a,b,c,b3,b2,b1; - a.SetShape({4}); - b.SetShape({8,4}); - c.SetShape({4,4}); - a.Setup(pa); - b.Setup(pb); - c.Setup(pc); - b.Random(); - c.Random(); - - b3=b.Reshape({2,4,4}); - //Debug() ; - b2=b3[1]; - if(rank==0){ - LOG(ERROR)<<"test reshape+subary, partition for a, b, c : " - << pa<<" "<<pb<<" "<<pc<<" dim"; - lapis::DAry y,z,x; - LOG(ERROR)<<"fetch b, b2, c"; - y=b.Fetch(sliceb); - z=b2.Fetch(slicec); - x=c.Fetch(slicec); - LOG(ERROR)<<y.ToString(); - LOG(ERROR)<<z.ToString(); - LOG(ERROR)<<x.ToString(); - LOG(ERROR)<<"fetch sub, b2+c"; - z.Add(x); - LOG(ERROR)<<z.ToString(); - } - - ARMCI_Barrier(); - c.Add(b2); - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry y,z,x; - x=c.Fetch(slicec); - LOG(ERROR)<<"sub add,fetch c+b2"; - LOG(ERROR)<<x.ToString(); - } - ARMCI_Barrier(); - b2.Add(c); - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry y,z,x; - x=b2.Fetch(slicec); - LOG(ERROR)<<"sub add,fetch b2+c"; - LOG(ERROR)<<x.ToString(); - } - ARMCI_Barrier(); - b1=b2[2]; - if(rank==0){ - lapis::DAry y,z,x; - x=b1.Fetch(slicea); - LOG(ERROR)<<"fetch b1"; - LOG(ERROR)<<x.ToString(); - } - - a.Add(b1); - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry y,z,x; - x=a.Fetch(slicea); - LOG(ERROR)<<"add fetch a+b1"; - LOG(ERROR)<<x.ToString(); - } - ARMCI_Barrier(); - b1.Add(a); - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry y,z,x; - x=b1.Fetch(slicea); - LOG(ERROR)<<"add fetch b1+a"; - LOG(ERROR)<<x.ToString(); - } - - ARMCI_Barrier(); - { - lapis::DAry b3=b.Reshape({4,2,4}); - lapis::DAry a; - a.SetShape({2,4}); - a.Setup(pa); - a.Random(); - lapis::DAry b1=b3[1]; - lapis::DAry b2=b3[3]; - lapis::DAry c; - c.SetShape({2,2}); - c.Setup(pc); - ARMCI_Barrier(); - c.Dot(a,b2,false, true); - ARMCI_Barrier(); - if(rank==0){ - lapis::DAry x,y,z,zz({2,2}); 
- y=b3.Fetch({make_pair(0,4), make_pair(0,2), make_pair(0,4)}); - x=a.Fetch({make_pair(0,2), make_pair(0,4)}); - LOG(ERROR)<<"fetch b,a"; - LOG(ERROR)<<y.ToString(); - LOG(ERROR)<<x.ToString(); - z=y[3]; - zz.Dot(x,z,false, true); - LOG(ERROR)<<"fetch dot c=a*b[3]^T"; - LOG(ERROR)<<zz.ToString(); - - x=a.Fetch({make_pair(0,2), make_pair(0,4)}); - y=b2.Fetch({make_pair(0,2), make_pair(0,4)}); - z=c.Fetch({make_pair(0,2), make_pair(0,2)}); - LOG(ERROR)<<"op fetch c=a*b[3]^T"; - LOG(ERROR)<<x.ToString(); - LOG(ERROR)<<y.ToString(); - LOG(ERROR)<<z.ToString(); - - } - ARMCI_Barrier(); - } -} - - - -int main(int argc, char**argv){ - // MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); - MPI_Init(&argc, &argv); - int rank, nprocs; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &nprocs); - vector<int> procs; - for (int i = 0; i < nprocs; i++) { - procs.push_back(i); - } - //Debug(); - lapis::GAry::Init(rank,procs); - google::InitGoogleLogging(argv[0]); - /* - if(nprocs%3==0){ - TestMixedParElt(0,0,0,rank); - TestMixedParElt(0,0,1,rank); - TestMixedParElt(0,1,0,rank); - TestMixedParElt(1,0,0,rank); - TestMixedParElt(1,1,0,rank); - TestMixedParElt(1,1,1,rank); - TestMixedParElt(0,1,1,rank); - } - if(nprocs%2==0){ - TestMixedParElt(1,1,1,rank); - TestMixedParElt(1,2,1,rank); - TestMixedParElt(2,1,1,rank); - TestMixedParElt(1,1,2,rank); - TestMixedParElt(2,2,2,rank); - } - TestDot(0,0,0,rank); - TestDot(0,0,1,rank); - TestDot(0,1,0,rank); - TestDot(0,1,1,rank); - TestDot(1,0,0,rank); - TestDot(1,0,1,rank); - TestDot(1,1,0,rank); - TestDot(1,1,1,rank); - - TestPar(0, rank); - TestPar(1, rank); - */ - double start, end; - start=MPI_Wtime(); - TestLargeDot(0,0,0,rank); - TestLargeDot(0,0,1,rank); - TestLargeDot(0,1,0,rank); - TestLargeDot(0,1,1,rank); - TestLargeDot(1,0,0,rank); - TestLargeDot(1,0,1,rank); - TestLargeDot(1,1,0,rank); - TestLargeDot(1,1,1,rank); - end=MPI_Wtime(); - if(rank==0) - LOG(ERROR)<<"dot time for 256*4k 4k*4k matrix, "<<end-start; - /* - TestSubarray(0,0,0,rank); - TestSubarray(0,0,1,rank); - TestSubarray(0,1,0,rank); - TestSubarray(0,1,1,rank); - TestReshape(0,0,0,rank); - TestReshape(0,0,1,rank); - TestReshape(0,1,0,rank); - TestReshape(0,1,1,rank); - */ - - LOG(ERROR)<<"finish"; - lapis::GAry::Finalize(); - MPI_Finalize(); - return 0; -} - http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_dary.cc ---------------------------------------------------------------------- diff --git a/src/test/dist_test/test_dary.cc b/src/test/dist_test/test_dary.cc deleted file mode 100644 index ce605e6..0000000 --- a/src/test/dist_test/test_dary.cc +++ /dev/null @@ -1,85 +0,0 @@ -#include <iostream> -#include "darray/dary.h" -#include "utils/timer.h" - - -int main() { - lapis::DAry x({1000000}); - lapis::DAry y({1000000}); - x.Random(); - y.Random(); - lapis::Timer t; - for(int i=0;i<100;i++){ - float *dptrx=x.dptr(); - float *dptry=y.dptr(); - for(int k=0;k<10000;k++) - dptrx[k]*=dptry[k]; - } - std::cout<<"arymath: "<<t.elapsed()/10<<std::endl; - lapis::DAry m({1000000}); - lapis::DAry n({1000000}); - m.Random(); - n.Random(); - t.Reset(); - for(int i=0;i<100;i++) - m.Mult(m,n); - std::cout<<"arymath: "<<t.elapsed()/10<<std::endl; - - - lapis::DAry a({2,2}); - lapis::DAry b,c; - b.InitLike(a); - c.InitLike(a); - a.Random(); - b.Random(); - std::cout<<a.ToString()<<std::endl; - std::cout<<b.ToString()<<std::endl; - c.Dot(a,b); - std::cout<<"c=a.b"<<c.ToString()<<std::endl; - a.Add(b); - 
std::cout<<"a=a+b"<<a.ToString()<<std::endl; - a.Mult(a,b); - std::cout<<"a=a*b"<<a.ToString()<<std::endl; - a.Minus(a,b); - std::cout<<"a=a-b"<<a.ToString()<<std::endl; - - c.Random(); - std::cout<<"random c "<<c.ToString()<<std::endl; - a.Threshold(c, 0.3); - std::cout<<"a=threshold(c,0.3) "<<a.ToString()<<std::endl; - - a.Pow(c, 0.4); - std::cout<<"a=Pow(c,0.4) "<<a.ToString()<<std::endl; - - c.Set(0.5); - std::cout<<"c=set(0.5) "<<c.ToString()<<std::endl; - a.Square(c); - std::cout<<"a=square(c) "<<a.ToString()<<std::endl; - - c.Copy(a); - std::cout<<"c=Copy(a) "<<c.ToString()<<std::endl; - - lapis::DAry d({2}); - d.SumRow(b); - std::cout<<"d=SumRow(b) "<<d.ToString()<<std::endl; - d.SumCol(b); - std::cout<<"d=SumCol(b) "<<d.ToString()<<std::endl; - b.AddRow(d); - std::cout<<"b=AddRow(d) "<<b.ToString()<<std::endl; - b.AddCol(d); - std::cout<<"b=AddCol(d) "<<b.ToString()<<std::endl; - - std::cout<<"max(b) "<<b.Max()<<std::endl; - std::cout<<"Sum(b) "<<b.Sum()<<std::endl; - - lapis::DAry e({3,3,3}); - e.SampleGaussian(0.0f,1.0f); - std::cout<<"Gaussain e "<<e.ToString()<<std::endl; - - lapis::DAry f({9}); - f.Sum(e, 0, {0,2}); - std::cout<<"f.sum "<<f.ToString()<<std::endl; - - return 0; -} - http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_disk_table.cc ---------------------------------------------------------------------- diff --git a/src/test/dist_test/test_disk_table.cc b/src/test/dist_test/test_disk_table.cc deleted file mode 100644 index 99987bb..0000000 --- a/src/test/dist_test/test_disk_table.cc +++ /dev/null @@ -1,188 +0,0 @@ -// Copyright © 2014 Anh Dinh. All Rights Reserved. -// main class for testing distributed memory layer -// -// the command to run this should be: -// mpirun -hostfile <host> -bycore -nooversubscribe -// -n <num_servers> test -sync_update - - -#include "core/global-table.h" -#include "core/common.h" -#include "core/disk-table.h" -#include "core/table.h" -#include "core/table_server.h" -#include "utils/global_context.h" -#include <gflags/gflags.h> -#include "proto/model.pb.h" -#include "worker.h" -#include <cmath> - -DEFINE_int32(record_size,100, "# elements per float vector"); -DECLARE_int32(block_size); -DEFINE_int32(table_size, 1000, "# records per table"); -DEFINE_string(system_conf, "examples/imagenet12/system.conf", "configuration file for node roles"); -DEFINE_string(model_conf, "examples/imagenet12/model.conf", "DL model configuration file"); -DEFINE_bool(is_testing_put,true, "data put vs. data get"); -DECLARE_int32(debug_index); -DECLARE_int32(table_buffer); -using namespace lapis; - -typedef map<int, GlobalTable*> Map; -Map tables; - -// put random message to the pointers -void create_random_message(FloatVector* message, const int count){ - for (int i=0; i<FLAGS_record_size; i++){ - message->add_data(count*FLAGS_record_size+i); - } -} - -void create_disk_table(int id){ - DiskTableDescriptor *info = new DiskTableDescriptor(id, "disk_test", - FLAGS_block_size); - info->key_marshal = new Marshal<int>(); - info->value_marshal = new Marshal<FloatVector>(); - tables[id] = new TypedDiskTable<int,FloatVector>(info); -} - - -// if testing put, write and send data. 
Else do nothing -void run_coordinator(shared_ptr<NetworkThread> network, int tid){ - // wait for wokers to be up - RegisterWorkerRequest req; - for (int i=0; i<network->size()-1; i++) - network->Read(MPI::ANY_SOURCE, MTYPE_REGISTER_WORKER, &req); - - // put data in - TypedDiskTable<int, FloatVector>* table = static_cast<TypedDiskTable<int, - FloatVector>*>(tables[tid]); - - // if testing put() - if (FLAGS_is_testing_put) { - int count = 0; - for (int i = 0; i < FLAGS_table_size; i++) { - FloatVector message; - create_random_message(&message, i); - table->put(i, message); - count += message.ByteSize(); - } - table->finish_put(); - } - - VLOG(3) << "Coordinator about to shut down"; - for (int i=0; i<network->size()-1; i++){ - EmptyMessage end_msg; - network->Read(i,MTYPE_WORKER_END, &end_msg); - } - - EmptyMessage shutdown_msg; - for (int i = 0; i < network->size() - 1; i++) { - network->Send(i, MTYPE_WORKER_SHUTDOWN, shutdown_msg); - } - network->Flush(); - network->Shutdown(); - table->PrintStats(); - - if (FLAGS_is_testing_put) { - int sub_blocks = ceil(((double) FLAGS_table_size / FLAGS_table_buffer)); - CHECK_EQ(table->stats()["total sub block sent"], sub_blocks); - CHECK_EQ(table->stats()["total record sent"], FLAGS_table_size); - VLOG(3) << "test coordinator sending: successful"; - } - -} - -// if testing put(), do nothing. Else read() until done() -void run_worker(shared_ptr<NetworkThread> network, int tid){ - TableServer* ts = new TableServer(); - ts->StartTableServer(tables); - - // put data in - TypedDiskTable<int, FloatVector>* table = static_cast<TypedDiskTable<int, - FloatVector>*>(tables[tid]); - double total_read = 0; - if (!FLAGS_is_testing_put){ - VLOG(3) << "testing read from table ..."; - table->Load(); - while (!table->done()){ - int k; - FloatVector v; - table->get(&k,&v); - table->Next(); - total_read++; - } - - int k; - FloatVector v; - table->get(&k, &v); - total_read++; - } - - int size = network->size(); - - network->Flush(); - network->Send(GlobalContext::kCoordinatorRank, MTYPE_WORKER_END, - EmptyMessage()); - EmptyMessage msg; - - int src = 0; - network->Read(GlobalContext::kCoordinatorRank, MTYPE_WORKER_SHUTDOWN, &msg, - &src); - network->Flush(); - network->Shutdown(); - - Stats stats = - (static_cast<TypedDiskTable<int, FloatVector>*>(tables[0]))->stats(); - - if (FLAGS_is_testing_put) { - int sub_blocks = ceil(((double) FLAGS_table_size / FLAGS_table_buffer)); - if (size == 2) { - CHECK_EQ(stats["total sub block received"], sub_blocks); - CHECK_EQ(stats["total record stored"], FLAGS_table_size); - } - VLOG(3) << "test table-server writing: successful"; - VLOG(3) << "number of sub blocks = " << sub_blocks; - VLOG(3) << "total data stored = " << stats["total byte stored"]; - } - else{ - if (size==2) - CHECK_EQ(stats["total record read"], FLAGS_table_size); - VLOG(3) << "test table-server reading: successful"; - VLOG(3) << "read bandwidth = " - << (stats["total byte read"] - / (stats["last byte read"] - stats["first byte read"])); - //VLOG(3) << "total number of record read = " << stats["total record read"]; - } - - network->PrintStats(); - static_cast<TypedDiskTable<int, FloatVector>*>(tables[0])->PrintStats(); -} - -// check all the records have been stored to disk -int test_disk(int tid) { - // Init GlobalContext - auto gc = lapis::GlobalContext::Get(FLAGS_system_conf, FLAGS_model_conf); - //start network thread - shared_ptr<NetworkThread> network = NetworkThread::Get(); - - if (network->id() == network->size() - 1) - run_coordinator(network, tid); - else - 
run_worker(network,tid); - return 0; -} - -// for debugging use -//#ifndef FLAGS_v -// DEFINE_int32(v, 3, "vlog controller"); -//#endif - -int main(int argc, char **argv) { - FLAGS_logtostderr = 1; - google::InitGoogleLogging(argv[0]); - gflags::ParseCommandLineFlags(&argc, &argv, true); - create_disk_table(0); - return test_disk(0); -} - - http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_mnistlayer.cc ---------------------------------------------------------------------- diff --git a/src/test/dist_test/test_mnistlayer.cc b/src/test/dist_test/test_mnistlayer.cc deleted file mode 100644 index 882e121..0000000 --- a/src/test/dist_test/test_mnistlayer.cc +++ /dev/null @@ -1,165 +0,0 @@ -#include <gtest/gtest.h> -#include <sys/stat.h> -#include <cstdint> -#include "opencv2/highgui/highgui.hpp" -#include "opencv2/imgproc/imgproc.hpp" - -#include "model/layer.h" -#include "proto/model.pb.h" -#include "utils/shard.h" -using namespace singa; -TEST(MnistLayerTest, SingleScale){ - LayerProto proto; - MnistProto *mnist=proto.mutable_mnist_param(); - mnist->set_size(55); - MnistImageLayer layer; - layer.FromProto(proto); - cv::Mat image; - image=cv::imread("src/test/data/mnist.png", 0); - string pixel; - pixel.resize(image.rows*image.cols); - for(int i=0,k=0;i<image.rows;i++) - for(int j=0; j<image.cols;j++) - pixel[k++]=static_cast<char>(image.at<uint8_t>(i,j)); - Record rec; - rec.set_type(Record_Type_kMnist); - MnistRecord *mrec=rec.mutable_mnist(); - mrec->set_pixel(pixel); - layer.Setup(1, rec, kNone); - layer.AddInputRecord(rec); - - const vector<uint8_t>& dat=layer.Convert2Image(0); - int s=static_cast<int>(sqrt(dat.size())); - cv::Mat newimg(s,s,CV_8UC1); - int count=0; - for(int i=0,k=0;i<newimg.rows;i++) - for(int j=0; j<newimg.cols;j++){ - count+=dat[k]>0; - newimg.at<uint8_t>(i,j)=dat[k++]; - } - //LOG(ERROR)<<"image positive "<<count<<" size "<<s; - cv::imwrite("src/test/data/mnist_scale.png", newimg); -} - -TEST(MnistLayerTest, SingleAffineTransform){ - LayerProto proto; - MnistProto *mnist=proto.mutable_mnist_param(); - mnist->set_beta(15); - mnist->set_gamma(16); - mnist->set_size(55); - MnistImageLayer layer; - layer.FromProto(proto); - cv::Mat image; - image=cv::imread("src/test/data/mnist.png", 0); - string pixel; - pixel.resize(image.rows*image.cols); - for(int i=0,k=0;i<image.rows;i++) - for(int j=0; j<image.cols;j++) - pixel[k++]=static_cast<char>(image.at<uint8_t>(i,j)); - Record rec; - rec.set_type(Record_Type_kMnist); - MnistRecord *mrec=rec.mutable_mnist(); - mrec->set_pixel(pixel); - layer.Setup(1, rec, kNone); - layer.AddInputRecord(rec); - - const vector<uint8_t>& dat=layer.Convert2Image(0); - int s=static_cast<int>(sqrt(dat.size())); - cv::Mat newimg(s,s,CV_8UC1); - int count=0; - for(int i=0,k=0;i<newimg.rows;i++) - for(int j=0; j<newimg.cols;j++){ - count+=dat[k]>0; - newimg.at<uint8_t>(i,j)=dat[k++]; - } - //LOG(ERROR)<<"image positive "<<count<<" size "<<s; - - cv::imwrite("src/test/data/mnist_affine.png", newimg); -} -TEST(MnistLayerTest, SingleElasticDistortion){ - LayerProto proto; - MnistProto *mnist=proto.mutable_mnist_param(); - mnist->set_elastic_freq(1); - mnist->set_sigma(6); - mnist->set_alpha(36); - mnist->set_beta(15); - mnist->set_gamma(16); - mnist->set_size(55); - mnist->set_kernel(21); - MnistImageLayer layer; - layer.FromProto(proto); - cv::Mat image; - image=cv::imread("src/test/data/mnist.png", 0); - string pixel; - pixel.resize(image.rows*image.cols); - for(int i=0,k=0;i<image.rows;i++) - for(int j=0; 
j<image.cols;j++) - pixel[k++]=static_cast<char>(image.at<uint8_t>(i,j)); - Record rec; - rec.set_type(Record_Type_kMnist); - MnistRecord *mrec=rec.mutable_mnist(); - mrec->set_pixel(pixel); - layer.Setup(1, rec, kNone); - layer.AddInputRecord(rec); - - const vector<uint8_t>& dat=layer.Convert2Image(0); - int s=static_cast<int>(sqrt(dat.size())); - cv::Mat newimg(s,s,CV_8UC1); - int count=0; - for(int i=0,k=0;i<newimg.rows;i++) - for(int j=0; j<newimg.cols;j++){ - count+=dat[k]>0; - newimg.at<uint8_t>(i,j)=dat[k++]; - } - cv::imwrite("src/test/data/mnist_elastic.png", newimg); -} -TEST(MnistLayerTest, MultElasticDistortion){ - LayerProto proto; - MnistProto *mnist=proto.mutable_mnist_param(); - int kTotal=100; - int kSize=29; - mnist->set_elastic_freq(kTotal); - mnist->set_sigma(6); - mnist->set_alpha(36); - mnist->set_beta(15); - mnist->set_gamma(16); - mnist->set_size(kSize); - mnist->set_kernel(21); - MnistImageLayer layer; - layer.FromProto(proto); - vector<vector<int>> shapes{{kTotal, kSize,kSize}}; - layer.Setup(shapes, kNone); - shard::Shard source("/data1/wangwei/singa/data/mnist/test/",shard::Shard::kRead); - int n=static_cast<int>(sqrt(kTotal)); - cv::Mat origin(n*28,n*28, CV_8UC1); - char disp[1024]; - for(int x=0;x<n;x++){ - sprintf(disp+strlen(disp), "\n"); - for(int y=0;y<n;y++){ - Record rec; - string key; - CHECK(source.Next(&key, &rec)); - const string pixel=rec.mnist().pixel(); - cv::Mat img=origin(cv::Rect(y*28, x*28, 28, 28)); - for(int i=0,k=0;i<28;i++) - for(int j=0;j<28;j++) - img.at<uint8_t>(i,j)=static_cast<uint8_t>(pixel[k++]); - layer.AddInputRecord(rec); - sprintf(disp+strlen(disp), "%d ", rec.mnist().label()); - } - } - LOG(ERROR)<<disp; - cv::imwrite("src/test/data/mnist_big.png", origin); - - cv::Mat output(n*kSize,n*kSize, CV_8UC1); - for(int i=0;i<kTotal;i++){ - const vector<uint8_t>& dat=layer.Convert2Image(i); - int x=(i/n); - int y=i%n; - cv::Mat img=output(cv::Rect(y*kSize, x*kSize, kSize, kSize)); - for(int i=0,k=0;i<kSize;i++) - for(int j=0;j<kSize;j++) - img.at<uint8_t>(i,j)=dat[k++]; - } - cv::imwrite("src/test/data/mnist_bigout.png", output); -} http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_model.cc ---------------------------------------------------------------------- diff --git a/src/test/dist_test/test_model.cc b/src/test/dist_test/test_model.cc deleted file mode 100644 index c3f98b9..0000000 --- a/src/test/dist_test/test_model.cc +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright © 2014 Wei Wang. All Rights Reserved. 
-// 2014-08-02 14:13 -#include <glog/logging.h> -#include <gflags/gflags.h> - - -#include "model/sgd_trainer.h" -#include "model/net.h" -#include "proto/model.pb.h" -#include "utils/proto_helper.h" - -DEFINE_int32(v, 1, "vlog"); - -int main(int argc, char** argv) { - FLAGS_logtostderr=1; - google::InitGoogleLogging(argv[0]); - gflags::ParseCommandLineFlags(&argc, &argv, true); - lapis::ModelProto model_proto; - lapis::ReadProtoFromTextFile("examples/imagenet12/model.conf", &model_proto); - lapis::SGDTrainer trainer; - trainer.Init(model_proto.trainer()); - lapis::Net net; - net.Init(model_proto.net()); - trainer.Run(&net); -} http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_neuralnet.cc ---------------------------------------------------------------------- diff --git a/src/test/dist_test/test_neuralnet.cc b/src/test/dist_test/test_neuralnet.cc deleted file mode 100644 index a857124..0000000 --- a/src/test/dist_test/test_neuralnet.cc +++ /dev/null @@ -1,141 +0,0 @@ -#include <gtest/gtest.h> -#include <model/neuralnet.h> -#include "proto/model.pb.h" -#include "utils/common.h" -#include "utils/param_updater.h" - -using namespace singa; -NetProto CreateMLPProto(){ - ModelProto model; - ReadProtoFromTextFile("examples/mnist/mlp.conf", &model); - return model.neuralnet(); -} -TEST(NeuralnetTest, BP){ - ModelProto model; - ReadProtoFromTextFile("examples/mnist/mlp.conf", &model); - - AdaGradUpdater updater; - updater.Init(model.solver().updater()); - - NeuralNet net(model.neuralnet()); - auto layers=net.layers(); - for(int i=0;i<3;i++){ - bool firstlayer=true; - for(auto& layer: layers){ - layer->ComputeFeature(); - if(firstlayer){ - DataLayer* dl=static_cast<DataLayer*>(layer.get()); - dl->CompletePrefetch(); - firstlayer=false; - } - } - - for(int k=layers.size()-1;k>=0;k--){ - layers[k]->ComputeGradient(); - for(Param* param: layers[k]->GetParams()) - updater.Update(i, param); - } - } -} -NetProto CreateConvNetProto(){ - NetProto proto; - LayerProto *layer; - - layer=proto.add_layer(); - layer->set_name("data"); - layer->set_type("kShardData"); - DataProto *data=layer->mutable_data_param(); - data->set_batchsize(8); - data->set_path("/data1/wangwei/singa/data/mnist/train/"); - - // 4x3x10x10 - layer=proto.add_layer(); - layer->set_name("mnist"); - layer->set_type("kMnistImage"); - layer->add_srclayers("data"); - - // 4x1 - layer=proto.add_layer(); - layer->set_name("label"); - layer->set_type("kLabel"); - layer->add_srclayers("data"); - - // 4x8x9x9 - layer=proto.add_layer(); - layer->set_name("conv1"); - layer->set_type("kConvolution"); - layer->add_srclayers("mnist"); - layer->add_param(); - layer->add_param(); - ConvolutionProto *conv=layer->mutable_convolution_param(); - conv->set_num_filters(8); - conv->set_kernel(2); - - // 4x8x9x9 - layer=proto.add_layer(); - layer->set_name("relu1"); - layer->set_type("kReLU"); - layer->add_srclayers("conv1"); - - // 4x8x4x4 - layer=proto.add_layer(); - layer->set_name("pool1"); - layer->set_type("kPooling"); - layer->add_srclayers("relu1"); - PoolingProto *pool=layer->mutable_pooling_param(); - pool->set_kernel(4); - pool->set_stride(2); - - // 4x10 - layer=proto.add_layer(); - layer->set_name("fc1"); - layer->set_type("kInnerProduct"); - layer->add_srclayers("pool1"); - layer->add_param(); - layer->add_param(); - InnerProductProto *inner=layer->mutable_inner_product_param(); - inner->set_num_output(10); - - // 4x10 - layer=proto.add_layer(); - layer->set_name("loss"); - layer->set_type("kSoftmaxLoss"); - 
layer->add_srclayers("fc1"); - layer->add_srclayers("label"); - - return proto; -} - -TEST(NeuralNetTest, NoPartition){ - NetProto proto=CreateConvNetProto(); - NeuralNet net(proto); - const auto& layers=net.layers(); - ASSERT_EQ(8, layers.size()); - ASSERT_EQ("data", layers.at(0)->name()); - ASSERT_EQ("loss", layers.at(7)->name()); -} - -TEST(NeuralNetTest, DataPartition){ - NetProto proto=CreateConvNetProto(); - proto.set_partition_type(kDataPartition); - NeuralNet net(proto, 3); - const auto& layers=net.layers(); - ASSERT_EQ(28, layers.size()); - ASSERT_EQ("data", layers.at(0)->name()); -} -TEST(NeuralNetTest, LayerPartition){ - NetProto proto=CreateConvNetProto(); - proto.set_partition_type(kLayerPartition); - NeuralNet net(proto, 2); - // const auto& layers=net.layers(); -} -TEST(NeuralNetTest, HyridPartition){ - NetProto proto=CreateConvNetProto(); - int num_layers=proto.layer_size(); - proto.mutable_layer(num_layers-2)->set_partition_type(kDataPartition); - proto.mutable_layer(num_layers-1)->set_partition_type(kDataPartition); - proto.set_partition_type(kLayerPartition); - NeuralNet net(proto, 2); -} - - http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_pm.cc ---------------------------------------------------------------------- diff --git a/src/test/dist_test/test_pm.cc b/src/test/dist_test/test_pm.cc deleted file mode 100644 index 67c210a..0000000 --- a/src/test/dist_test/test_pm.cc +++ /dev/null @@ -1,88 +0,0 @@ -#include <sys/types.h> -#include <sys/stat.h> -#include <fcntl.h> - -#include <iostream> -#include <fstream> - -#include <gflags/gflags.h> -#include <glog/logging.h> -#include "utils/cluster.h" -#include "utils/common.h" -#include "proto/model.pb.h" -#include "proto/cluster.pb.h" -#include "server/server.h" -#include "server/pm_server.h" -#include "worker/pm_client.h" -#include "worker/worker.h" -#include "proto/topology.pb.h" -#include <string.h> -#include <google/protobuf/text_format.h> -#include <google/protobuf/io/zero_copy_stream_impl.h> - -using namespace google::protobuf::io; -using google::protobuf::TextFormat; - -using std::ifstream; - -/** - * Testing put/get/update performance of the new zeromq-based parameter - * servers. - */ -DEFINE_int32(procsID, 0, "global process ID"); -DEFINE_string(hostfile, "examples/imagenet12/hostfile", "hostfile"); -DEFINE_string(cluster_conf, "examples/imagenet12/cluster.conf", - "configuration file for the cluster"); -DEFINE_string(model_conf, "examples/imagenet12/model.conf", - "Deep learning model configuration file"); - -DEFINE_string(topology_config,"examples/imagenet12/topology.conf", "Network of servers"); -DEFINE_int32(server_threads,1,"Number of server's worker threads per process"); -DEFINE_int32(client_threads,1,"Number of client's worker threads per process"); - -DEFINE_string(mode, "client", "client or server mode"); -DEFINE_int32(node_id, 0, "ID of the node, client or server"); -DEFINE_int32(primary_set, 0, "ID of the primary server set (for client mode only)"); - -/** - * - * Read the topology file in, and start the Client or server respectively. 
- * - * test_pm --node_id <id> - */ - - -#ifndef FLAGS_v - DEFINE_int32(v, 3, "vlog controller"); -#endif - -int main(int argc, char **argv) { - google::InitGoogleLogging(argv[0]); - gflags::ParseCommandLineFlags(&argc, &argv, true); - FLAGS_logtostderr = 1; - - - //Read in the topology file - int fd = open(FLAGS_topology_config.c_str(), O_RDONLY); - assert(fd != -1); - singa::Topology topology; - TextFormat::Parse(new FileInputStream(fd), &topology); - - - //read host file - ifstream hostfile(FLAGS_hostfile.c_str()); - string host; - vector<string> hosts; - while (getline(hostfile, host)) - hosts.push_back(host); - - if (FLAGS_node_id < topology.nservers()) { - singa::SingaServer *server = new singa::SingaServer(FLAGS_node_id, topology, hosts); - server->StartServer(); - } else { - singa::SingaClient *client = new singa::SingaClient(FLAGS_node_id, topology, hosts); - client->StartClient(); - } - - return 0; -} http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_router.cc ---------------------------------------------------------------------- diff --git a/src/test/dist_test/test_router.cc b/src/test/dist_test/test_router.cc deleted file mode 100644 index bed3d99..0000000 --- a/src/test/dist_test/test_router.cc +++ /dev/null @@ -1,27 +0,0 @@ -#include <gflags/gflags.h> -#include <gtest/gtest.h> -#include "utils/router.h" -#include "utils/common.h" -#include "utils/cluster.h" -DEFINE_string(hostfile, "examples/imagenet12/hostfile", "hostfile"); -DEFINE_string(cluster_conf, "examples/imagenet12/cluster.conf", - "configuration file for the cluster"); -DEFINE_int32(procsID, 0, "global process ID"); - -int main(int argc, char** argv){ - google::InitGoogleLogging(argv[0]); - gflags::ParseCommandLineFlags(&argc, &argv, true); - - // Init Cluster - singa::ClusterProto pcluster; - singa::ReadProtoFromTextFile(FLAGS_cluster_conf.c_str(), &pcluster); - auto cluster=singa::Cluster::Get(pcluster, FLAGS_hostfile, FLAGS_procsID); - if(cluster->AmIServer()){ - singa::Router server(5732); - CHECK(server.Bind(cluster->server_addr(0), cluster->nworkers())); - }else{ - singa::Router worker(5732); - CHECK(worker.Connect(cluster->server_addr(0))); - } - return 0; -} http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_split.cc ---------------------------------------------------------------------- diff --git a/src/test/dist_test/test_split.cc b/src/test/dist_test/test_split.cc deleted file mode 100644 index 674d546..0000000 --- a/src/test/dist_test/test_split.cc +++ /dev/null @@ -1,304 +0,0 @@ -// Copyright © 2014 Anh Dinh. All Rights Reserved. - - -// Testing the imbalance in splitting parameter vectors. 
- -#include "core/global-table.h" -#include "core/common.h" -#include "core/disk-table.h" -#include "core/table.h" -#include "core/table_server.h" -#include "utils/global_context.h" -#include <gflags/gflags.h> -#include "proto/model.pb.h" -#include "worker.h" -#include "coordinator.h" -//#include "model_controller/myacc.h" -#include "utils/common.h" - -#include <cmath> -#include <stdlib.h> -#include <vector> -#include <iostream> -#include <fstream> - -using namespace lapis; -using std::vector; - -//DEFINE_bool(sync_update, false, "Synchronous put/update queue"); -DEFINE_string(system_conf, "examples/imagenet12/system.conf", "configuration file for node roles"); -DEFINE_string(model_conf, "examples/imagenet12/model.conf", "DL model configuration file"); -DEFINE_int64(threshold,1000000, "max # of parameters in a vector"); -DEFINE_int32(iterations,5,"numer of get/put iterations"); -DEFINE_int32(workers,2,"numer of workers doing get/put"); -#ifndef FLAGS_v - DEFINE_int32(v, 3, "vlog controller"); -#endif - -typedef map<int, GlobalTable*> Map; -Map tables; -shared_ptr<NetworkThread> network; -shared_ptr<GlobalContext> context; -std::vector<ServerState*> server_states; -TableServer *table_server; - -FloatVector large_msg, small_msg; -const int SIZE=16; - -long sizes[] = { 37448736, 16777216, 4096000, 1327104, 884736, 884736, 614400, - 14112, 4096, 4096, 1000, 384, 384, 256, 256, 96 }; - -vector<FloatVector*> value_msg; - -int num_keys; - -// create large and small messages -void init_messages(){ - num_keys = 0; - long nservers=context->num_table_servers(); - for (int i=0; i<SIZE; i++){ - int total=0; - int threshold=std::max(FLAGS_threshold,0l);//, sizes[i]/nservers); - VLOG(3)<<"worker: "<<threshold; - while (total<sizes[i]){ - FloatVector* fv = new FloatVector(); - for (int j=0; j+total<sizes[i] && j<threshold; j++) - fv->add_data(static_cast<float>(rand())/static_cast<float>(RAND_MAX)); - value_msg.push_back(fv); - total+=threshold; - num_keys++; - } - } -} - -void create_mem_table(int id, int num_shards){ - - TableDescriptor *info = new TableDescriptor(id, num_shards); - info->key_marshal = new Marshal<int>(); - info->value_marshal = new Marshal<FloatVector>(); - info->sharder = new Sharding::Mod; - info->accum = new MyAcc(); - info->partition_factory = new typename SparseTable<int, FloatVector>::Factory; - auto table=new TypedGlobalTable<int, FloatVector>(); - table->Init(info); - tables[id] = table; -} - -void coordinator_assign_tables(int id){ - for (int i = 0; i < context->num_processes()-1; ++i) { - RegisterWorkerRequest req; - int src = 0; - network->Read(MPI::ANY_SOURCE, MTYPE_REGISTER_WORKER, &req, &src); - // adding memory server. - if (context->IsTableServer(i)) { - server_states.push_back(new ServerState(i)); - } - } - LOG(INFO) << " All servers registered and started up. Ready to go"; - // set itself as the current worker for the table - tables[id]->worker_id_ = network->id(); - - // memory servers are specified in global context. 
Round-robin assignment - - VLOG(3)<<"num of shards"<<tables[id]->num_shards()<<" for table"<< id; - - int server_idx = 0; - for (int shard = 0; shard < tables[id]->num_shards(); ++shard) { - ServerState &server = *server_states[server_idx]; - LOG(INFO) << "Assigning table ("<<id<<","<<shard<<") to server " - <<server_states[server_idx]->server_id; - - // TODO(Anh) may overwrite this field if #shards>#table_servers - server.shard_id = shard; - server.local_shards.insert(new TaskId(id, shard)); - server_idx = (server_idx + 1) % server_states.size(); - } - - VLOG(3)<<"table assignment"; - // then send table assignment - ShardAssignmentRequest req; - for (size_t i = 0; i < server_states.size(); ++i) { - ServerState &server = *server_states[i]; - for (auto * task: server.local_shards) { - ShardAssignment *s = req.add_assign(); - s->set_new_worker(server.server_id); - s->set_table(task->table); - s->set_shard(task->shard); - // update local tables - CHECK(tables.find(task->table)!=tables.end()); - GlobalTable *t = tables.at(task->table); - t->get_partition_info(task->shard)->owner = server.server_id; - delete task; - } - } - VLOG(3)<<"finish table assignment, req size "<<req.assign_size(); - network->SyncBroadcast(MTYPE_SHARD_ASSIGNMENT, MTYPE_SHARD_ASSIGNMENT_DONE, req); - VLOG(3)<<"finish table server init"; -} - -void worker_table_init(){ - table_server = new TableServer(); - table_server->StartTableServer(tables); - VLOG(3) << "done starting table server"; -} - -double random_double(){ - return static_cast<double>(rand())/static_cast<double>(RAND_MAX); -} - -// populate the table with random large or small messages. -// the message distribution is specified in FLAGS_large_precentage -void coordinator_load_data(){ - auto table = static_cast<TypedGlobalTable<int,FloatVector>*>(tables[0]); - - num_keys = 0; - int nservers=context->num_table_servers(); - for (int i = 0; i < SIZE; i++) { - int total = 0; - int threshold=std::max(FLAGS_threshold,0l);// sizes[i]/nservers); - while (total < sizes[i]) { - FloatVector* fv = new FloatVector(); - for (int j = 0; j + total < sizes[i] && j < threshold; j++) - fv->add_data( - static_cast<float>(rand()) - / static_cast<float>(RAND_MAX)); - table->put(num_keys,*fv); - total += threshold; - num_keys++; - } - } - VLOG(3) << "Loaded data successfully ... 
" << num_keys << " messages"; -} - -void get(TypedGlobalTable<int,FloatVector>* table, ofstream &latency){ - double start , end; - StateQueue<int> state(num_keys); - FloatVector v; - /* - for (int i=0; i<num_keys; i++){ - start = Now(); - table->get(i); - end=Now(); - latency << "get: " << (end - start) << endl; - } - */ - start=Now(); - for (int i=0; i<num_keys; i++){ - if(table->async_get(i, &v)) - state.Invalid(i); - } - latency << "send get: " << (Now() - start) << endl; - start=Now(); - while(state.HasValid()){ - int key=state.Next(); - if(table->async_get_collect(&key, &v)) - state.Invalid(key); - sleep(0.001); - } - latency << "collect get: " << (Now() - start) << endl; -} - -void update(TypedGlobalTable<int,FloatVector>* table, ofstream &latency){ - double start, end; - for (int i=0; i<num_keys; i++){ - start = Now(); - table->update(i,*value_msg[i]); - end=Now(); - latency << "update: " << (end - start) << endl; - } -} - -void worker_test_data(){ - init_messages(); - auto table = static_cast<TypedGlobalTable<int,FloatVector>*>(tables[0]); - - ofstream latency(StringPrintf("latency_%d",NetworkThread::Get()->id())); - ofstream throughput(StringPrintf("throughput_%d", NetworkThread::Get()->id())); - double start, end; - for (int i=0; i<FLAGS_iterations; i++){ - start = Now(); - get(table, latency); - end=Now(); - throughput << "get: " << (end - start) << " over " << num_keys << " ops " << endl; - start = Now(); - update(table, latency); - end=Now(); - throughput << "update: " << (end - start) << " over " << num_keys << " ops " << endl; - sleep(10); - } - latency.close(); - throughput.close(); - -} - -void print_table_stats(){ - auto table = static_cast<TypedGlobalTable<int,FloatVector>*>(tables[0]); - ofstream log_file(StringPrintf("log_variance_%d", NetworkThread::Get()->id())); - log_file << "table size at process "<< NetworkThread::Get()->id()<<" = " << table->stats()["TABLE_SIZE"] << endl; - log_file.close(); -} - -void shutdown(){ - if (context->AmICoordinator()){ - VLOG(3) << "Coordinator is shutting down ..."; - EmptyMessage msg; - for (int i=0; i<context->num_processes()-1; i++) - network->Read(MPI::ANY_SOURCE, MTYPE_WORKER_END, &msg); - EmptyMessage shutdown_msg; - for (int i = 0; i < network->size() - 1; i++) { - network->Send(i, MTYPE_WORKER_SHUTDOWN, shutdown_msg); - } - network->Flush(); - network->Shutdown(); - } - else{ - VLOG(3) << "Worker " << network->id() << " is shutting down ..."; - network->Flush(); - VLOG(3) << "Done flushing the network thread"; - network->Send(GlobalContext::kCoordinatorRank, MTYPE_WORKER_END, EmptyMessage()); - EmptyMessage msg; - network->Read(GlobalContext::kCoordinatorRank, MTYPE_WORKER_SHUTDOWN, &msg); - VLOG(3) << "Worker received MTYPE_WORKER_SHUTDOWN"; - - table_server->ShutdownTableServer(); - VLOG(3) << "Flushing node " << network->id(); - network->Shutdown(); - } -} - - -int main(int argc, char **argv) { - FLAGS_logtostderr = 1; - google::InitGoogleLogging(argv[0]); - gflags::ParseCommandLineFlags(&argc, &argv, true); - - context = GlobalContext::Get(FLAGS_system_conf, FLAGS_model_conf); - network = NetworkThread::Get(); - VLOG(3) << "*** testing memory servers, with " - << context->num_table_servers() << " servers"; - - - create_mem_table(0,context->num_table_servers()); - - LOG(INFO)<<"threshold: "<<FLAGS_threshold<<" nworkers: "<<FLAGS_workers; - if (context->AmICoordinator()){ - coordinator_assign_tables(0); - coordinator_load_data(); - network->barrier(); - } - else{ - worker_table_init(); - network->barrier(); - VLOG(3) << 
"passed the barrier"; - print_table_stats(); - - //Sleep(1); - if(network->id()<FLAGS_workers) - worker_test_data(); - } - - shutdown(); - return 0; -} - - http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/6b1e65ee/src/test/dist_test/test_table_server.cc ---------------------------------------------------------------------- diff --git a/src/test/dist_test/test_table_server.cc b/src/test/dist_test/test_table_server.cc deleted file mode 100644 index 5f3612c..0000000 --- a/src/test/dist_test/test_table_server.cc +++ /dev/null @@ -1,357 +0,0 @@ -// Copyright © 2014 Anh Dinh. All Rights Reserved. - -#include "core/global-table.h" -#include "core/common.h" -#include "core/table.h" -#include "core/table_server.h" -#include "utils/global_context.h" -#include "utils/common.h" -#include <gflags/gflags.h> -#include "proto/model.pb.h" -#include "proto/common.pb.h" -#include "worker.h" -#include "coordinator.h" -#include "utils/common.h" -#include "utils/proto_helper.h" - -#include <cmath> -#include <stdlib.h> -#include <vector> -#include <iostream> -#include <fstream> - - -/** - * Test for table server access. The table is of type <VKey,int> - */ -DEFINE_bool(restore_mode, false, "restore from checkpoint file"); -using namespace lapis; -using std::vector; - -DEFINE_int32(checkpoint_frequency, 5000, "frequency for cp"); -DEFINE_int32(checkpoint_after, 1, "cp after this steps"); -DEFINE_string(par_mode, "hybrid", "time training algorithm"); -DEFINE_bool(restore, false, "restore from checkpoint file"); - -DEFINE_string(db_backend, "lmdb", "backend db"); -DEFINE_string(system_conf, "examples/imagenet12/system.conf", "configuration file for node roles"); -DEFINE_string(model_conf, "examples/imagenet12/model.conf", "DL model configuration file"); -DEFINE_string(checkpoint_dir,"/data1/wangwei/lapis/","check point dir"); -DEFINE_int32(threshold,1000000, "max # of parameters in a vector"); -DEFINE_int32(iterations,5,"numer of get/put iterations"); -DEFINE_int32(workers,2,"numer of workers doing get/put"); -DECLARE_bool(checkpoint_enabled); - - -DECLARE_bool(checkpoint_enabled); - -/** - * Get and update handler for VKey. - */ -struct AnhUpdateHandler: BaseUpdateHandler<VKey, SGDValue> { - bool Update(SGDValue *a, const SGDValue &b) { - - float * adptr = a->mutable_data()->mutable_value()->mutable_data(); - const float*bdptr = b.grad(0).value().data(); - for (int i = 0; i < b.grad(0).value_size(); i++) - adptr[i] += bdptr[i]; - - return true; - } - - bool Get(const VKey k, const SGDValue &val, SGDValue *ret) { - *ret = val; - return true; - } - - bool is_checkpointable(const VKey k, const SGDValue v) { - return false; //always checkpoint - } -}; - -typedef map<int, GlobalTable*> Map; -Map tables; -shared_ptr<NetworkThread> network; -shared_ptr<GlobalContext> context; -std::vector<ServerState*> server_states; -TableServer *table_server; - -#define SIZE 16 -int tuple_sizes[SIZE] = {27448736, 16777216, 4096000, 1327104, 884736, 884736, 614400,14112,4096,4096,1000,384,384,256,256,96}; - -/** - * Initialize tables. 
- */ -void create_mem_table(int id, int num_shards){ - - TableDescriptor *info = new TableDescriptor(id, num_shards); - info->key_marshal = new Marshal<VKey>(); - info->value_marshal = new Marshal<SGDValue>(); - info->sharder = new VKeySharder; - info->accum = new AnhUpdateHandler; - info->partition_factory = new typename SparseTable<VKey, SGDValue>::Factory; - auto table=new TypedGlobalTable<VKey, SGDValue>(); - table->Init(info); - tables[id] = table; -} - -/** - * Coordinator assigns shards to processes. - * @param id table ID. - */ -void coordinator_assign_tables(int id) { - - // wait for the servers to be up. - for (int i = 0; i < context->num_procs(); i++) { - RegisterWorkerRequest req; - int src = 0; - // adding memory server. - if (context->IsTableServer(i)) { - VLOG(3)<< "Waiting for message from table server " << i; - network->Read(MPI::ANY_SOURCE, MTYPE_REGISTER_WORKER, &req, &src); - server_states.push_back(new ServerState(i)); - } - } - - VLOG(3) << " All servers registered and started up. Ready to go"; - VLOG(3) << "num of shards" << tables[id]->num_shards() << " for table " << id; - - // assign table shards to servers in round-robin fashion. - int server_idx = 0; - for (int shard = 0; shard < tables[id]->num_shards(); ++shard) { - ServerState &server = *server_states[server_idx]; - VLOG(3) << "Assigning table (" << id << "," << shard << ") to server " - << server_states[server_idx]->server_id; - server.shard_id = shard; - server.local_shards.insert(new TaskId(id, shard)); - server_idx = (server_idx + 1) % server_states.size(); - } - ShardAssignmentRequest req; - for (size_t i = 0; i < server_states.size(); ++i) { - ServerState &server = *server_states[i]; - for (auto * task : server.local_shards) { - ShardAssignment *s = req.add_assign(); - s->set_new_worker(server.server_id); - s->set_table(task->table); - s->set_shard(task->shard); - // update local tables - GlobalTable *t = tables.at(task->table); - t->get_partition_info(task->shard)->owner = server.server_id; - delete task; - } - } - - network->SyncBroadcast(MTYPE_SHARD_ASSIGNMENT, MTYPE_SHARD_ASSIGNMENT_DONE, - req); - VLOG(3) << "done table assignment... "; -} - - -void table_init(){ - table_server = new TableServer(); - table_server->StartTableServer(tables); - VLOG(3) << "table server started on process "<< NetworkThread::Get()->id(); -} - - -/** - * Coordinator loads data to the table. - */ -void coordinator_load_data() { - auto table = static_cast<TypedGlobalTable<VKey, SGDValue>*>(tables[0]); - for (int i = 0; i < SIZE; i++) { - VKey key; - SGDValue x; - DAryProto *data = x.mutable_data(); - DAryProto *grad = x.add_grad(); - for (int j = 0; j < tuple_sizes[i]; j++) { - data->add_value(j * 1.0f); - grad->add_value(j * 1.0f); - } - key.set_key(i); - table->put(key, x); - } - VLOG(3) << "Done loading " << SIZE << " tuples ..."; -} - -/** - * Worker gets tuples from the server. - */ -void get() { - auto table = static_cast<TypedGlobalTable<VKey,SGDValue>*>(tables[0]); - SGDValue value; - for (int i = 0; i < SIZE; i++) { - VKey key; - key.set_key(i); - table->async_get(key, &value); - } - VLOG(3) << "Done sending get requests ..."; - - for (int i = 0; i < SIZE; i++) { - VKey key; - while (!table->async_get_collect(&key, &value)) - Sleep(0.0001); - } -} - -/** - * Worker updates tuples. 
- */ -void update() { - auto table = static_cast<TypedGlobalTable<VKey, SGDValue>*>(tables[0]); - for (int i = 0; i < SIZE; i++) { - VKey key; - key.set_key(i); - - SGDValue x; - DAryProto *grad = x.add_grad(); - for (int j = 0; j < tuple_sizes[i]; j++) - grad->add_value(j * 1.0f); - - table->update(key, x); - } - VLOG(3) << "Done updating " << SIZE << " tuples ..."; -} - - -void worker_test_data() { - //get(size); - update(); - update(); - get(); - /* - update(table, tuples); - update(table, tuples); - update(table, tuples); - get(table, tuples); - */ -} - -/** - * Shutdown the process. - */ -void shutdown() { - if (context->AmICoordinator()) { - EmptyMessage msg; - for (int i = 0; i < context->num_procs() - 1; i++) - network->Read(MPI::ANY_SOURCE, MTYPE_WORKER_END, &msg); - EmptyMessage shutdown_msg; - for (int i = 0; i < network->size() - 1; i++) { - network->Send(i, MTYPE_SHUTDOWN, shutdown_msg); - } - //network->Flush(); - network->Shutdown(); - } else { - //network->Flush(); - network->Send(context->num_procs() - 1, MTYPE_WORKER_END, - EmptyMessage()); - EmptyMessage msg; - network->Read(context->num_procs() - 1, MTYPE_SHUTDOWN, &msg); - - if (context->AmITableServer()){ - RequestDispatcher::Get()->PrintStats(); - table_server->ShutdownTableServer(); - } - - network->Shutdown(); - } -} - -/** - * Worker handles shard assignment from the coordinator. - */ -void HandleShardAssignment() { - - ShardAssignmentRequest shard_req; - auto mpi = NetworkThread::Get(); - mpi->Read(GlobalContext::kCoordinator, MTYPE_SHARD_ASSIGNMENT, &shard_req); - - // request read from coordinator - for (int i = 0; i < shard_req.assign_size(); i++) { - const ShardAssignment &a = shard_req.assign(i); - GlobalTable *t = tables.at(a.table()); - t->get_partition_info(a.shard())->owner = a.new_worker(); - - //if local shard, create check-point files - if (FLAGS_checkpoint_enabled && t->is_local_shard(a.shard())) { - string checkpoint_file = StringPrintf("%s/checkpoint_%d", - FLAGS_checkpoint_dir.c_str(), a.shard()); - char hostname[256]; - gethostname(hostname, sizeof(hostname)); - - FILE *tmp_file = fopen(checkpoint_file.c_str(), "r"); - if (tmp_file) { // exists -> open for reading and writing - fclose(tmp_file); - auto cp = t->checkpoint_files(); - - if (FLAGS_restore_mode) { //open in read mode to restore, then close - LogFile *file = new LogFile(checkpoint_file, "rw", 0); - int table_size = file->read_latest_table_size(); - delete file; - - double start = Now(); - (*cp)[a.shard()] = new LogFile(checkpoint_file, "r", - a.shard()); - t->Restore(a.shard()); - delete (*cp)[a.shard()]; - double end = Now(); - LOG(ERROR) << "restore time\t" << end - start << "\tfor\t" - << table_size << "\tthreshold\t" << FLAGS_threshold; - } - char hostname[256]; - gethostname(hostname, sizeof(hostname)); - (*cp)[a.shard()] = new LogFile(checkpoint_file, "a", a.shard()); - } else { // does not exist -> open for writing the first time - auto cp = t->checkpoint_files(); - (*cp)[a.shard()] = new LogFile(checkpoint_file, "w", a.shard()); - } - } - } - - EmptyMessage empty; - mpi->Send(GlobalContext::kCoordinator, MTYPE_SHARD_ASSIGNMENT_DONE, empty); - VLOG(3) << "Done handling shard assignment ..."; - -} - - -int main(int argc, char **argv) { - FLAGS_logtostderr = 1; - int provided; - MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); - google::InitGoogleLogging(argv[0]); - gflags::ParseCommandLineFlags(&argc, &argv, true); - - context = GlobalContext::Get(FLAGS_system_conf); - network = NetworkThread::Get(); - - ModelProto model; - 
ReadProtoFromTextFile(FLAGS_model_conf.c_str(), &model); - - create_mem_table(0, context->num_table_servers()); - - if (context->AmICoordinator()) { - coordinator_assign_tables(0); - coordinator_load_data(); - network->barrier(); - } else { - if (context->AmITableServer()) { - table_init(); - HandleShardAssignment(); - network->barrier(); - } else { - HandleShardAssignment(); - network->barrier(); - Sleep(1); - VLOG(3) << "Worker cleared the barrier ..."; - worker_test_data(); - } - } - - shutdown(); - return 0; -} - -
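Editor's note: both deleted parameter-server tests above (test_split.cc and test_table_server.cc) drive the same coordinator step: walk over a table's shards and hand each one to a registered server in round-robin order. The sketch below distills that placement logic into a minimal, self-contained C++11 program; the TaskId and ServerState structs here are simplified stand-ins for the lapis types removed by this commit, not the actual SINGA API.

#include <cstdio>
#include <set>
#include <vector>

// Simplified stand-ins for the deleted lapis types (hypothetical).
struct TaskId {
  int table, shard;
  bool operator<(const TaskId& o) const {
    return table != o.table ? table < o.table : shard < o.shard;
  }
};

struct ServerState {
  int server_id;
  std::set<TaskId> local_shards;  // shards this server will host
  explicit ServerState(int id) : server_id(id) {}
};

// Round-robin placement, mirroring coordinator_assign_tables() above:
// shard k of a table goes to server k % num_servers.
void AssignShardsRoundRobin(int table_id, int num_shards,
                            std::vector<ServerState>* servers) {
  size_t idx = 0;
  for (int shard = 0; shard < num_shards; ++shard) {
    ServerState& s = (*servers)[idx];
    s.local_shards.insert(TaskId{table_id, shard});
    std::printf("table (%d,%d) -> server %d\n", table_id, shard, s.server_id);
    idx = (idx + 1) % servers->size();
  }
}

int main() {
  std::vector<ServerState> servers{ServerState(0), ServerState(1), ServerState(2)};
  AssignShardsRoundRobin(0, 8, &servers);  // 8 shards spread over 3 servers
  return 0;
}

When a table has more shards than servers, each server simply hosts several shards; this is also why the deleted test carried the TODO about server.shard_id being overwritten when #shards > #table_servers.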
