http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/neuralnet/output_layer/char_rnn.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/output_layer/char_rnn.cc b/src/neuralnet/output_layer/char_rnn.cc
deleted file mode 100644
index c3f1733..0000000
--- a/src/neuralnet/output_layer/char_rnn.cc
+++ /dev/null
@@ -1,51 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-#include <algorithm>
-#include <iostream>
-#include <fstream>
-#include <sstream>
-#include "singa/neuralnet/output_layer.h"
-
-namespace singa {
-
-void CharRNNOutputLayer::Setup(const LayerProto& proto,
-    const vector<Layer*>& srclayers) {
-  CHECK_EQ(srclayers.size(), 1);
-  OutputLayer::Setup(proto, srclayers);
-  std::ifstream fin;
-  const string path = proto.char_rnn_conf().vocab_path();
-  fin.open(path);
-  CHECK(fin.is_open()) << "Can't open vocab_path = " << path;
-  std::stringstream stream;
-  stream << fin.rdbuf();
-  vocab_ = stream.str();
-  fin.close();
-}
-
-void CharRNNOutputLayer::ComputeFeature(int flag,
-    const vector<Layer*>& srclayers) {
-  const float* dptr = srclayers[0]->data(this).cpu_data();
-  for (int i = 0; i < srclayers[0]->data(this).shape(0); i++) {
-    std::cout << vocab_[static_cast<int>(dptr[i])];
-  }
-}
-
-}  // namespace singa
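The deleted layer treats the source blob as a sequence of character IDs: each float is cast to int and used to index the vocabulary string loaded in Setup. A minimal standalone sketch of that decode step (hypothetical Decode/vocab/preds names, not SINGA API):

#include <iostream>
#include <string>
#include <vector>

// Decode predicted character IDs back to text, mirroring the loop in
// CharRNNOutputLayer::ComputeFeature above.
std::string Decode(const std::string& vocab, const std::vector<float>& preds) {
  std::string out;
  for (float p : preds)
    out += vocab.at(static_cast<int>(p));  // at() guards against bad IDs
  return out;
}

int main() {
  std::string vocab = "helo wrd";              // toy vocab file contents
  std::vector<float> preds = {0, 1, 2, 2, 3};  // one ID per time step
  std::cout << Decode(vocab, preds) << "\n";   // prints "hello"
  return 0;
}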
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/neuralnet/output_layer/csv.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/output_layer/csv.cc b/src/neuralnet/output_layer/csv.cc
deleted file mode 100644
index d2512da..0000000
--- a/src/neuralnet/output_layer/csv.cc
+++ /dev/null
@@ -1,59 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-#include "singa/neuralnet/output_layer.h"
-
-namespace singa {
-
-void CSVOutputLayer::Setup(const LayerProto& conf,
-    const vector<Layer*>& srclayers) {
-  OutputLayer::Setup(conf, srclayers);
-  CHECK_EQ(srclayers.size(), 1);
-}
-
-void CSVOutputLayer::ComputeFeature(int flag, const vector<Layer*>& srclayers) {
-  if (store_ == nullptr) {
-    string backend = "textfile";
-    const auto& conf = layer_conf_.store_conf();
-    if (conf.has_backend())
-      backend = conf.backend();
-    store_ = io::OpenStore(backend, conf.path(), io::kCreate);
-  }
-  const auto& data = srclayers.at(0)->data(this);
-  const auto& label = srclayers.at(0)->aux_data();
-  int batchsize = data.shape()[0];
-  CHECK_GT(batchsize, 0);
-  int dim = data.count() / batchsize;
-  if (label.size())
-    CHECK_EQ(label.size(), batchsize);
-  CHECK_GT(dim, 0);
-  for (int k = 0; k < batchsize; k++) {
-    std::ostringstream record;
-    if (label.size())
-      record << std::to_string(label[k]) << ",";
-    auto* dptr = data.cpu_data() + k * dim;
-    for (int i = 0; i < dim - 1; i++)
-      record << std::to_string(dptr[i]) << ",";
-    record << std::to_string(dptr[dim - 1]);
-    store_->Write(std::to_string(inst_++), record.str());
-  }
-  store_->Flush();
-}
-}  // namespace singa
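ComputeFeature emits one CSV line per instance: the optional label first, then dim feature values, comma-separated with no trailing comma, keyed in the store by a running instance counter. A self-contained sketch of just the record formatting (hypothetical ToCsv name):

#include <iostream>
#include <sstream>
#include <string>
#include <vector>

// Format one instance the way CSVOutputLayer does: label first,
// then features, comma-separated, no trailing comma.
std::string ToCsv(int label, const std::vector<float>& feat) {
  std::ostringstream record;
  record << label;
  for (float f : feat) record << "," << f;
  return record.str();
}

int main() {
  std::cout << ToCsv(3, {0.5f, 1.25f, -2.0f}) << "\n";  // "3,0.5,1.25,-2"
  return 0;
}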
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/neuralnet/output_layer/record.cc
----------------------------------------------------------------------
diff --git a/src/neuralnet/output_layer/record.cc b/src/neuralnet/output_layer/record.cc
deleted file mode 100644
index f7b3e01..0000000
--- a/src/neuralnet/output_layer/record.cc
+++ /dev/null
@@ -1,56 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-#include "singa/neuralnet/output_layer.h"
-#include "singa/proto/common.pb.h"
-namespace singa {
-
-void RecordOutputLayer::Setup(const LayerProto& conf,
-    const vector<Layer*>& srclayers) {
-  OutputLayer::Setup(conf, srclayers);
-  CHECK_EQ(srclayers.size(), 1);
-}
-
-void RecordOutputLayer::ComputeFeature(int flag,
-    const vector<Layer*>& srclayers) {
-  if (store_ == nullptr)
-    store_ = io::OpenStore(layer_conf_.store_conf().backend(),
-                           layer_conf_.store_conf().path(), io::kCreate);
-  const auto& data = srclayers.at(0)->data(this);
-  const auto& label = srclayers.at(0)->aux_data();
-  int batchsize = data.shape()[0];
-  CHECK_GT(batchsize, 0);
-  int dim = data.count() / batchsize;
-  if (label.size())
-    CHECK_EQ(label.size(), batchsize);
-  for (int k = 0; k < batchsize; k++) {
-    SingleLabelImageRecord image;
-    if (label.size())
-      image.set_label(label[k]);
-    auto* dptr = data.cpu_data() + k * dim;
-    for (int i = 0; i < dim; i++)
-      image.add_data(dptr[i]);
-    std::string val;
-    image.SerializeToString(&val);
-    store_->Write(std::to_string(inst_++), val);
-  }
-  store_->Flush();
-}
-}  // namespace singa
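Both output layers follow the same store pattern: lazily open a key-value store on first use, Write() one serialized record per instance under the key std::to_string(inst_++), then Flush() at the end of the batch. A toy stand-in for that store contract (the real singa::io::Store backend does the actual file/DB I/O; this only mirrors the two calls the layers rely on):

#include <map>
#include <string>

// Stand-in for singa::io::Store as used above: Write(key, value) plus Flush().
class ToyStore {
 public:
  void Write(const std::string& key, const std::string& val) { kv_[key] = val; }
  void Flush() { /* a real backend would sync to disk here */ }
 private:
  std::map<std::string, std::string> kv_;
};

int main() {
  ToyStore store;
  int inst = 0;
  for (const std::string& rec : {"3,0.5,1.25", "7,0.0,2.5"})
    store.Write(std::to_string(inst++), rec);  // key = running instance index
  store.Flush();
  return 0;
}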
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/proto/common.proto
----------------------------------------------------------------------
diff --git a/src/proto/common.proto b/src/proto/common.proto
deleted file mode 100644
index b1ba1b6..0000000
--- a/src/proto/common.proto
+++ /dev/null
@@ -1,114 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-package singa;
-
-enum MsgType {
-  kGet = 0;
-  kPut = 1;
-  kSync = 2;
-  kUpdate = 3;
-  kSyncRequest = 4;
-  kSyncResponse = 5;
-  kStop = 6;
-  kData = 7;
-  kRGet = 8;
-  kRUpdate = 9;
-  kConnect = 10;
-  kMetric = 11;
-};
-
-enum EntityType {
-  kWorkerParam = 0;
-  kWorkerLayer = 1;
-  kServer = 2;
-  kStub = 3;
-  kRuntime = 4;
-};
-
-enum ConnectionType {
-  kOneToOne = 0;
-  kOneToAll = 1;
-  kOneToMany = 2;
-}
-
-// to import caffe's lmdb dataset
-message CaffeDatum {
-  optional int32 channels = 1;
-  optional int32 height = 2;
-  optional int32 width = 3;
-  // the actual image data, in bytes
-  optional bytes data = 4;
-  optional int32 label = 5;
-  // Optionally, the datum could also hold float data.
-  repeated float float_data = 6;
-  // If true, data contains an encoded image that needs to be decoded
-  optional bool encoded = 7 [default = false];
-}
-
-// to import caffe's blob, e.g., image mean
-message CaffeBlob {
-  optional int32 num = 1 [default = 0];
-  optional int32 channels = 2 [default = 0];
-  optional int32 height = 3 [default = 0];
-  optional int32 width = 4 [default = 0];
-  repeated float data = 5 [packed = true];
-  repeated float diff = 6 [packed = true];
-}
-
-message BlobProto {
-  repeated int32 shape = 1;
-  repeated float data = 2 [packed = true];
-}
-
-message BlobProtos {
-  repeated int32 id = 2;
-  repeated int32 version = 3;
-  repeated string name = 4;
-  repeated BlobProto blob = 5;
-}
-
-message Record {
-  enum Type {
-    // each record contains image raw feature and its label.
-    kSingleLabelImage = 0;
-  }
-  optional Type type = 1 [default = kSingleLabelImage];
-  optional string user_type = 2;
-  // configuration for the image record
-  optional RecordProto image = 5;
-
-  extensions 101 to 200;
-}
-
-// rename SingleLabelImageRecord to RecordProto
-message RecordProto {
-  repeated int32 shape = 1;
-  optional int32 label = 2;
-  optional bytes pixel = 3;
-  repeated float data = 4 [packed = true];
-}
-
-message MetricProto {
-  repeated string name = 1;
-  repeated int32 count = 2;
-  repeated float val = 3;
-}
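RecordProto (the renamed SingleLabelImageRecord) is what RecordOutputLayer serializes above. Assuming standard protoc C++ codegen for common.proto (the generated header name/path is an assumption), one record could be built and serialized like this:

#include <string>
#include "common.pb.h"  // assumed protoc output for common.proto

int main() {
  // Standard protoc accessors: set_<field> for optional, add_<field> for repeated.
  singa::RecordProto rec;
  rec.add_shape(1);
  rec.add_shape(28);
  rec.add_shape(28);             // a 1x28x28 image
  rec.set_label(7);
  for (int i = 0; i < 1 * 28 * 28; i++)
    rec.add_data(0.0f);          // raw pixel features
  std::string val;
  rec.SerializeToString(&val);   // same call RecordOutputLayer uses
  return 0;
}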
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/proto/core.proto
----------------------------------------------------------------------
diff --git a/src/proto/core.proto b/src/proto/core.proto
new file mode 100644
index 0000000..5b5fea0
--- /dev/null
+++ b/src/proto/core.proto
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+
+package singa;

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/proto/job.proto
----------------------------------------------------------------------
diff --git a/src/proto/job.proto b/src/proto/job.proto
deleted file mode 100644
index b4aa971..0000000
--- a/src/proto/job.proto
+++ /dev/null
@@ -1,816 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-package singa;
-
-/*
- * To start a training job, all we need is a JobProto object.
- * It should contain the following fields
- * - Job Name (name)
- *   the name to identify the job
- * - NeuralNet (neuralnet)
- *   the neural network structure, which contains a set of layers
- * - Train One Batch (alg)
- *   the training algorithm
- * - Updater (updater)
- *   the protocol for updating parameters at the server side
- * - Cluster Topology (cluster)
- *   the distributed topology of workers/servers
- * - Training Steps (train_steps)
- *   the number of training iterations
- * All other fields/functions are optional, e.g., test, checkpoint
- */
-
-message JobProto {
-  // job name, e.g., "cifar10-dcnn", "mnist-mlp"
-  optional string name = 1;
-  // neural net consists of a set of connected layers
-  optional NetProto neuralnet = 3;
-  // algorithm for computing gradients over one mini-batch
-  optional AlgProto train_one_batch = 5;
-  // configuration of SGD updater, including learning rate, etc.
-  optional UpdaterProto updater = 7;
-  // cluster topology conf
-  optional ClusterProto cluster = 9;
-  // total num of steps for training
-  optional int32 train_steps = 16;
-  // frequency of displaying training info
-  optional int32 disp_freq = 17 [default = 0];
-  // GPU device IDs to use; if fewer than the workers per process, some
-  // workers run on GPU and the rest run on CPU.
-  repeated int32 gpu = 18;
-
-  // frequency of test, e.g., do test every 100 training steps
-  optional int32 test_freq = 20 [default = 0];
-  // total num of steps for testing all test data;
-  // TODO(wangwei): set -1 for test forever
-  optional int32 test_steps = 21 [default = 0];
-  // frequency of validation, e.g., do validation every 100 training steps
-  optional int32 validate_freq = 25 [default = 0];
-  // total num of steps for validating all validation data
-  optional int32 validate_steps = 26 [default = 0];
-  // frequency of checkpoint
-  optional int32 checkpoint_freq = 30 [default = 0];
-
-  // for loading checkpoint files to init parameters
-  repeated string checkpoint_path = 60;
-  // send parameters to servers after training for this num of steps
-  optional int32 warmup_steps = 61 [default = 0];
-  // display debug info
-  optional bool debug = 62 [default = false];
-  // reset the version of params loaded from checkpoint file to step
-  optional bool reset_param_version = 63 [default = true];
-  // set num of threads used by openblas
-  optional int32 num_openblas_threads = 64 [default = 1];
-
-  // start checkpoint after this num of steps
-  optional int32 checkpoint_after = 80 [default = 0];
-  // start display after this num of steps
-  optional int32 disp_after = 81 [default = 0];
-  // start test after this num of steps
-  optional int32 test_after = 82 [default = 0];
-  // start validation after this num of steps
-  optional int32 validate_after = 83 [default = 0];
-
-  // for internal use
-  // users typically do not touch the following fields
-
-  // resume flag
-  optional bool resume = 90 [default = false];
-  // last snapshot step
-  optional int32 step = 91 [default = 0];
-  // job id allocated by zookeeper
-  optional int32 id = 92 [default = -1];
-
-  extensions 101 to 200;
-}
-
-// Protos used by JobProto
-// -----------------------
-
-message AlgProto {
-  // algorithms calculating gradients for one mini-batch/iteration
-  optional AlgType alg = 1 [default = kUserAlg];
-  // user defined algorithm
-  optional string user_alg = 2;
-  // for setting CD fields
-  optional CDProto cd_conf = 10;
-
-  extensions 101 to 200;
-}
-
-message NetProto {
-  repeated LayerProto layer = 1;
-  // partitioning type for parallelism
-  optional int32 partition_dim = 20 [default = 0];
-  // Each layer corresponds to a group of unrolled layers, used in RNN models
-  repeated LayerGroupProto layer_group = 21;
-  optional int32 unroll_len = 22 [default = 1];
-}
-
-message LayerGroupProto {
-  // names of the layers that belong to the same group
-  repeated string layer = 1;
-}
-
-message UpdaterProto {
-  // built-in updater type
-  optional UpdaterType type = 1 [default = kUserUpdater];
-  // user-defined updater type
-  optional string user_type = 2;
-
-  // configuration for RMSProp algorithm
-  optional RMSPropProto rmsprop_conf = 3;
-  // configuration for AdaDelta algorithm
-  optional AdaDeltaProto adadelta_conf = 4;
-  // configuration for Adam algorithm
-  optional AdamProto adam_conf = 5;
-  // configuration for AdamMax algorithm
-  optional AdamMaxProto adammax_conf = 6;
-
-  // learning rate generator
-  optional LRGenProto learning_rate = 11;
-  optional float momentum = 31 [default = 0];
-  optional float weight_decay = 32 [default = 0];
-
-  // used to avoid division by 0, i.e., x/(y+delta)
-  optional float delta = 35 [default = 0.00000001];
-
-  optional float clip_low = 36 [default = 0];
-  optional float clip_high = 37 [default = 0];
-
-  extensions 101 to 200;
-}
-
-message ClusterProto {
-  optional int32 nworker_groups = 1 [default = 1];
-  optional int32 nserver_groups = 2 [default = 1];
-  optional int32 nworkers_per_group = 3 [default = 1];
-  optional int32 nservers_per_group = 4 [default = 1];
-  optional int32 nworkers_per_procs = 5 [default = 1];
-  optional int32 nservers_per_procs = 6 [default = 1];
-  // local workspace for checkpoint files and vis files
-  // required string workspace = 10;
-  optional string workspace = 10;
-
-  // are servers and workers in different processes?
-  optional bool server_worker_separate = 20 [default = false];
-
-  // sync frequency between server groups
-  optional int32 sync_freq = 21 [default = 1];
-
-  // port number used by ZeroMQ
-  optional int32 start_port = 60 [default = 6723];
-  // share memory space between worker groups in one process
-  optional bool share_memory = 62 [default = true];
-
-  // poll time in milliseconds
-  optional int32 poll_time = 81 [default = 100];
-}
-
-message CDProto {
-  // number of steps for Gibbs sampling
-  optional int32 cd_k = 1 [default = 1];
-}
-
-message LayerProto {
-  // the layer name used for identification
-  required string name = 1;
-  // source layer names
-  repeated string srclayers = 3;
-  // parameters, e.g., weight matrix or bias vector
-  repeated ParamProto param = 12;
-  // All layers are included in the net structure for the training phase by
-  // default. Layers that are not used by the training phase, e.g., a data
-  // layer that loads test data, should be removed by setting the exclude
-  // field.
-  repeated Phase exclude = 15;
-  // The exclude field is deprecated, please use the include field instead!
-  // For example, a data layer that only loads test data should set the
-  // include field to the test phase.
-  repeated Phase include = 14;
-  // type of built-in layer
-  optional LayerType type = 20 [default = kUserLayer];
-  // type of user layer
-  optional string user_type = 21;
-  // share data and grad blobs with the single src layer, e.g., a relu layer
-  // can share blobs from a conv layer. It is useful for saving memory space.
-  optional bool share_src_blobs = 22 [default = false];
-  // for unrolling layers in RNN models
-  optional int32 unroll_len = 23 [default = 1];
-  optional int32 unroll_index = 24 [default = 0];
-  repeated UnrollConnType unroll_conn_type = 25;
-  repeated int32 shift = 26;
-
-  // overrides the partition dimension for the neural net
-  optional int32 partition_dim = 60 [default = -1];
-  // partition id of this layer
-  optional int32 partition_id = 90 [default = 0];
-  // num of partitions for this layer
-  optional int32 num_partitions = 91 [default = 1];
-
-  // layer specific configuration
-  // configuration for input layers, id range [100, 200)
-  optional StoreProto store_conf = 100;
-  optional DataProto lmdbdata_conf = 190;
-  optional MnistProto mnist_conf = 192;
-  optional RGBImageProto rgbimage_conf = 193;
-  optional DataProto sharddata_conf = 194;
-  optional CharRNNProto char_rnn_conf = 195;
-  optional OnehotProto onehot_conf = 196;
-
-  // configuration for neuron layers, id range [200, 300)
-  optional ActivationProto activation_conf = 200;
-  optional ConvolutionProto convolution_conf = 201;
-  optional DropoutProto dropout_conf = 203;
-  optional DummyProto dummy_conf = 204;
-  optional InnerProductProto innerproduct_conf = 205;
-  optional LRNProto lrn_conf = 206;
-  optional PoolingProto pooling_conf = 207;
-  optional RBMProto rbm_conf = 209;
-  optional ReLUProto relu_conf = 211;
-  optional SoftmaxProto softmax_conf = 214;
-  optional GRUProto gru_conf = 215;
-  optional EmbeddingProto embedding_conf = 216;
-  optional BMProto bm_conf = 217;
-
-  // configuration for loss layers, id range [300, 400)
-  optional SoftmaxLossProto softmaxloss_conf = 301;
-
-  // configuration for output layers, id range [400, 500)
-  optional ArgSortProto argsort_conf = 401;
-
-  // configuration for connection layers, id range [501, )
-  optional ConcateProto concate_conf = 502;
-  optional SliceProto slice_conf = 503;
-  optional SplitProto split_conf = 504;
-  optional RNNDummyProto rnn_dummy_conf = 505;
-
-  extensions 1001 to 1100;
-}
-
-// weight matrix should be defined before bias vector
-// TODO(wangwei): separate conf for diff init methods
-message ParamProto {
-  // used for identifying the same params from different models and for
-  // displaying debug info
-  optional string name = 1 [default = ""];
-  // for built-in Param
-  optional ParamType type = 3 [default = kParam];
-  // for user-defined Param
-  optional string user_type = 4;
-
-  optional ParamGenProto init = 5;
-  // multiplied on the global learning rate.
-  optional float lr_scale = 15 [default = 1];
-  // multiplied on the global weight decay.
-  optional float wd_scale = 16 [default = 1];
-
-  // name of the owner param from which this param shares the values
-  optional string share_from = 60;
-
-  // used internally
-  optional int32 id = 90;
-  // used internally
-  optional int32 owner = 91 [default = -1];
-  // partition dimension, -1 for no partition
-  optional int32 partition_dim = 92;
-  // usually, the program will infer the param shape
-  repeated int32 shape = 93;
-
-  extensions 101 to 200;
-}
-
-// ---------------------------
-// protos for different layers
-// ---------------------------
-// learning rate generator proto
-message LRGenProto {
-  // user-defined change method
-  optional ChangeMethod type = 1 [default = kUserChange];
-  optional string user_type = 2;
-
-  optional float base_lr = 3 [default = 0.01];
-
-  optional FixedStepProto fixedstep_conf = 40;
-  optional StepProto step_conf = 41;
-  optional LinearProto linear_conf = 42;
-  optional ExponentialProto exponential_conf = 43;
-  optional InverseProto inverse_conf = 44;
-  optional InverseTProto inverset_conf = 45;
-
-  extensions 101 to 200;
-}
-
-message ParamGenProto {
-  optional InitMethod type = 1 [default = kUserInit];
-  optional string user_type = 2;
-  // constant init
-  optional float value = 3 [default = 1];
-  // for gaussian sampling
-  optional float mean = 4 [default = 0];
-  optional float std = 5 [default = 1];
-  // for uniform sampling
-  optional float low = 8 [default = -1];
-  optional float high = 9 [default = 1];
-
-  extensions 101 to 200;
-}
-
-enum ActivationType {
-  RELU = 1;
-  SIGMOID = 2;
-  TANH = 3;
-  STANH = 4;
-}
-
-message ActivationProto {
-  optional ActivationType type = 1 [default = RELU];
-}
-
-message OnehotProto {
-  optional int32 vocab_size = 1 [default = 0];
-}
-
-message RGBImageProto {
-  // scale factor for each pixel
-  optional float scale = 1 [default = 1.0];
-  // size after cropping
-  optional int32 cropsize = 2 [default = 0];
-  // mirror the image
-  optional bool mirror = 3 [default = false];
-  // meanfile path
-  optional string meanfile = 4 [default = ""];
-}
-
-message SplitProto {
-  optional int32 num_splits = 1 [default = 1];
-}
-
-message StoreProto {
-  optional string backend = 1;
-  optional string path = 2;
-  optional string separator = 3 [default = ","];
-  optional string mean_file = 4;
-  optional string std_file = 5;
-  optional float mean_value = 6;
-  optional float std_value = 7;
-  repeated int32 batchsize = 8;
-  repeated int32 shape = 9;
-  optional bool encoded = 10 [default = false];
-  optional int32 random_skip = 11 [default = 0];
-  optional bool has_label = 12 [default = true];
-  optional bool prefetching = 13 [default = false];
-}
-
-message CharRNNProto {
-  optional string path = 1;
-  optional string vocab_path = 2;
-  // num of chars to read per instance, should = NetProto::unroll_len
-  optional int32 unroll_len = 3 [default = 50];
-  optional int32 batchsize = 4 [default = 1];
-}
-
-message EmbeddingProto {
-  optional int32 vocab_size = 1 [default = 0];
-  optional int32 feature_dim = 2 [default = 100];
-}
-
-message BMProto {
-}
-
-message SoftmaxLossProto {
-  // compute accuracy against the topk results
-  optional int32 topk = 1 [default = 1];
-  // loss scale factor
-  optional float scale = 30 [default = 1];
-}
-
-message ArgSortProto {
-  // keep labels with topk scores
-  optional int32 topk = 1 [default = 1];
-}
-
-message ConcateProto {
-  optional int32 concate_dim = 1 [default = 0];
-  optional int32 num_concates = 2 [default = 1];
-}
-
-message ConvolutionProto {
-  // The number of outputs for the layer
-  optional int32 num_filters = 1;
-  // the kernel height/width
-  optional int32 kernel = 2 [default = 3];
-  // The padding height/width
-  optional int32 pad = 30 [default = 0];
-  // the stride
-  optional int32 stride = 31 [default = 1];
-
-  optional int32 kernel_x = 41 [default = 3];
-  optional int32 kernel_y = 42 [default = 3];
-
-  optional int32 pad_x = 44 [default = 0];
-  optional int32 pad_y = 45 [default = 0];
-
-  optional int32 stride_x = 47 [default = 1];
-  optional int32 stride_y = 48 [default = 1];
-
-  // cudnn workspace size in MB
-  optional int32 workspace_byte_limit = 50 [default = 512];
-}
-
-message DataProto {
-  // path to the data file/folder, absolute or relative to the workspace
-  required string path = 2;
-  // batch size.
-  required int32 batchsize = 4;
-  // skip [0, random_skip] records
-  optional int32 random_skip = 30 [default = 0];
-}
-
-message MnistProto {
-  // normalization x/norm_a
-  required float norm_a = 1 [default = 1];
-  // normalization x-norm_b
-  required float norm_b = 2 [default = 0];
-
-  // elastic distortion
-  optional int32 kernel = 30 [default = 0];
-  optional float sigma = 31 [default = 0];
-  optional float alpha = 32 [default = 0];
-  // rotation or horizontal shearing
-  optional float beta = 33 [default = 0];
-  // scaling
-  optional float gamma = 34 [default = 0];
-  // scale to this size as input for deformation
-  optional int32 resize = 35 [default = 0];
-  optional int32 elastic_freq = 36 [default = 0];
-}
-
-message DummyProto {
-  // shape of data and grad blobs
-  optional bool input = 1 [default = false];
-  optional bool output = 2 [default = false];
-  repeated int32 shape = 3;
-}
-
-message RNNDummyProto {
-  optional string dynamic_srclayer = 1;
-  // if shape is set, randomly generate the data blob
-  repeated int32 shape = 2;
-  // if integer is true, generate integer data
-  optional bool integer = 3 [default = false];
-  // range of the random generation
-  optional float low = 4 [default = 0];
-  optional float high = 5 [default = 0];
-}
-
-// Message that stores parameters used by DropoutLayer
-message DropoutProto {
-  // dropout ratio
-  optional float dropout_ratio = 30 [default = 0.5];
-}
-
-message RBMProto {
-  required int32 hdim = 1;  // The number of outputs for the layer
-  optional bool bias_term = 2 [default = true];  // whether to have bias terms
-  optional bool gaussian = 3 [default = false];  // use gaussian sampling or not
-}
-
-// Message that stores parameters used by GRULayer
-message GRUProto {
-  // dimension of hidden state for the layer
-  required int32 dim_hidden = 1;
-  // use bias vector or not
-  optional bool bias_term = 2 [default = true];
-}
-
-// Message that stores parameters used by InnerProductLayer
-message InnerProductProto {
-  // number of outputs for the layer
-  required int32 num_output = 1;
-  // use bias vector or not
-  optional bool bias_term = 30 [default = true];
-  // transpose or not
-  optional bool transpose = 31 [default = false];
-}
-
-message LRNProto {
-  // local response size
-  required int32 local_size = 1 [default = 5];
-  // scale factor
-  optional float alpha = 31 [default = 1.0];
-  // exponential number
-  optional float beta = 32 [default = 0.75];
-  // offset
-  optional float knorm = 34 [default = 1.0];
-}
-
-message PoolingProto {
-  // The kernel size (square)
-  optional int32 kernel = 1 [default = 3];
-  enum PoolMethod {
-    MAX = 0;
-    AVG = 1;
-  }
-  // The pooling method
-  optional PoolMethod pool = 30 [default = MAX];
-  // The padding size
-  optional uint32 pad = 31 [default = 0];
-  // The stride
-  optional uint32 stride = 32 [default = 2];
-
-  optional int32 kernel_x = 41 [default = 3];
-  optional int32 kernel_y = 42 [default = 3];
-
-  optional int32 pad_x = 44 [default = 0];
-  optional int32 pad_y = 45 [default = 0];
-
-  optional int32 stride_x = 47 [default = 2];
-  optional int32 stride_y = 48 [default = 2];
-}
-
-message ReLUProto {
-  // Ref. Maas, A. L., Hannun, A. Y., & Ng, A. Y. (2013).
-  // Rectifier nonlinearities improve neural network acoustic models.
-  // In ICML Workshop on Deep Learning for Audio, Speech, and Language Processing.
-  optional float negative_slope = 1 [default = 0];
-}
-
-message SliceProto {
-  optional int32 slice_dim = 1 [default = 0];
-  optional int32 num_slices = 2 [default = 1];
-}
-
-message SoftmaxProto {
-  // Can be used to do softmax over each channel of one image by setting it to
-  // the size of the second dimension (the first dimension is batchsize).
-  optional int32 num_softmax_per_instance = 1 [default = 1];
-}
-
-message RMSPropProto {
-  // history = history * rho_ + (1 - rho_) * (grad * grad_scale)
-  required float rho = 1;
-}
-
-message AdaDeltaProto {
-  required float rho = 1 [default = 0.9];
-}
-
-message AdamProto {
-  required float beta1 = 1 [default = 0.9];
-  required float beta2 = 2 [default = 0.999];
-}
-
-message AdamMaxProto {
-  required float beta1 = 1 [default = 0.9];
-  required float beta2 = 2 [default = 0.999];
-}
-
-message FixedStepProto {
-  repeated int32 step = 28;
-  // lr = step_lr[i] if current step >= step[i]
-  repeated float step_lr = 29;
-}
-
-message StepProto {
-  // lr = base_lr * gamma^(step/change_freq)
-  required float gamma = 35 [default = 1];
-  // lr = base_lr * gamma^(step/change_freq)
-  required int32 change_freq = 40;
-}
-
-message LinearProto {
-  // lr = (1 - step / freq) * base_lr + (step / freq) * final_lr
-  required int32 change_freq = 40;
-  // lr = (1 - step / freq) * base_lr + (step / freq) * final_lr
-  required float final_lr = 39;
-}
-
-message ExponentialProto {
-  // lr = base_lr / 2^(step/change_freq)
-  required int32 change_freq = 40;
-}
-
-message InverseTProto {
-  // lr = base_lr / (1 + step / final_lr)
-  required float final_lr = 39;
-}
-
-message InverseProto {
-  // lr = base_lr * (1 + gamma * step)^(-pow)
-  required float gamma = 1 [default = 1];
-  // lr = base_lr * (1 + gamma * step)^(-pow)
-  required float pow = 2 [default = 0];
-}
-
-message UniformProto {
-  optional float low = 1 [default = -1];
-  optional float high = 2 [default = 1];
-}
-
-message GaussianProto {
-  optional float mean = 1 [default = 0];
-  optional float std = 2 [default = 1];
-}
-
-// --------------
-// All Enum Types
-// --------------
-
-enum AlgType {
-  // Back-propagation algorithm for feed-forward models, e.g., CNN and RNN
-  kBP = 1;
-  // Contrastive Divergence algorithm for RBM, DBM, etc.
-  kCD = 2;
-  // BPTT for training RNN models
-  kBPTT = 3;
-  // For user defined algorithms.
-  kUserAlg = 104;
-}
-
-enum LayerType {
-  /*
-   * Input layers
-   *  - Load records from file, database
-   */
-  kCSVInput = 100;
-  kImagePreprocess = 101;
-  kRecordInput = 103;
-  kLMDBData = 190;  // deprecated
-  kLabel = 191;  // deprecated
-  kMnist = 192;  // deprecated
-  kRGBImage = 193;  // deprecated
-  kShardData = 194;  // deprecated
-  kCharRNN = 195;
-  kRNNLabel = 196;
-  kOneHot = 197;
-
-  /*
-   * Neuron layers
-   *  - Feature transformation
-   */
-  kConvolution = 201;
-  kCConvolution = 202;
-  kDropout = 203;
-  kDummy = 204;
-  kInnerProduct = 205;
-  kLRN = 206;
-  kPooling = 207;
-  kCPooling = 208;
-  kRBMHid = 209;
-  kRBMVis = 210;
-  kReLU = 211;
-  kSTanh = 212;
-  kSigmoid = 213;
-  kSoftmax = 214;
-  kGRU = 215;
-  kEmbedding = 216;
-  kActivation = 217;
-  kBM = 218;
-
-  kCudnnConv = 250;
-  kCudnnPool = 251;
-  kCudnnLRN = 252;
-  kCudnnSoftmax = 253;
-  kCudnnActivation = 254;
-  kCudnnBM = 255;
-
-  /*
-   * Loss layers
-   *  - Compute objective loss
-   */
-  kEuclideanLoss = 300;
-  kSoftmaxLoss = 301;
-  // cudnn v3
-  kCudnnSoftmaxLoss = 350;
-
-  /*
-   * Output layers
-   *  - Write results to file, database
-   */
-  kAccuracy = 400;
-  kArgSort = 401;
-  kCSVOutput = 402;
-  kRecordOutput = 403;
-  kCharRNNOutput = 404;
-
-  /*
-   * Connection layers
-   *  - Connect layers when the neural net is partitioned
-   */
-  kBridgeDst = 500;
-  kBridgeSrc = 501;
-  kConcate = 502;
-  kSlice = 503;
-  kSplit = 504;
-  kRNNDummy = 505;
-
-  /*
-   * User defined layer
-   *  - users should configure user_type
-   */
-  kUserLayer = 600;
-}
-
-enum UpdaterType {
-  // normal SGD with momentum and weight decay
-  kSGD = 1;
-  // adaptive subgradient, http://www.magicbroom.info/Papers/DuchiHaSi10.pdf
-  kAdaGrad = 2;
-  // http://www.cs.toronto.edu/~tijmen/csc321/slides/lecture_slides_lec6.pdf
-  kRMSProp = 3;
-  // Nesterov first optimal gradient method
-  kNesterov = 4;
-  // AdaDelta
-  kAdaDelta = 5;
-  // Adam
-  kAdam = 6;
-  // AdamMax
-  kAdamMax = 7;
-  // For user defined updaters
-  kUserUpdater = 105;
-}
-
-enum Phase {
-  kUnknown = 0;
-  kTrain = 1;
-  kVal = 2;
-  kTest = 4;
-  // positive phase for the contrastive divergence algorithm
-  kPositive = 8;
-  // negative phase for the contrastive divergence algorithm
-  kNegative = 16;
-  kForward = 32;
-  kBackward = 64;
-  kLoss = 128;
-  kDeploy = 256;
-
-  // used to aggregate parameter gradients when a Param is shared
-  kAggGrad = 512;
-}
-
-enum ParamType {
-  // built-in Param
-  kParam = 0;
-  // user-defined Param
-  kUser = 103;
-}
-
-enum ChangeMethod {
-  kFixed = 0;
-  kInverseT = 1;
-  kInverse = 2;
-  kExponential = 3;
-  kLinear = 4;
-  kStep = 5;
-  kFixedStep = 6;
-  // For user defined change methods
-  kUserChange = 100;
-}
-
-enum InitMethod {
-  // fix the values of all parameters to the constant in the value field
-  kConstant = 0;
-  // sample from a Gaussian with std and mean
-  kGaussian = 1;
-  // uniform sampling between low and high
-  kUniform = 2;
-  // from Toronto Convnet, let a=1/sqrt(fan_in), w*=a after generating from
-  // Gaussian distribution
-  kGaussianSqrtFanIn = 4;
-  // from Toronto Convnet, rectified linear activation, let
-  // a=sqrt(3)/sqrt(fan_in), range is [-a, +a]; no need to set value=sqrt(3),
-  // the program will multiply it.
-  kUniformSqrtFanIn = 5;
-  // from the Theano MLP tutorial, let a=sqrt(6/(fan_in+fan_out)). For tanh
-  // activation, the range is [-a, +a]; for sigmoid activation, the range is
-  // [-4a, +4a]; put the scale factor into the value field.
-  // <a href="http://deeplearning.net/tutorial/mlp.html"> Theano MLP</a>
-  kUniformSqrtFanInOut = 6;
-
-  // For user defined init methods
-  kUserInit = 101;
-}
-
-enum UnrollConnType {
-  // i-th unrolled layer <- (i - shift)-th src unrolled layer
-  kUnrollOneToOne = 1;
-  // i-th unrolled layer <- all src unrolled layers
-  kUnrollOneToAll = 2;
-  // i-th unrolled layer <- last unrolled src layer
-  kUnrollFirstToLast = 3;
-  // customized connection type defined by src_conn
-  kUnrollCustomized = 4;
-}

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/proto/singa.proto
----------------------------------------------------------------------
diff --git a/src/proto/singa.proto b/src/proto/singa.proto
deleted file mode 100644
index 2fbf2db..0000000
--- a/src/proto/singa.proto
+++ /dev/null
@@ -1,29 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-package singa;
-
-message SingaProto {
-  // ip/hostname:port[,ip/hostname:port]
-  optional string zookeeper_host = 1 [default = "localhost:2181"];
-  // log dir for the singa binary and job information (job id, host list, pid list)
-  optional string log_dir = 2 [default = "/tmp/singa-log/"];
-}
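JobProto above is the single object that configures a training run: net, training algorithm, updater, cluster topology, and step counts. As a sketch only, assuming standard protoc C++ codegen for job.proto (field and enum names are taken from the message definitions above; the header path is an assumption, and this is not SINGA's driver API):

#include "job.pb.h"  // assumed protoc output for job.proto

int main() {
  singa::JobProto job;
  job.set_name("cifar10-dcnn");
  job.set_train_steps(1000);
  job.set_disp_freq(50);

  // SGD updater with momentum and a fixed learning rate.
  singa::UpdaterProto* updater = job.mutable_updater();
  updater->set_type(singa::kSGD);
  updater->set_momentum(0.9f);
  singa::LRGenProto* lr = updater->mutable_learning_rate();
  lr->set_type(singa::kFixed);
  lr->set_base_lr(0.01f);

  // single-node cluster: one worker group and one server group
  singa::ClusterProto* cluster = job.mutable_cluster();
  cluster->set_nworker_groups(1);
  cluster->set_nserver_groups(1);
  cluster->set_workspace("/tmp/singa-job");
  return 0;
}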
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/python/layer.py
----------------------------------------------------------------------
diff --git a/src/python/layer.py b/src/python/layer.py
new file mode 100644
index 0000000..ec9125e
--- /dev/null
+++ b/src/python/layer.py
@@ -0,0 +1,22 @@
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements. See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership. The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+class Layer(object):
+    pass

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/python/model.py
----------------------------------------------------------------------
diff --git a/src/python/model.py b/src/python/model.py
new file mode 100644
index 0000000..6d9fe39
--- /dev/null
+++ b/src/python/model.py
@@ -0,0 +1,22 @@
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements. See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership. The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+class Model(object):
+    pass

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/server.cc
----------------------------------------------------------------------
diff --git a/src/server.cc b/src/server.cc
deleted file mode 100644
index 3b72243..0000000
--- a/src/server.cc
+++ /dev/null
@@ -1,259 +0,0 @@
-/************************************************************
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*************************************************************/
-
-#include "singa/server.h"
-
-#include <thread>
-#include <chrono>
-#include "mshadow/tensor.h"
-#include "singa/proto/common.pb.h"
-#include "singa/utils/param.h"
-#include "singa/utils/singleton.h"
-#include "singa/utils/factory.h"
-#include "singa/utils/cluster.h"
-
-namespace singa {
-
-using namespace mshadow;
-using std::vector;
-
-Server::Server(int group_id, int server_id,
-    const JobProto& job_conf,
-    const vector<int>& slice2group,
-    const vector<int>& slice2server) {
-  grp_id_ = group_id;
-  id_ = server_id;
-  updater_ = Updater::Create(job_conf.updater());
-  slice2group_ = slice2group;
-  slice2server_ = slice2server;
-  dealer_ = new Dealer(Addr(grp_id_, id_, kServer));
-}
-
-Server::~Server() {
-  delete updater_;
-  // free Params (i.e., slices) in server shard
-  for (auto entry : shard_)
-    for (auto param : entry.second->shares)
-      delete param;
-  delete dealer_;
-}
-
-void Stop(void* running) {
-  *static_cast<bool *>(running) = false;
-}
-
-void Server::Run() {
-  LOG(ERROR) << "Server (group = " << grp_id_ << ", id = " << id_ << ") start";
-  auto cluster = Cluster::Get();
-  if (cluster->nserver_groups()) {
-    CHECK_GT(slice2group_.size(), 0);
-    if (cluster->nservers_per_group()) {
-      CHECK_GT(slice2server_.size(), 0);
-    }
-  }
-  n_updates_.resize(slice2group_.size(), 0);
-  n_pending_sync_.resize(slice2group_.size(), 0);
-  last_sync_.resize(slice2group_.size());
-
-  bool running = true;
-  CHECK(cluster->runtime()->WatchSGroup(grp_id_, id_, Stop, &running));
-  // start recv loop and process requests
-  while (running) {
-    // cannot use blocking Receive() here, it will get stuck after workers stop.
-    Msg* msg = dealer_->Receive(cluster->poll_time());
-    if (msg == nullptr)
-      continue;
-    Msg* response = nullptr;
-    int type = msg->type();
-    int slice_id = SliceID(msg->trgt_val());
-    if (type == kPut) {
-      response = HandlePut(&msg);
-    } else if (shard_.find(slice_id) == shard_.end()) {
-      // TODO(wangsh): buffer the msg instead, and process it after the
-      // corresponding put request is done
-      // delay the processing by re-queuing the msg. May sleep for a while?
-      response = msg;
-    } else {
-      switch (type) {
-        case kGet:
-          response = HandleGet(&msg);
-          break;
-        case kUpdate:
-          for (auto reply : HandleUpdate(&msg))
-            dealer_->Send(&reply);
-          break;
-        case kSyncRequest:
-          response = HandleSyncRequest(&msg);
-          break;
-        case kSyncResponse:
-          HandleSyncResponse(&msg);
-          break;
-        default:
-          LOG(ERROR) << "Unknown message type: " << type;
-          break;
-      }
-    }
-    if (response != nullptr)
-      dealer_->Send(&response);
-  }
-
-  // send stop msg to stub
-  Msg* msg = new Msg(Addr(grp_id_, id_, kServer), Addr(-1, -1, kStub));
-  msg->set_type(kStop);
-  dealer_->Send(&msg);
-  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
-  LOG(ERROR) << "Server (group = " << grp_id_ << ", id = " << id_ << ") stops";
-}
-
-Msg* Server::HandlePut(Msg **msg) {
-  int version = (*msg)->trgt_version();
-  int slice_id = SliceID((*msg)->trgt_val());
-  if (shard_.find(slice_id) != shard_.end())
-    LOG(FATAL) << "Param (" << slice_id << ") is put more than once";
-
-  // TODO(wangwei) replace hard coded param type 0
-  auto param = Singleton<Factory<Param>>::Instance()->Create(0);
-  auto response = param->HandlePutMsg(msg, true);
-  // parse num of shares of this param from a worker group
-  int num_shares = 1;
-  if ((*msg)->NextFrame())
-    (*msg)->ParseFormatFrame("i", &num_shares);
-  DeleteMsg(msg);
-  shard_[slice_id] = new ParamEntry(num_shares, param);
-  // must set version after HandlePutMsg which allocates the memory
-  param->set_version(version);
-  param->set_last_version(version);
-  param->set_id(slice_id);
-  // allocate blob for param sync between groups.
-  if (slice2group_[slice_id] != grp_id_) {
-    last_sync_[slice_id].ReshapeLike(param->data());
-    last_sync_[slice_id].CopyFrom(param->data());
-  }
-  LOG(INFO) << "server (group = " << grp_id_ << ", id = " << id_
-            << ") put slice=" << slice_id << " size=" << param->size();
-  return response;
-}
-
-Msg* Server::HandleGet(Msg **msg) {
-  int val = (*msg)->trgt_val();
-  auto param = shard_.at(SliceID(val))->shares.at(0);
-  // re-queue the request if the param is not updated to the required version
-  if (param->version() < (*msg)->trgt_version()) {
-    return *msg;
-  } else {
-    // LOG(ERROR) << "get " << slice << " from " << (*msg)->src_first();
-    auto reply = param->HandleGetMsg(msg, false);
-    reply->set_trgt(val, param->version());
-    return reply;
-  }
-}
-
-const vector<Msg*> Server::HandleUpdate(Msg **msg) {
-  vector<Msg*> ret;
-  int sliceid = SliceID((*msg)->trgt_val());
-  auto entry = shard_.at(sliceid);
-  buffer_requests_[sliceid].push_back(*msg);
-  int num_update;
-  (*msg)->LastFrame();
-  (*msg)->ParseFormatFrame("i", &num_update);
-  (*msg)->FirstFrame();
-  entry->num_update += num_update;
-  // LOG(ERROR) << "update " << sliceid << " from " << AddrGrp((*msg)->src())
-  //            << ", " << num_update << " total " << entry->num_total;
-  // do update until recv gradients from all shares of this param/slice
-  if (entry->num_update >= entry->num_total) {
-    CHECK_EQ(entry->num_update, entry->num_total);
-    auto& request = buffer_requests_.at(sliceid);
-    int step = (*msg)->trgt_version();
-    int trgt_val = (*msg)->trgt_val();
-    auto param = entry->shares.at(0);
-    // extract and aggregate gradients
-    param->ParseUpdateMsgs(request);
-    // DLOG(ERROR) << "update param " << param->id() << " @ step " << step;
-    updater_->Update(step, param, 1.0f / entry->num_total);
-    param->set_version(param->version() + 1);
-    // response to all shares of this param
-    for (auto response : param->GenUpdateResponseMsgs(&request, false)) {
-      response->set_trgt(trgt_val, param->version());
-      ret.push_back(response);
-    }
-    entry->num_update = 0;
-    n_updates_[sliceid]++;
-    // sync with master group after at least sync_freq local updates
-    // the last check is to avoid sending msg to stopped servers
-    // may send the update steps on this server since last sync, i.e.,
-    // version - last_version
-    if (slice2group_[sliceid] != grp_id_
-        && n_updates_[sliceid] >= Cluster::Get()->sync_freq()
-        && n_pending_sync_[sliceid] <= Cluster::Get()->sync_freq()) {
-      auto shape = Shape1(param->size());
-      Tensor<cpu, 1> tmp(last_sync_[sliceid].mutable_cpu_data(), shape);
-      Tensor<cpu, 1> cur(param->mutable_cpu_data(), shape);
-      tmp = cur - tmp;
-      int addr = Addr(slice2group_[sliceid], slice2server_[sliceid], kServer);
-      Msg* sync = new Msg(Addr(grp_id_, id_, kServer), addr);
-      sync->set_type(kSyncRequest);
-      sync->set_trgt(trgt_val, param->version());
-      sync->AddFrame(tmp.dptr, param->size() * sizeof(float));
-      Copy(tmp, cur);
-      ret.push_back(sync);
-      n_updates_[sliceid] = 0;
-      n_pending_sync_[sliceid]++;
-    }
-  }
-  // message already pushed to buffer, just need to reset the pointer
-  *msg = nullptr;
-  return ret;
-}
-
-Msg* Server::HandleSyncRequest(Msg **msg) {
-  Msg* msgg = *msg;
-  int slice = SliceID(msgg->trgt_val());
-  auto param = shard_.at(slice)->shares.at(0);
-  auto shape = Shape1(param->size());
-  CHECK_EQ(msgg->FrameSize(), param->size() * sizeof(float));
-  Tensor<cpu, 1> inc(static_cast<float*>(msgg->FrameData()), shape);
-  Tensor<cpu, 1> cur(param->mutable_cpu_data(), shape);
-  // recv sync msg on the slice I am maintaining
-  cur += inc;
-  msgg->SwapAddr();
-  msgg->set_type(kSyncResponse);
-  // copy the fresh param value into the response msg
-  Copy(inc, cur);
-  return msgg;
-}
-
-// recv sync msg on slice mastered by others
-void Server::HandleSyncResponse(Msg **msg) {
-  Msg* msgg = *msg;
-  int slice = SliceID(msgg->trgt_val());
-  auto param = shard_.at(slice)->shares.at(0);
-  auto shape = Shape1(param->size());
-  Tensor<cpu, 1> prev(last_sync_[param->id()].mutable_cpu_data(), shape);
-  Tensor<cpu, 1> cur(param->mutable_cpu_data(), shape);
-  Tensor<cpu, 1> master(static_cast<float*>(msgg->FrameData()), shape);
-  cur += master - prev;  // cur = master + (cur - prev);
-  Copy(prev, cur);
-  DeleteMsg(msg);
-  n_pending_sync_[slice]--;
-}
-
-}  // namespace singa
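The inter-group sync in HandleUpdate, HandleSyncRequest, and HandleSyncResponse is a delta protocol: a slave server sends cur - last_sync and snapshots cur; the master adds the delta and replies with its fresh value; the slave then applies master - prev, so updates it made after the snapshot are preserved. A toy scalar version of that bookkeeping (no messaging, hypothetical variable names):

#include <cassert>

int main() {
  // slave's state for one parameter slice
  float cur = 5.0f, last_sync = 3.0f;
  // master's state for the same slice (diverged via its own updates)
  float master = 4.0f;

  // kSyncRequest: slave sends its local delta and snapshots cur
  float delta = cur - last_sync;    // 2.0, like tmp = cur - tmp above
  last_sync = cur;                  // like Copy(tmp, cur)
  // master side: fold in the delta, reply with the fresh value
  master += delta;                  // 6.0, like cur += inc
  float reply = master;

  // kSyncResponse: slave applies the master value on top of any
  // updates made since the snapshot: cur += reply - last_sync
  cur += reply - last_sync;         // 6.0 (no local updates since snapshot)
  last_sync = cur;                  // like Copy(prev, cur)
  assert(cur == 6.0f && master == 6.0f);
  return 0;
}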
-*
-*************************************************************/
-
-#include "singa/stub.h"
-
-#include <glog/logging.h>
-#include <unistd.h>
-#include <map>
-#include <thread>
-#include <set>
-#include "singa/proto/common.pb.h"
-#include "singa/utils/cluster.h"
-#include "singa/utils/common.h"
-#include "singa/utils/tinydir.h"
-#include "singa/utils/math_blob.h"
-
-namespace singa {
-
-using std::vector;
-using std::string;
-
-/***********************Stub****************************/
-Stub::~Stub() {
-  delete router_;
-}
-
-/**
- * Get a hash id for a Param object from a group.
- *
- * Simply multiply the group id by a large prime number 997 (assuming there
- * are no more than 997 worker groups) and add the owner param id.
- */
-inline int Hash(int grp_id, int param_id) {
-  return grp_id * 997 + param_id;
-}
-
-const std::unordered_map<int, ParamEntry*> CreateParamShard(
-    const vector<Worker*>& workers) {
-  std::unordered_map<int, ParamEntry*> shard;
-  // grp id -> net
-  std::unordered_map<int, NeuralNet*> grp2net;
-  // grp id -> worker id range
-  std::unordered_map<int, std::pair<int, int>> grp2workers;
-  for (auto worker : workers) {
-    int grp = worker->grp_id(), id = worker->id();
-    if (grp2net.find(grp) == grp2net.end()) {
-      grp2net[grp] = worker->train_net();
-      grp2workers[grp] = std::make_pair(id, id + 1);
-    } else {
-      CHECK_EQ(grp2net[grp], worker->train_net());
-      int start = grp2workers[grp].first, end = grp2workers[grp].second;
-      if (start > id) start = id;
-      if (end < id + 1) end = id + 1;
-      grp2workers[grp] = std::make_pair(start, end);
-    }
-  }
-
-  for (const auto entry : grp2net) {
-    int grp = entry.first;
-    int wstart = grp2workers[grp].first, wend = grp2workers[grp].second;
-    for (auto layer : entry.second->layers()) {
-      if (layer->unroll_index() > 0)
-        continue;
-      int partition = layer->partition_id();
-      bool local = partition >= wstart && partition < wend;
-      for (auto param : layer->GetParams()) {
-        int hash = Hash(grp, param->owner());
-        if (shard.find(hash) == shard.end())
-          shard[hash] = new ParamEntry();
-        shard[hash]->AddParam(local, param);
-      }
-    }
-  }
-  return shard;
-}
-
-void Stub::Run(const vector<int>& slice2server,
-    const vector<Worker*>& workers, const vector<Server*>& servers) {
-  slice2server_ = slice2server;
-  int nworkers = workers.size(), nservers = servers.size();
-  auto cluster = Cluster::Get();
-  int procs_id = cluster->procs_id();
-  LOG(INFO) << "Stub in process " << procs_id << " starts";
-  auto shard = CreateParamShard(workers);
-  std::map<int, Dealer*> inter_dealers;  // for sending msgs to other procs
-  std::queue<Msg*> msg_queue;
-  while (true) {
-    Msg* msg = nullptr;
-    if (msg_queue.empty()) {
-      msg = router_->Receive();
-    } else {
-      msg = msg_queue.front();
-      msg_queue.pop();
-    }
-    // LOG(ERROR) << "stub recv msg " << msg;
-    int type = msg->type(), dst = msg->dst(), flag = AddrType(dst);
-    if (flag == kStub && (AddrProc(dst) == procs_id || AddrGrp(dst) == -1)) {
-      // the following statements are ordered!
-      if (type == kConnect) {
-        DeleteMsg(&msg);
-      } else if (type == kStop) {
-        int src_flag = AddrType(msg->src());
-        if (src_flag == kServer) nservers--;
-        else if (src_flag == kWorkerParam) nworkers--;
-        DeleteMsg(&msg);
-        if (nworkers == 0 && nservers == 0) break;
-      } else {
-        int grp;
-        int paramid = ParamID(msg->trgt_val());
-        ParamEntry *entry = nullptr;
-        switch (type) {
-          case kUpdate:
-            grp = AddrGrp(msg->src());
-            entry = shard.at(Hash(grp, paramid));
-            for (auto update_msg : HandleUpdateRequest(entry, &msg))
-              msg_queue.push(update_msg);
-            break;
-          case kRUpdate:
-            grp = AddrGrp(msg->dst());
-            entry = shard.at(Hash(grp, paramid));
-            HandleUpdateResponse(entry, &msg);
-            break;
-          case kGet:
-            grp = AddrGrp(msg->src());
-            entry = shard.at(Hash(grp, paramid));
-            for (auto get_msg : HandleGetRequest(entry, &msg))
-              msg_queue.push(get_msg);
-            break;
-          case kRGet:
-            grp = AddrGrp(msg->dst());
-            entry = shard.at(Hash(grp, paramid));
-            HandleGetResponse(entry, &msg);
-            break;
-          case kPut:
-            grp = AddrGrp(msg->src());
-            entry = shard.at(Hash(grp, paramid));
-            for (auto put_msg : HandlePutRequest(entry, &msg))
-              msg_queue.push(put_msg);
-            break;
-          default:
-            LOG(ERROR) << "Unknown message type: " << type;
-            break;
-        }
-      }
-    } else {
-      int dst_procs = AddrProc(dst);
-      if (flag != kStub)
-        dst_procs = cluster->ProcsIDOf(AddrGrp(dst), AddrID(dst), flag);
-      if (dst_procs != procs_id) {
-        if (inter_dealers.find(dst_procs) == inter_dealers.end())
-          inter_dealers[dst_procs] = CreateInterProcsDealer(dst_procs);
-        inter_dealers[dst_procs]->Send(&msg);
-      } else {
-        // LOG(ERROR) << "router send msg " << msg;
-        router_->Send(&msg);
-      }
-    }
-  }
-  LOG(ERROR) << "Stub in process " << procs_id << " stops";
-  for (auto& entry : inter_dealers)
-    delete entry.second;
-}
-
-Dealer* Stub::CreateInterProcsDealer(int dst_procs) {
-  // forward to other procs
-  auto cluster = Cluster::Get();
-  auto dealer = new Dealer(-2);
-  while (cluster->endpoint(dst_procs) == "") {
-    // kCollectSleepTime));
-    std::this_thread::sleep_for(std::chrono::milliseconds(3000));
-    LOG(ERROR) << "waiting for procs " << dst_procs << " to register";
-  }
-  dealer->Connect("tcp://" + cluster->endpoint(dst_procs));
-  return dealer;
-}
-
-void Stub::GenMsgs(int type, int version, ParamEntry* entry, Msg* msg,
-    vector<Msg*> *ret) {
-  int procs_id = Cluster::Get()->procs_id();
-  int src_grp = AddrGrp(msg->src());
-  int dst_grp = src_grp / Cluster::Get()->nworker_groups_per_server_group();
-  auto param = entry->shares.at(0);
-  for (int idx = 0; idx < param->num_slices(); idx++) {
-    int slice_id = param->slice_start() + idx;
-    int server = slice2server_[slice_id];
-    int dst_procs = Cluster::Get()->ProcsIDOf(dst_grp, server, kServer);
-    Msg* new_msg = nullptr;
-    if (type == kPut) {
-      CHECK_GT(entry->num_total, 0);
-      new_msg = param->GenPutMsg(dst_procs != procs_id, idx);
-      new_msg->AddFormatFrame("i", entry->num_total);
-    } else if (type == kGet) {
-      new_msg = param->GenGetMsg(dst_procs != procs_id, idx);
-    } else if (type == kUpdate) {
-      new_msg = param->GenUpdateMsg(dst_procs != procs_id, idx);
-      new_msg->AddFormatFrame("i", entry->num_local);
-    } else {
-      LOG(FATAL) << "Wrong type";
-    }
-    new_msg->set_trgt(ParamTrgt(param->owner(), slice_id), version);
-    new_msg->set_src(Addr(src_grp, procs_id, kStub));
-    new_msg->set_dst(Addr(dst_grp, server, kServer));
-    ret->push_back(new_msg);
-    // LOG(ERROR) << "stub gen msg " << new_msg;
-  }
-}
-
-const vector<Msg*> Stub::HandleGetRequest(ParamEntry* entry, Msg** msg) {
-  vector<Msg*> ret;
-  int version = (*msg)->trgt_version();
(*msg)->trgt_version(); - if (version > entry->next_version) { - entry->next_version = version; - GenMsgs(kGet, version, entry, *msg, &ret); - } - DeleteMsg(msg); - return ret; -} - -const vector<Msg*> Stub::HandleUpdateRequest(ParamEntry* entry, Msg** msg) { - vector<Msg*> ret; - entry->num_update++; - if (entry->num_update >= entry->num_local) { - // aggregate (sum) the gradients of all local shares into shares[0]; - // num_local is attached to the update message in GenMsgs - if (entry->num_local > 1) { - auto it = entry->shares.begin(); - auto sum = it; - for (++it; it != entry->shares.end(); it++) { - AXPY(1.0f, (*it)->grad(), (*sum)->mutable_grad()); - } - } - int step = (*msg)->trgt_version(); - GenMsgs(kUpdate, step, entry, *msg, &ret); - entry->num_update = 0; - } - DeleteMsg(msg); - return ret; -} - -const vector<Msg*> Stub::HandlePutRequest(ParamEntry* entry, Msg** msg) { - vector<Msg*> ret; - int version = (*msg)->trgt_version(); - GenMsgs(kPut, version, entry, *msg, &ret); - DeleteMsg(msg); - return ret; -} - -void Stub::HandleGetResponse(ParamEntry* entry, Msg** msg) { - int version = (*msg)->trgt_version(); - int sliceid = SliceID((*msg)->trgt_val()); - auto param = entry->shares.at(0); - if (param->ParseGetResponseMsg(*msg, sliceid - param->slice_start())) - for (auto *p : entry->shares) - p->set_version(version); - DeleteMsg(msg); -} - -void Stub::HandleUpdateResponse(ParamEntry* entry, Msg** msg) { - int version = (*msg)->trgt_version(); - int sliceid = SliceID((*msg)->trgt_val()); - auto param = entry->shares.at(0); - if (param->ParseUpdateResponseMsg(*msg, sliceid - param->slice_start())) - for (auto *p : entry->shares) - p->set_version(version); - DeleteMsg(msg); -} -} // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/test/test_cluster.cc ---------------------------------------------------------------------- diff --git a/src/test/test_cluster.cc b/src/test/test_cluster.cc deleted file mode 100644 index cd57991..0000000 --- a/src/test/test_cluster.cc +++ /dev/null @@ -1,143 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. 
-* -*************************************************************/ - -#include "gtest/gtest.h" -#include "singa/utils/cluster.h" - -using namespace singa; - -std::string host = "localhost:2181"; - -void zk_cb(void *context) { - LOG(INFO) << "zk callback: " << static_cast<char *>(context); -} -/* -TEST(ClusterRuntimeTest, GroupManagement) { - ClusterRuntime* rt = new ZKClusterRT(host); - ASSERT_EQ(rt->Init(), true); - ASSERT_EQ(rt->WatchSGroup(1, 1, zk_cb, "test call back"), true); - ASSERT_EQ(rt->JoinSGroup(1, 1, 1), true); - ASSERT_EQ(rt->JoinSGroup(1, 2, 1), true); - ASSERT_EQ(rt->LeaveSGroup(1, 2, 1), true); - ASSERT_EQ(rt->LeaveSGroup(1, 1, 1), true); - sleep(3); - delete rt; -} - -TEST(ClusterRuntimeTest, ProcessManagement) { - ClusterRuntime* rt = new ZKClusterRT(host); - ASSERT_EQ(rt->Init(), true); - ASSERT_EQ(rt->RegistProc("1.2.3.4:5"), 0); - ASSERT_EQ(rt->RegistProc("1.2.3.4:6"), 1); - ASSERT_EQ(rt->RegistProc("1.2.3.4:7"), 2); - ASSERT_NE(rt->GetProcHost(0), ""); - ASSERT_NE(rt->GetProcHost(1), ""); - ASSERT_NE(rt->GetProcHost(2), ""); - sleep(3); - delete rt; -} - -ClusterProto GenClusterProto(){ - ClusterProto proto; - int nworker=6, nserver=4; - proto.set_nworkers(nworker); - proto.set_nservers(nserver); - proto.set_nworkers_per_group(3); - proto.set_nservers_per_group(2); - proto.set_nthreads_per_worker(1); - proto.set_nthreads_per_server(2); - - proto.set_hostfile(folder+"/hostfile"); - - std::ofstream fout(folder+"/hostfile", std::ofstream::out); - for(int i=0;i<nworker+nserver;i++){ - char tmp[20]; - sprintf(tmp, "awan-0-%02d-0", i); - fout<<tmp<<std::endl; - } - fout.flush(); - fout.close(); - return proto; -} - -TEST(ClusterTest, NoServer){ - ClusterProto proto=GenClusterProto(); - proto.set_nservers(0); - auto cluster=Cluster::Get(proto, 0); - ASSERT_EQ(proto.nworkers(),cluster->nworkers()); - ASSERT_EQ(0, cluster->nservers()); - ASSERT_EQ(proto.nworkers_per_group(),cluster->nworkers_per_group()); - ASSERT_EQ(proto.nservers_per_group(),cluster->nservers_per_group()); - ASSERT_FALSE(cluster->AmIServer()); - ASSERT_TRUE(cluster->AmIWorker()); - ASSERT_EQ(0,cluster->group_procs_id()); - ASSERT_EQ(0,cluster->group_id()); - ASSERT_EQ(2, cluster->nworker_groups()); - ASSERT_EQ(0, cluster->nserver_groups()); - ASSERT_STREQ("awan-0-00-0", cluster->host_addr().c_str()); - - cluster=Cluster::Get(proto, 5); - ASSERT_EQ(2,cluster->group_procs_id()); - ASSERT_EQ(1,cluster->group_id()); - ASSERT_EQ(2, cluster->nworker_groups()); - ASSERT_EQ(0, cluster->nserver_groups()); - ASSERT_STREQ("awan-0-05-0", cluster->host_addr().c_str()); -} - -TEST(ClusterTest, SingleServerGroup){ - ClusterProto proto=GenClusterProto(); - proto.set_nservers(2); - auto cluster=Cluster::Get(proto, 3); - ASSERT_FALSE(cluster->AmIServer()); - ASSERT_TRUE(cluster->AmIWorker()); - ASSERT_EQ(0,cluster->group_procs_id()); - ASSERT_EQ(1,cluster->group_id()); - ASSERT_EQ(2, cluster->nworker_groups()); - ASSERT_EQ(1, cluster->nserver_groups()); - ASSERT_STREQ("awan-0-03-0", cluster->host_addr().c_str()); - - cluster=Cluster::Get(proto, 7); - ASSERT_EQ(1,cluster->group_procs_id()); - ASSERT_EQ(0,cluster->group_id()); - ASSERT_EQ(2, cluster->nworker_groups()); - ASSERT_EQ(1, cluster->nserver_groups()); - ASSERT_STREQ("awan-0-07-0", cluster->host_addr().c_str()); -} - -TEST(ClusterTest, MultiServerGroups){ - ClusterProto proto=GenClusterProto(); - auto cluster=Cluster::Get(proto, 7); - ASSERT_EQ(1,cluster->group_procs_id()); - ASSERT_EQ(0,cluster->group_id()); - ASSERT_EQ(2, cluster->nworker_groups()); - ASSERT_EQ(2, 
cluster->nserver_groups()); - ASSERT_STREQ("awan-0-07-0", cluster->host_addr().c_str()); - - cluster=Cluster::Get(proto, 8); - ASSERT_TRUE(cluster->AmIServer()); - ASSERT_FALSE(cluster->AmIWorker()); - ASSERT_EQ(0,cluster->group_procs_id()); - ASSERT_EQ(1,cluster->group_id()); - ASSERT_EQ(2, cluster->nworker_groups()); - ASSERT_EQ(2, cluster->nserver_groups()); - ASSERT_STREQ("awan-0-08-0", cluster->host_addr().c_str()); -} -**/ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/test/test_common.cc ---------------------------------------------------------------------- diff --git a/src/test/test_common.cc b/src/test/test_common.cc deleted file mode 100644 index 4c33eb6..0000000 --- a/src/test/test_common.cc +++ /dev/null @@ -1,133 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#include <string> -#include <unordered_map> -#include <vector> -#include "gtest/gtest.h" -#include "singa/utils/common.h" - -using std::string; -using std::vector; -using namespace singa; - -TEST(CommonTest, TestIntVecToString) { - vector<int> num_vec {2, 3, 5, 7, 11}; - string str = "(2, 3, 5, 7, 11, )"; - ASSERT_EQ(str, IntVecToString(num_vec)); -} - -TEST(CommonTest, TestStringPrintf) { - const char* str_a = "abc"; - const char* str_b = "edfgh"; - const char* str_c = " !@#"; - const char* str_d = "1"; - const char* str_e = "2"; - const char* str_f = "3"; - - string fmt_a = "%s%s%s"; - string fmt_b = "[%s] [%s] [%s] "; - - string str_d_a = "abcedfgh !@#"; - string str_d_b = "[1] [2] [3] "; - - ASSERT_EQ(str_d_a, StringPrintf(fmt_a, str_a, str_b, str_c)); - ASSERT_EQ(str_d_b, StringPrintf(fmt_b, str_d, str_e, str_f)); -} - -TEST(CommonTest, TestGCDLCM) { - int a = 2, b = 5, c = 10, d = 15; - - ASSERT_EQ(1, gcd(a, b)); - ASSERT_EQ(5, gcd(c, d)); - ASSERT_EQ(10, LeastCommonMultiple(b, c)); - ASSERT_EQ(30, LeastCommonMultiple(c, d)); -} - -TEST(CommonTest, TestMetric) { - string str, msg; - Metric metric; - metric.Add("a", 0.5); - metric.Add("b", 0.5); - metric.Add("a", 1.5); - str = metric.ToLogString(); - msg = metric.ToString(); - metric.Reset(); - metric.ParseFrom(msg); - ASSERT_EQ(str, metric.ToLogString()); -} - -TEST(CommonTest, TestSlice) { - vector<vector<int>> slices_0; - vector<int> sizes {14112, 96, 256, 884736, 384}; - ASSERT_EQ(slices_0, Slice(0, sizes)); - - vector<vector<int>> slices_1 { - {14112}, - {96}, - {256}, - {884736}, - {384}, - }; - - vector<vector<int>> slices_2 { - {14112}, - {96}, - {256}, - {435328, 449408}, - {384}, - }; - - vector<vector<int>> slices_4 { - {14112}, - {96}, - {256}, - {210432, 224896, 224896, 224512}, - {384}, - }; - - vector<vector<int>> slices_8 { - {14112}, - {96}, - {256}, 
- {97984, 112448, 112448, 112448, 112448, 112448, 112448, 112064}, - {384}, - }; - - ASSERT_EQ(slices_1, Slice(1, sizes)); - ASSERT_EQ(slices_2, Slice(2, sizes)); - ASSERT_EQ(slices_4, Slice(4, sizes)); - ASSERT_EQ(slices_8, Slice(8, sizes)); -} - -TEST(CommonTest, TestPartitionSlices) { - vector<int> slices { - 97984, 112448, 112448, 112448, 112448, 112448, 112448, 112064 - }; - vector<int> box_1 {0, 0, 0, 0, 0, 0, 0, 0}; - vector<int> box_2 {0, 0, 0, 0, 1, 1, 1, 1}; - vector<int> box_4 {0, 0, 1, 1, 2, 2, 3, 3}; - vector<int> box_8 {0, 1, 2, 3, 4, 5, 6, 7}; - ASSERT_EQ(box_1, PartitionSlices(1, slices)); - ASSERT_EQ(box_2, PartitionSlices(2, slices)); - ASSERT_EQ(box_4, PartitionSlices(4, slices)); - ASSERT_EQ(box_8, PartitionSlices(8, slices)); -} http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/test/test_connection_layers.cc ---------------------------------------------------------------------- diff --git a/src/test/test_connection_layers.cc b/src/test/test_connection_layers.cc deleted file mode 100644 index cd7f5f5..0000000 --- a/src/test/test_connection_layers.cc +++ /dev/null @@ -1,459 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. 
-* -*************************************************************/ - -#include <string> -#include <unordered_map> -#include <vector> -#include "gtest/gtest.h" -#include "singa/comm/msg.h" -#include "singa/comm/socket.h" -#include "singa/neuralnet/connection_layer.h" -#include "singa/neuralnet/neuron_layer.h" -#include "singa/proto/job.pb.h" - -using namespace singa; - -const int N = 10; // size of dim 0 -const int M = 20; // size of dim 1 -const int K = 5; // size of partitions - -TEST(ConnectionLayerTest, DummyTest) { - // use dummy as input layer - vector<Layer*> src_in; - LayerProto proto_in; - proto_in.set_name("dummy_input"); - proto_in.mutable_dummy_conf()->set_input(true); - proto_in.mutable_dummy_conf()->add_shape(N); - proto_in.mutable_dummy_conf()->add_shape(M); - DummyLayer in; - in.Setup(proto_in, src_in); - ASSERT_EQ(in.data(nullptr).shape(0), N); - ASSERT_EQ(in.data(nullptr).shape(1), M); - in.ComputeFeature(0, src_in); - - // use dummy as neuron layer - vector<Layer*> src_neu; - src_neu.push_back(static_cast<Layer*>(&in)); - LayerProto proto_neu; - proto_neu.set_name("dummy_neuron"); - proto_neu.mutable_dummy_conf(); - DummyLayer neu; - neu.Setup(proto_neu, src_neu); - ASSERT_EQ(neu.data(nullptr).shape(0), N); - ASSERT_EQ(neu.data(nullptr).shape(1), M); - neu.ComputeFeature(0, src_neu); - ASSERT_EQ(in.data(nullptr).count(), neu.data(nullptr).count()); - for (int i = 0; i < in.data(nullptr).count(); ++i) - ASSERT_EQ(in.data(nullptr).cpu_data()[i], neu.data(nullptr).cpu_data()[i]); - - // use dummy as output layer - vector<Layer*> src_out; - src_out.push_back(static_cast<Layer*>(&neu)); - LayerProto proto_out; - proto_out.set_name("dummy_output"); - proto_out.mutable_dummy_conf()->set_output(true); - DummyLayer out; - out.Setup(proto_out, src_out); - ASSERT_EQ(out.data(nullptr).shape(0), N); - ASSERT_EQ(out.data(nullptr).shape(1), M); - out.ComputeFeature(0, src_out); - ASSERT_EQ(in.data(nullptr).count(), out.data(nullptr).count()); - for (int i = 0; i < in.data(nullptr).count(); ++i) - ASSERT_EQ(in.data(nullptr).cpu_data()[i], out.data(nullptr).cpu_data()[i]); - - // test for computing gradient - out.ComputeGradient(0, src_out); - neu.ComputeGradient(0, src_neu); - in.ComputeGradient(0, src_in); - for (int i = 0; i < in.grad(nullptr).count(); ++i) - ASSERT_EQ(in.grad(nullptr).cpu_data()[i], out.grad(nullptr).cpu_data()[i]); -} - -TEST(ConnectionLayerTest, BridgeTest) { - // use dummy as input layer - vector<Layer*> src_in; - LayerProto proto_in; - proto_in.set_name("dummy_input"); - proto_in.mutable_dummy_conf()->set_input(true); - proto_in.mutable_dummy_conf()->add_shape(N); - proto_in.mutable_dummy_conf()->add_shape(M); - DummyLayer in; - in.Setup(proto_in, src_in); - - // add src bridge layer - vector<Layer*> src_src; - src_src.push_back(static_cast<Layer*>(&in)); - LayerProto proto_src; - proto_src.set_name("bridge_src"); - BridgeSrcLayer src; - src.Setup(proto_src, src_src); - ASSERT_EQ(src.data(nullptr).shape(0), N); - ASSERT_EQ(src.data(nullptr).shape(1), M); - - // add dst bridge layer - vector<Layer*> src_dst; - src_dst.push_back(static_cast<Layer*>(&src)); - LayerProto proto_dst; - proto_dst.set_name("bridge_dst"); - BridgeDstLayer dst; - dst.Setup(proto_dst, src_dst); - ASSERT_EQ(dst.data(nullptr).shape(0), N); - ASSERT_EQ(dst.data(nullptr).shape(1), M); - - msgQueues[-1]; - msgQueues[Addr(0, 0, kWorkerLayer)]; - - // bind bridges to socket - // Router router(N); - Router router; - // router.Bind("inproc://router"); - Dealer dealer(Addr(0, 0, kWorkerLayer)); - // 
dealer.Connect("inproc://router"); - std::unordered_map<std::string, Layer*> name2bridge; - name2bridge[src.name()] = &src; - name2bridge[dst.name()] = &dst; - src.MakePaired(static_cast<Layer*>(&dst), 0, &dealer, &name2bridge); - dst.MakePaired(static_cast<Layer*>(&src), 0, &dealer, &name2bridge); - - // use dummy as output layer - LayerProto proto_out; - vector<Layer*> src_out; - src_out.push_back(static_cast<Layer*>(&dst)); - proto_out.set_name("dummy_output"); - proto_out.mutable_dummy_conf()->set_output(true); - DummyLayer out; - out.Setup(proto_out, src_out); - - // test for computing feature - in.ComputeFeature(0, src_in); - src.ComputeFeature(0, src_src); - Msg* msg_data = router.Receive(); - router.Send(&msg_data); - dst.ComputeFeature(0, src_dst); - out.ComputeFeature(0, src_out); - for (int i = 0; i < in.data(nullptr).count(); ++i) - ASSERT_EQ(in.data(nullptr).cpu_data()[i], out.data(nullptr).cpu_data()[i]); - - // test for computing gradient - out.ComputeGradient(0, src_out); - dst.ComputeGradient(0, src_dst); - Msg* msg_grad = router.Receive(); - router.Send(&msg_grad); - src.ComputeGradient(0, src_src); - in.ComputeGradient(0, src_in); - for (int i = 0; i < in.grad(nullptr).count(); ++i) - ASSERT_EQ(in.grad(nullptr).cpu_data()[i], out.grad(nullptr).cpu_data()[i]); -} - -TEST(ConnectionLayerTest, DataSliceTest) { - // use dummy as input layer - vector<Layer*> src_in; - LayerProto proto_in; - proto_in.set_name("dummy_input"); - proto_in.mutable_dummy_conf()->set_input(true); - proto_in.mutable_dummy_conf()->add_shape(N); - proto_in.mutable_dummy_conf()->add_shape(M); - DummyLayer in; - in.Setup(proto_in, src_in); - - // add slice layer - vector<Layer*> src_slice; - src_slice.push_back(static_cast<Layer*>(&in)); - LayerProto proto_slice; - proto_slice.set_name("slice"); - proto_slice.mutable_slice_conf()->set_slice_dim(0); - proto_slice.mutable_slice_conf()->set_num_slices(K); - SliceLayer slice; - slice.Setup(proto_slice, src_slice); - ASSERT_EQ(slice.data(nullptr).shape(0), N / K); - ASSERT_EQ(slice.data(nullptr).shape(1), M); - - // use dummy as output layers - LayerProto proto_out[K]; - vector<Layer*> src_out[K]; - DummyLayer out[K]; - for (int i = 0; i < K; ++i) { - src_out[i].push_back(static_cast<Layer*>(&slice)); - proto_out[i].set_name("dummy_output_"+std::to_string(i)); - proto_out[i].set_partition_id(i); - proto_out[i].mutable_dummy_conf()->set_output(true); - out[i].Setup(proto_out[i], src_out[i]); - } - - // test for computing feature - in.ComputeFeature(0, src_in); - slice.ComputeFeature(0, src_slice); - for (int i = 0; i < K; ++i) - out[i].ComputeFeature(0, src_out[i]); - int step = (N * M) / K; - for (int i = 0; i < in.data(nullptr).count(); ++i) { - ASSERT_EQ(in.data(nullptr).cpu_data()[i], - out[i / step].data(nullptr).cpu_data()[i % step]); - } - - // test for computing gradient - for (int i = 0; i < K; ++i) - out[i].ComputeGradient(0, src_out[i]); - slice.ComputeGradient(0, src_slice); - in.ComputeGradient(0, src_in); - for (int i = 0; i < in.grad(nullptr).count(); ++i) { - ASSERT_EQ(in.grad(nullptr).cpu_data()[i], - out[i / step].grad(nullptr).cpu_data()[i % step]); - } -} - -TEST(ConnectionLayerTest, ModelSliceTest) { - // use dummy as input layer - vector<Layer*> src_in; - LayerProto proto_in; - proto_in.set_name("dummy_input"); - proto_in.mutable_dummy_conf()->set_input(true); - proto_in.mutable_dummy_conf()->add_shape(N); - proto_in.mutable_dummy_conf()->add_shape(M); - DummyLayer in; - in.Setup(proto_in, src_in); - - // add slice layer - vector<Layer*> 
src_slice; - src_slice.push_back(static_cast<Layer*>(&in)); - LayerProto proto_slice; - proto_slice.set_name("slice"); - proto_slice.mutable_slice_conf()->set_slice_dim(1); - proto_slice.mutable_slice_conf()->set_num_slices(K); - SliceLayer slice; - slice.Setup(proto_slice, src_slice); - ASSERT_EQ(slice.data(nullptr).shape(0), N); - ASSERT_EQ(slice.data(nullptr).shape(1), M / K); - - // use dummy as output layers - LayerProto proto_out[K]; - vector<Layer*> src_out[K]; - DummyLayer out[K]; - for (int i = 0; i < K; ++i) { - src_out[i].push_back(static_cast<Layer*>(&slice)); - proto_out[i].set_name("dummy_output_"+std::to_string(i)); - proto_out[i].set_partition_id(i); - proto_out[i].mutable_dummy_conf()->set_output(true); - out[i].Setup(proto_out[i], src_out[i]); - } - - // test for computing feature - in.ComputeFeature(0, src_in); - slice.ComputeFeature(0, src_slice); - for (int i = 0; i < K; ++i) - out[i].ComputeFeature(0, src_out[i]); - int step = M / K; - int offset = 0; - for (int i = 0; i < in.data(nullptr).count(); ++i) { - if (i && i % M == 0) offset += step; - ASSERT_EQ(in.data(nullptr).cpu_data()[i], - out[(i / step) % K].data(nullptr).cpu_data()[offset + i % step]); - } - - // test for computing gradient - for (int i = 0; i < K; ++i) - out[i].ComputeGradient(0, src_out[i]); - slice.ComputeGradient(0, src_slice); - in.ComputeGradient(0, src_in); - offset = 0; - for (int i = 0; i < in.grad(nullptr).count(); ++i) { - if (i && i % M == 0) offset += step; - ASSERT_EQ(in.grad(nullptr).cpu_data()[i], - out[(i / step) % K].grad(nullptr).cpu_data()[offset + i % step]); - } -} - -TEST(ConnectionLayerTest, DataConcateTest) { - // use dummy as input layers - LayerProto proto_in[K]; - vector<Layer*> src_in[K]; - DummyLayer in[K]; - for (int i = 0; i < K; ++i) { - proto_in[i].set_name("dummy_input_"+std::to_string(i)); - proto_in[i].set_partition_id(i); - proto_in[i].mutable_dummy_conf()->set_input(true); - proto_in[i].mutable_dummy_conf()->add_shape(N / K); - proto_in[i].mutable_dummy_conf()->add_shape(M); - in[i].Setup(proto_in[i], src_in[i]); - } - - // add concate layer - vector<Layer*> src_concate; - for (int i = 0; i < K; ++i) - src_concate.push_back(static_cast<Layer*>(&in[i])); - LayerProto proto_concate; - proto_concate.set_name("concate"); - proto_concate.mutable_concate_conf()->set_concate_dim(0); - proto_concate.mutable_concate_conf()->set_num_concates(K); - ConcateLayer concate; - concate.Setup(proto_concate, src_concate); - ASSERT_EQ(concate.data(static_cast<Layer*>(&concate)).shape(0), N); - ASSERT_EQ(concate.data(static_cast<Layer*>(&concate)).shape(1), M); - - // use dummy as output layer - vector<Layer*> src_out; - src_out.push_back(static_cast<Layer*>(&concate)); - LayerProto proto_out; - proto_out.set_name("dummy_output"); - proto_out.mutable_dummy_conf()->set_output(true); - DummyLayer out; - out.Setup(proto_out, src_out); - - // test for computing feature - for (int i = 0; i < K; ++i) - in[i].ComputeFeature(0, src_in[i]); - concate.ComputeFeature(0, src_concate); - out.ComputeFeature(0, src_out); - int step = (N * M) / K; - for (int i = 0; i < out.data(nullptr).count(); ++i) { - ASSERT_EQ(in[i / step].data(nullptr).cpu_data()[i % step], - out.data(nullptr).cpu_data()[i]); - } - - // test for computing gradient - out.ComputeGradient(0, src_out); - concate.ComputeGradient(0, src_concate); - for (int i = 0; i < K; ++i) - in[i].ComputeGradient(0, src_in[i]); - for (int i = 0; i < out.grad(nullptr).count(); ++i) { - ASSERT_EQ(in[i / step].grad(nullptr).cpu_data()[i % step], - 
out.grad(nullptr).cpu_data()[i]); - } -} - -TEST(ConnectionLayerTest, ModelConcateTest) { - // use dummy as input layers - LayerProto proto_in[K]; - vector<Layer*> src_in[K]; - DummyLayer in[K]; - for (int i = 0; i < K; ++i) { - proto_in[i].set_name("dummy_input_"+std::to_string(i)); - proto_in[i].set_partition_id(i); - proto_in[i].mutable_dummy_conf()->set_input(true); - proto_in[i].mutable_dummy_conf()->add_shape(N); - proto_in[i].mutable_dummy_conf()->add_shape(M / K); - in[i].Setup(proto_in[i], src_in[i]); - } - - // add concate layer - vector<Layer*> src_concate; - for (int i = 0; i < K; ++i) - src_concate.push_back(static_cast<Layer*>(&in[i])); - LayerProto proto_concate; - proto_concate.set_name("concate"); - proto_concate.mutable_concate_conf()->set_concate_dim(1); - proto_concate.mutable_concate_conf()->set_num_concates(K); - ConcateLayer concate; - concate.Setup(proto_concate, src_concate); - ASSERT_EQ(concate.data(static_cast<Layer*>(&concate)).shape(0), N); - ASSERT_EQ(concate.data(static_cast<Layer*>(&concate)).shape(1), M); - - // use dummy as output layer - vector<Layer*> src_out; - src_out.push_back(static_cast<Layer*>(&concate)); - LayerProto proto_out; - proto_out.set_name("dummy_output"); - proto_out.mutable_dummy_conf()->set_output(true); - DummyLayer out; - out.Setup(proto_out, src_out); - - // test for computing feature - for (int i = 0; i < K; ++i) - in[i].ComputeFeature(0, src_in[i]); - concate.ComputeFeature(0, src_concate); - out.ComputeFeature(0, src_out); - int step = M / K; - int offset = 0; - for (int i = 0; i < out.data(nullptr).count(); ++i) { - if (i && i % M == 0) offset += step; - ASSERT_EQ(in[(i / step) % K].data(nullptr).cpu_data()[offset + i % step], - out.data(nullptr).cpu_data()[i]); - } - - // test for computing gradient - out.ComputeGradient(0, src_out); - concate.ComputeGradient(0, src_concate); - for (int i = 0; i < K; ++i) - in[i].ComputeGradient(0, src_in[i]); - offset = 0; - for (int i = 0; i < out.grad(nullptr).count(); ++i) { - if (i && i % M == 0) offset += step; - ASSERT_EQ(in[(i / step) % K].grad(nullptr).cpu_data()[offset + i % step], - out.grad(nullptr).cpu_data()[i]); - } -} - -TEST(ConnectionLayerTest, SplitTest) { - // use dummy as input layer - vector<Layer*> src_in; - LayerProto proto_in; - proto_in.set_name("dummy_input"); - proto_in.mutable_dummy_conf()->set_input(true); - proto_in.mutable_dummy_conf()->add_shape(N); - proto_in.mutable_dummy_conf()->add_shape(M); - DummyLayer in; - in.Setup(proto_in, src_in); - - // add split layer - vector<Layer*> src_split; - src_split.push_back(static_cast<Layer*>(&in)); - LayerProto proto_split; - proto_split.set_name("split"); - proto_split.mutable_split_conf()->set_num_splits(K); - SplitLayer split; - split.Setup(proto_split, src_split); - ASSERT_EQ(split.data(static_cast<Layer*>(&split)).shape(0), N); - ASSERT_EQ(split.data(static_cast<Layer*>(&split)).shape(1), M); - - // use dummy as output layers - LayerProto proto_out[K]; - vector<Layer*> src_out[K]; - DummyLayer out[K]; - for (int i = 0; i < K; ++i) { - src_out[i].push_back(static_cast<Layer*>(&split)); - proto_out[i].set_name("dummy_output_"+std::to_string(i)); - proto_out[i].set_partition_id(i); - proto_out[i].mutable_dummy_conf()->set_output(true); - out[i].Setup(proto_out[i], src_out[i]); - } - - // test for computing feature - in.ComputeFeature(0, src_in); - split.ComputeFeature(0, src_split); - for (int i = 0; i < K; ++i) - out[i].ComputeFeature(0, src_out[i]); - for (int i = 0; i < in.data(nullptr).count(); ++i) { - for (int k
= 0; k < K; ++k) - ASSERT_EQ(in.data(nullptr).cpu_data()[i], - out[k].data(nullptr).cpu_data()[i]); - } - - // test for computing gradient - for (int i = 0; i < K; ++i) - out[i].ComputeGradient(0, src_out[i]); - split.ComputeGradient(0, src_split); - in.ComputeGradient(0, src_in); - for (int i = 0; i < in.grad(nullptr).count(); ++i) { - float grad = 0; - for (int k = 0; k < K; ++k) grad += out[k].grad(nullptr).cpu_data()[i]; - ASSERT_EQ(in.grad(nullptr).cpu_data()[i], grad); - } -} http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/dd1e4afa/src/test/test_context.cc ---------------------------------------------------------------------- diff --git a/src/test/test_context.cc b/src/test/test_context.cc deleted file mode 100644 index 70f6d07..0000000 --- a/src/test/test_context.cc +++ /dev/null @@ -1,76 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ - -#include <thread> -#include "gtest/gtest.h" -#include "singa/utils/singleton.h" -#include "singa/utils/context.h" -#include "singa/utils/cuda_utils.h" - -using namespace singa; -using namespace std; - -TEST(ContextTest, TestDevice) { - auto context = Singleton<Context>::Instance(); - - auto id = std::this_thread::get_id(); - context->SetupDevice(id, 0); - auto device_id = context->device_id(id); - ASSERT_EQ(0, device_id); -} - -TEST(ContextTest, TestHandle) { - auto context = Singleton<Context>::Instance(); - - float cpu_ret = 0.0f; - float gpu_ret = 0.0f; - - float A[12]; - float B[12]; - - for (int i = 0; i < 12; i++) { - A[i] = i - 1; - B[i] = i + 1; - } - - float* A_gpu = NULL; - float* B_gpu = NULL; - context->SetupDevice(std::this_thread::get_id(), 0); - - cudaMalloc(reinterpret_cast<void**>(&A_gpu), 12 * sizeof(float)); - cudaMalloc(reinterpret_cast<void**>(&B_gpu), 12 * sizeof(float)); - - cudaMemcpy(A_gpu, A, 12 * sizeof(float), cudaMemcpyHostToDevice); - cudaMemcpy(B_gpu, B, 12 * sizeof(float), cudaMemcpyHostToDevice); - - cublasHandle_t handle = context->cublas_handle(std::this_thread::get_id()); - - cublasSdot(handle, 12, A_gpu, 1, B_gpu, 1, &gpu_ret); - - for (int i = 0; i < 12; ++i) { - cpu_ret += A[i] * B[i]; - } - - ASSERT_EQ(gpu_ret, cpu_ret); - - cudaFree(A_gpu); - cudaFree(B_gpu); -}
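
[Editor's note] The shard keying removed in stub.cc above can be exercised in isolation. Below is a minimal, self-contained C++ sketch: the Hash convention (grp_id * 997 + param_id) is taken verbatim from the deleted code, while the ParamEntry struct is a reduced hypothetical stand-in for illustration, not SINGA's class.

#include <cassert>
#include <unordered_map>

// Same keying as Stub::CreateParamShard: unique while param ids stay below 997.
inline int Hash(int grp_id, int param_id) { return grp_id * 997 + param_id; }

struct ParamEntry {  // hypothetical stand-in for singa::ParamEntry
  int num_local = 0;
  void AddParam(bool local) { if (local) ++num_local; }
};

int main() {
  std::unordered_map<int, ParamEntry> shard;
  // Two worker groups sharing param ids 0..2 produce six distinct keys:
  // group 0 -> {0, 1, 2}, group 1 -> {997, 998, 999}.
  for (int grp = 0; grp < 2; ++grp)
    for (int param = 0; param < 3; ++param)
      shard[Hash(grp, param)].AddParam(/*local=*/grp == 0);
  assert(shard.size() == 6u);
  // A lookup mirrors the dispatch in Stub::Run: (group, param) -> entry.
  assert(shard.at(Hash(0, 2)).num_local == 1);
  assert(shard.at(Hash(1, 2)).num_local == 0);
  return 0;
}

Note the bound is on param ids, not on the number of groups: with param ids of 997 or more, Hash(0, 997) and Hash(1, 0) would collide regardless of how many worker groups exist.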
