http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/mshadow/tensor_random.h ---------------------------------------------------------------------- diff --git a/include/mshadow/tensor_random.h b/include/mshadow/tensor_random.h new file mode 100644 index 0000000..b3f0b84 --- /dev/null +++ b/include/mshadow/tensor_random.h @@ -0,0 +1,299 @@ +#ifndef MSHADOW_TENSOR_RANDOM_H +#define MSHADOW_TENSOR_RANDOM_H +/*! + * \file tensor_random.h + * \brief Random inline functions for tensor. + * \author Bing Xu, Tianqi Chen + * Based on curand|MKL|stdlib + */ +#include <cstdlib> +#include "tensor.h" +#include "tensor_container.h" + +namespace mshadow { + /*! + * \brief random number generator + * \tparam Device the device of random number generator + */ + template<typename Device> + class Random {}; + + /*! \brief CPU random number generator */ + template<> + class Random<cpu> { + public: + /*! + * \brief constructor of random engine + * \param seed random number seed + */ + Random<cpu>( int seed ){ + #if MSHADOW_USE_MKL + int status = vslNewStream(&vStream_, VSL_BRNG_MT19937, seed); + utils::Assert( status == VSL_STATUS_OK, "MKL VSL Random engine failed to be initialized.\n" ); + #else + srand(seed); + #endif + buffer_.Resize( Shape1( kRandBufferSize ) ); + } + ~Random<cpu>() { + #if MSHADOW_USE_MKL + vslDeleteStream(&vStream_); + #endif + } + /*! + * \brief seed random number generator using this seed + * \param seed seed of prng + */ + inline void Seed( int seed ){ + #if MSHADOW_USE_MKL + int status = vslDeleteStream(&vStream_); + utils::Assert(status == VSL_STATUS_OK); + status = vslNewStream(&vStream_, VSL_BRNG_MT19937, seed); + utils::Assert(status == VSL_STATUS_OK); + #else + srand( seed ); + #endif + } + /*! 
+ * \brief generate data from uniform [a,b) + * \param dst destination + * \param a lower bound of uniform + * \param b upper bound of uniform + * \tparam dim dimension of tensor + */ + template<int dim> + inline void SampleUniform( Tensor<cpu, dim> &dst, real_t a=0.0f, real_t b=1.0f ) { + Tensor<cpu, 2> mat = dst.FlatTo2D(); + for ( index_t i = 0; i < mat.shape[1]; ++i ) { + #if MSHADOW_USE_MKL + #if MSHADOW_SINGLE_PRECISION + int status = vsRngUniform( 0, vStream_, mat.shape[0], mat[i].dptr, a, b ); + #else + int status = vdRngUniform( 0, vStream_, mat.shape[0], mat[i].dptr, a, b ); + #endif + utils::Assert(status == VSL_STATUS_OK, "Failed to generate random number by MKL.\n" ); + #else + // use stdlib + for ( index_t j = 0; j < mat.shape[0]; ++j ) { + mat[i][j] = this->RandNext()*(b-a) + a; + } + #endif + } + } + /*! + * \brief generate data from standard gaussian + * \param dst destination + * \param mu mean variable + * \param sigma standard deviation + * \tparam dim dimension of tensor + */ + template<int dim> + inline void SampleGaussian( Tensor<cpu, dim> &dst, real_t mu = 0.0f, real_t sigma = 1.0f ) { + if( sigma <= 0.0f ) { + dst = mu; return; + } + Tensor<cpu, 2> mat = dst.FlatTo2D(); + for (index_t i = 0; i < mat.shape[1]; ++i) { + #if MSHADOW_USE_MKL + #if MSHADOW_SINGLE_PRECISION + int status = vsRngGaussian( 0, vStream_, mat.shape[0], mat[i].dptr, mu, sigma ); + #else + int status = vdRngGaussian( 0, vStream_, mat.shape[0], mat[i].dptr, mu, sigma ); + #endif + utils::Assert(status == VSL_STATUS_OK, "Failed to generate random number by MKL.\n" ); + #else + real_t g1 = 0.0f, g2 = 0.0f; + for (index_t j = 0; j < mat.shape[0]; ++j) { + if( (j & 1) == 0 ){ + this->SampleNormal2D( g1, g2 ); + mat[i][j] = mu + g1 * sigma; + }else{ + mat[i][j] = mu + g2 * sigma; + } + } + #endif + } + } + /*! 
+ * \brief return a temporal expression storing standard gaussian random variables + * the temporal tensor is only valid before next call of gaussian or uniform + * can be used as part of expression + * Caution: this means expression such as A = gaussian(s1) * gaussian(s2) will give invalid result, + * since second call of gaussian(s2) makes gaussian(s1) invalid + * A = gaussian(s1)*B+C; is correct; use one gaussian/uniform in each expression + * \param shape shape of the tensor + * \tparam dim dimension of tensor + */ + template<int dim> + inline expr::ReshapeExp<Tensor<cpu,1>,dim,1> gaussian( Shape<dim> shape ){ + buffer_.Resize( Shape1( shape.Size() ) ); + this->SampleGaussian( buffer_, 0.0f, 1.0f ); + return expr::reshape( buffer_, shape ); + } + /*! + * \brief return a temporal expression storing standard uniform [0,1) + * the temporal tensor is only valid before next call of gaussian or uniform + * can be used as part of expression + * Caution: this means expression such as A = gaussian(s1) * gaussian(s2) will give invalid result, + * since second call of gaussian(s2) makes gaussian(s1) invalid + * A = gaussian(s1)*B+C; is correct; use one gaussian/uniform in each expression + * \param shape shape of the tensor + * \tparam dim dimension of tensor + */ + template<int dim> + inline expr::ReshapeExp<Tensor<cpu,1>,dim,1> uniform( Shape<dim> shape ){ + buffer_.Resize( Shape1( shape.Size() ) ); + this->SampleUniform( buffer_, 0.0f, 1.0f ); + return expr::reshape( buffer_, shape ); + } + private: + /*! \brief get next random number from rand */ + inline real_t RandNext( void ){ + return static_cast<real_t>(rand()) / (static_cast<real_t>(RAND_MAX)+1.0f); + } + /*! \brief return a real numer uniform in (0,1) */ + inline real_t RandNext2( void ){ + return (static_cast<real_t>( rand() ) + 1.0 ) / (static_cast<real_t>(RAND_MAX) + 2.0); + } + /*! 
+ * \brief sample iid xx,yy ~N(0,1) + * \param xx first gaussian output + * \param yy second gaussian output + */ + inline void SampleNormal2D( real_t &xx, real_t &yy ){ + real_t x,y,s; + do{ + x = 2.0f * RandNext2() - 1.0f; + y = 2.0f * RandNext2() - 1.0f; + s = x*x + y*y; + }while( s >= 1.0f || s == 0.0f ); + real_t t = std::sqrt( -2.0f * std::log( s ) / s ) ; + xx = x * t; yy = y * t; + } + private: + #if MSHADOW_USE_MKL + /*! \brief stream used by MKL VSL */ + VSLStreamStatePtr vStream_; + #endif + /*! \brief temporal space used to store random numbers */ + TensorContainer<cpu,1> buffer_; + }; // class Random<cpu> + +#ifdef __CUDACC__ + + /*! \brief GPU random number generator */ + template<> + class Random<gpu> { + public: + /*! + * \brief constructor of random engine + * \param seed random number seed + */ + Random<gpu>(int seed) { + curandStatus_t status; + status = curandCreateGenerator(&gen_, CURAND_RNG_PSEUDO_DEFAULT); + utils::Assert(status == CURAND_STATUS_SUCCESS, "Can not create CURAND Generator"); + this->Seed( seed ); + buffer_.Resize( Shape1(kRandBufferSize) ); + } + + ~Random<gpu>() { + curandStatus_t status; + status = curandDestroyGenerator(gen_); + utils::Assert(status == CURAND_STATUS_SUCCESS, "Destory CURAND Gen failed"); + } + /*! + * \brief seed random number generator using this seed + * \param seed seed of prng + */ + inline void Seed( int seed ){ + curandStatus_t status; + status = curandSetPseudoRandomGeneratorSeed(gen_, seed); + utils::Assert(status == CURAND_STATUS_SUCCESS, "Set CURAND seed failed."); + } + /*! + * \brief generate data from uniform [a,b) + * \param dst destination + * \param a lower bound of uniform + * \param b upper bound of uniform + * \tparam dim dimension of tensor + */ + template<int dim> + inline void SampleUniform(Tensor<gpu, dim> &dst, real_t a=0.0f, real_t b=1.0f) { + if( a == 0.0f && b == 1.0f ){ + dst = this->uniform( dst.shape ); + }else{ + dst = this->uniform( dst.shape ) *(b-a) + a; + } + } + /*! 
+ * \brief generate data from standard gaussian + * \param dst destination + * \param mu mean variable + * \param sigma standard deviation + * \tparam dim dimension of tensor + */ + template<int dim> + inline void SampleGaussian(Tensor<gpu, dim> &dst, real_t mu = 0.0f, real_t sigma = 1.0f) { + dst = this->gaussian( dst.shape, mu, sigma ); + } + /*! + * \brief return a temporal expression storing standard gaussian random variables + * the temporal tensor is only valid before next call of gaussian or uniform + * can be used as part of expression + * Caution: this means expression such as A = gaussian(s1) * gaussian(s2) will give invalid result, + * since second call of gaussian(s2) makes gaussian(s1) invalid + * A = gaussian(s1)*B+C; is correct; use one gaussian/uniform in each expression + * \param shape shape of the tensor + * \param mu mean + * \param sigma variance + * \tparam dim dimension of tensor + */ + template<int dim> + inline expr::ReshapeExp<Tensor<gpu,1>,dim,1> gaussian( Shape<dim> shape, real_t mu=0.0f, real_t sigma=1.0f){ + size_t aligned_sz = ((shape.Size() + 1UL)>>1)<<1; + // allocate alligned size + buffer_.Resize( Shape1( aligned_sz ) ); + buffer_.Resize( Shape1( shape.Size() ) ); + curandStatus_t status; + #if MSHADOW_SINGLE_PRECISION + status = curandGenerateNormal(gen_, buffer_.dptr, aligned_sz , mu, sigma); + #else + status = curandGenerateNormalDouble(gen_, buffer_.dptr, buffer_.shape[0], mu, sigma); + #endif + utils::Assert(status == CURAND_STATUS_SUCCESS, "CURAND Gen Uniform failed\n"); + return expr::reshape( buffer_, shape ); + } + /*! 
+ * \brief return a temporal expression storing standard uniform [0,1) + * the temporal tensor is only valid before next call of gaussian or uniform + * can be used as part of expression + * Caution: this means expression such as A = gaussian(s1) * gaussian(s2) will give invalid result, + * since second call of gaussian(s2) makes gaussian(s1) invalid + * A = gaussian(s1)*B+C; is correct; use one gaussian/uniform in each expression + * \param shape shape of the tensor + * \tparam dim dimension of tensor + */ + template<int dim> + inline expr::ReshapeExp<Tensor<gpu,1>,dim,1> uniform(Shape<dim> shape) { + buffer_.Resize( Shape1( shape.Size() ) ); + curandStatus_t status; + #if MSHADOW_SINGLE_PRECISION + status = curandGenerateUniform(gen_, buffer_.dptr, buffer_.shape[0] ); + #else + status = curandGenerateUniformDouble(gen_, buffer_.dptr, buffer_.shape[0] ); + #endif + utils::Assert(status == CURAND_STATUS_SUCCESS, "CURAND Gen Uniform failed\n"); + return expr::reshape( buffer_, shape ); + } + private: + /*! \brief random numbeer generator */ + curandGenerator_t gen_; + /*! \brief templ buffer */ + TensorContainer<gpu, 1> buffer_; + }; // class Random<gpu> + #endif + +}; // namespace mshadow + +#endif // MSHADOW_TENSOR_RANDOM_H
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/mshadow/tensor_sse-inl.hpp ---------------------------------------------------------------------- diff --git a/include/mshadow/tensor_sse-inl.hpp b/include/mshadow/tensor_sse-inl.hpp new file mode 100644 index 0000000..b98383e --- /dev/null +++ b/include/mshadow/tensor_sse-inl.hpp @@ -0,0 +1,431 @@ +#ifndef MSHADOW_TENSOR_SSE_INL_HPP +#define MSHADOW_TENSOR_SSE_INL_HPP +/*! + * \file tensor_sse-inl.hpp + * \brief support of sse2 optimization of some operations + * \author Tianqi Chen + */ +#ifdef __APPLE__ +#include <stdlib.h> +#else +#include <malloc.h> +#endif + +#include "tensor_expr.h" +#include "tensor.h" + +namespace mshadow { + /*! \brief namespace to support sse2 vectorization */ + namespace sse2{ + /*! + * \brief analog to cudaMallocPitch, allocate a aligned space with num_line * lspace cells + * \param pitch output parameter, the actuall space allocated for each line + * \param lspace number of cells required for each line + * \param num_line number of lines to be allocated + */ + inline void* AlignedMallocPitch( size_t &pitch, size_t lspace, size_t num_line ){ + pitch = ((lspace+15) >> 4) << 4; + #ifdef _MSC_VER + void * res = _aligned_malloc( pitch*num_line, 16 ); + #else + #ifdef __APPLE__ + void *res = malloc( pitch * num_line ); + #else + void * res = memalign( 16, pitch*num_line ); + #endif + #endif + utils::Assert( res != NULL, "AlignedMallocPitch failed" ); + return res; + } + /*! + * \brief free aligned space + * \param ptr pointer to space to be freed + */ + inline void AlignedFree( void *ptr ){ + #ifdef _MSC_VER + _aligned_free( ptr ); + #else + free( ptr ); + #endif + } + /*! \brief check if a pointer is aligned */ + inline bool CheckAlign( size_t pitch ){ + return !(pitch & ((1<<4)-1)); + } + /*! \brief check if a pointer is aligned */ + inline bool CheckAlign( void *ptr ){ + return CheckAlign( (size_t)ptr ); + } + /*! 
+ * \brief get upper bound of aligned index of size + * \param size size of the array + * \param fsize size of float + */ + inline index_t UpperAlign( index_t size, size_t fsize ){ + return (( (size*fsize+15) >> 4 ) << 4) / fsize; + } + /*! + * \brief get lower bound of aligned index of size + * \param size size of the array + * \param fsize size of float + */ + inline index_t LowerAlign( index_t size, size_t fsize ){ + return (( (size*fsize) >> 4 ) << 4) / fsize; + } + }; // namespace sse2 +}; // namespace mshadow + +#if MSHADOW_USE_SSE +// sse types are not compatible with nvcc, only use them in cpu mode +#include <emmintrin.h> + +namespace mshadow{ + namespace sse2{ + /*! + * \brief float vector real type, used for vectorization + * \tparam FloatType double or float + */ + template<typename FloatType> struct FVec{}; + + /*! \brief vector real type for float */ + template<> + struct FVec<float> { + public: + typedef __m128 DType; + /*! \brief number of float in vector */ + const static index_t kSize = 4; + /*! \brief data content */ + DType data_; + public: + /* constructors */ + FVec( void ){} + FVec( DType data ):data_(data){} + /* set the float */ + FVec( const float &s ){ + data_ = _mm_set1_ps( s ); + } + /*!\brief load from pointer src */ + FVec( const float *src ){ + data_ = _mm_load_ps( src ); + } + public: + /*! \brief store data into dst space */ + inline void Store( float *dst ) const{ + return _mm_store_ps( dst, data_ ); + } + /*! \brief sum of all content */ + inline float Sum( void ) const{ + DType ans = _mm_add_ps( data_, _mm_movehl_ps( data_, data_ ) ); + DType rst = _mm_add_ss( ans, _mm_shuffle_ps( ans, ans, 1 ) ); + #if defined(_MSC_VER) && ( _MSC_VER <= 1500 ) && defined(_WIN64) + return rst.m128_f32[ 0 ]; + #else + float rr = _mm_cvtss_f32( rst ) ; + return rr; + #endif + } + }; + + /*! \brief vector real type for float */ + template<> + struct FVec<double> { + public: + typedef __m128d DType; + /*! 
\brief number of float in vector */ + const static index_t kSize = 2; + /*! \brief data content */ + DType data_; + public: + /* constructors */ + FVec( void ){} + FVec( DType data ):data_(data){} + /* set the float */ + FVec( const double &s ){ + data_ = _mm_set1_pd( s ); + } + /*!\brief load from pointer src */ + FVec( const double *src ){ + data_ = _mm_load_pd( src ); + } + public: + /*! \brief store data into dst space */ + inline void Store( double *dst ) const{ + return _mm_store_pd( dst, data_ ); + } + /*! \brief sum of all content */ + inline double Sum( void ) const{ + DType tmp = _mm_add_sd( data_, _mm_unpackhi_pd( data_,data_ ) ) ; + #if defined(_MSC_VER) && ( _MSC_VER <= 1500 ) && defined(_WIN64) + return tmp.m128d_f64[0]; + #else + double ans = _mm_cvtsd_f64( tmp ); + return ans; + #endif + } + }; + }; + + namespace sse2{ + /*! \brief sse2 operator type of certain operator */ + template<typename OP> + struct SSEOp{ + const static bool kEnabled = false; + }; + template<> + struct SSEOp<op::plus>{ + const static bool kEnabled = true; + MSHADOW_CINLINE static FVec<float> Map( const FVec<float> &lhs, const FVec<float> &rhs ){ + return FVec<float>( _mm_add_ps( lhs.data_, rhs.data_ ) ); + } + MSHADOW_CINLINE static FVec<double> Map( const FVec<double> &lhs, const FVec<double> &rhs ){ + return FVec<double>( _mm_add_pd( lhs.data_, rhs.data_ ) ); + } + }; + template<> + struct SSEOp<op::minus>{ + const static bool kEnabled = true; + MSHADOW_CINLINE static FVec<float> Map( const FVec<float> &lhs, const FVec<float> &rhs ){ + return FVec<float>( _mm_sub_ps( lhs.data_, rhs.data_ ) ); + } + MSHADOW_CINLINE static FVec<double> Map( const FVec<double> &lhs, const FVec<double> &rhs ){ + return FVec<double>( _mm_sub_pd( lhs.data_, rhs.data_ ) ); + } + }; + template<> + struct SSEOp<op::mul>{ + const static bool kEnabled = true; + MSHADOW_CINLINE static FVec<float> Map( const FVec<float> &lhs, const FVec<float> &rhs ){ + return FVec<float>( _mm_mul_ps( lhs.data_, 
rhs.data_ ) ); + } + MSHADOW_CINLINE static FVec<double> Map( const FVec<double> &lhs, const FVec<double> &rhs ){ + return FVec<double>( _mm_mul_pd( lhs.data_, rhs.data_ ) ); + } + }; + template<> + struct SSEOp<op::div>{ + const static bool kEnabled = true; + MSHADOW_CINLINE static FVec<float> Map( const FVec<float> &lhs, const FVec<float> &rhs ){ + return FVec<float>( _mm_div_ps( lhs.data_, rhs.data_ ) ); + } + MSHADOW_CINLINE static FVec<double> Map( const FVec<double> &lhs, const FVec<double> &rhs ){ + return FVec<double>( _mm_div_pd( lhs.data_, rhs.data_ ) ); + } + }; + + template<> + struct SSEOp<op::identity>{ + const static bool kEnabled = true; + MSHADOW_CINLINE static FVec<float> Map( const FVec<float> &src ){ + return src; + } + MSHADOW_CINLINE static FVec<double> Map( const FVec<double> &src ){ + return src; + } + }; + }; // namespace sse2 + + namespace sse2{ + // savers to do storage + template<typename SV, typename TFloat> + struct Saver{ + MSHADOW_CINLINE static void Save( TFloat *dst, const FVec<TFloat> &src ){ + FVec<TFloat> lhs( dst ); + FVec<TFloat> ans = SSEOp<typename SV::OPType>::Map( lhs, src ); + ans.Store( dst ); + } + }; + template<typename TFloat> + struct Saver<sv::saveto,TFloat>{ + MSHADOW_CINLINE static void Save( TFloat *dst, const FVec<TFloat> &src ){ + src.Store( dst ); + } + }; + }; // namespace sse2 +}; // namespace mshadow + +namespace mshadow{ + namespace expr{ + // same as plan, but use sse2 + template<typename ExpType> + class SSEPlan { + public: + /*! 
+ * \brief evaluate the expression at index [y][x], x will be aligned to 4 + * to be implemented by SubType + */ + MSHADOW_CINLINE sse2::FVec<real_t> EvalSSE( index_t y, index_t x ) const; + MSHADOW_CINLINE real_t Eval( index_t y, index_t x ) const; + }; + + template <typename Device, int dim> + class SSEPlan< Tensor<Device,dim> >{ + public: + SSEPlan( const Tensor<Device,dim> &t ) + :dptr_(t.dptr),stride_(t.shape.stride_){} + MSHADOW_CINLINE sse2::FVec<real_t> EvalSSE( index_t y, index_t x ) const{ + return sse2::FVec<real_t>( &dptr_[ y*stride_+x ] ); + } + MSHADOW_CINLINE real_t Eval( index_t y, index_t x ) const{ + return dptr_[ y * stride_ + x ]; + } + private: + const real_t *dptr_; + index_t stride_; + }; + + template<> + class SSEPlan<ScalarExp>{ + public: + SSEPlan( real_t scalar ):scalar_(scalar){} + MSHADOW_CINLINE sse2::FVec<real_t> EvalSSE( index_t y, index_t x ) const{ + return sse2::FVec<real_t>( scalar_ ); + } + MSHADOW_CINLINE real_t Eval( index_t y, index_t x ) const{ + return scalar_; + } + private: + real_t scalar_; + }; + + template<typename OP, typename TA, typename TB,int etype> + class SSEPlan< BinaryMapExp<OP,TA,TB,etype> >{ + public: + SSEPlan( const SSEPlan<TA> &lhs, const SSEPlan<TB> &rhs ) + :lhs_(lhs), rhs_(rhs){} + MSHADOW_CINLINE sse2::FVec<real_t> EvalSSE( index_t y, index_t x ) const{ + return sse2::SSEOp<OP>::Map( lhs_.EvalSSE( y, x ), rhs_.EvalSSE( y, x ) ); + } + MSHADOW_CINLINE real_t Eval( index_t y, index_t x ) const{ + return OP::Map( lhs_.Eval( y, x ), rhs_.Eval( y, x ) ); + } + private: + SSEPlan<TA> lhs_; + SSEPlan<TB> rhs_; + }; + + template<typename OP, typename TA, int etype> + class SSEPlan< UnaryMapExp<OP,TA,etype> >{ + public: + SSEPlan( const SSEPlan<TA> &src ):src_(src){} + MSHADOW_CINLINE sse2::FVec<real_t> EvalSSE( index_t y, index_t x ) const{ + return sse2::SSEOp<OP>::Map( src_.EvalSSE( y, x ) ); + } + MSHADOW_CINLINE real_t Eval( index_t y, index_t x ) const{ + return OP::Map( src_.Eval( y, x ) ); + } + 
private: + SSEPlan<TA> src_; + }; + + template<typename OP, typename TA, typename TB, int etype> + inline SSEPlan< BinaryMapExp<OP,TA,TB,etype> > MakeSSEPlan( const BinaryMapExp<OP,TA,TB,etype> &e ); + + inline SSEPlan<ScalarExp> MakeSSEPlan( const ScalarExp &e ){ + return SSEPlan<ScalarExp>( e.scalar_ ); + } + + template<typename T> + inline SSEPlan<T> MakeSSEPlan( const ContainerExp<T> &e ){ + return SSEPlan<T>( e.self() ); + } + + template<typename T,int dim> + inline SSEPlan<T> MakeSSEPlan( const MakeTensorExp<T,cpu,dim> &e ){ + return SSEPlan<T>( e.real_self() ); + } + + template<typename OP, typename TA, int etype> + inline SSEPlan< UnaryMapExp<OP,TA,etype> > MakeSSEPlan( const UnaryMapExp<OP,TA,etype> &e ){ + return SSEPlan< UnaryMapExp<OP,TA,etype> >( MakeSSEPlan(e.src_) ); + } + + template<typename OP, typename TA, typename TB, int etype> + inline SSEPlan< BinaryMapExp<OP,TA,TB,etype> > MakeSSEPlan( const BinaryMapExp<OP,TA,TB,etype> &e ){ + return SSEPlan< BinaryMapExp<OP,TA,TB,etype> >( MakeSSEPlan(e.lhs_), MakeSSEPlan(e.rhs_) ); + } + }; + + namespace expr{ + /*! 
+ * \brief static check sse enable + * if a expression E can not be evaluated using sse, then kPass = false + * \tparam Device the type of Device + * \tparam dim dimension of the tensor + * \tparam E expression + */ + template<typename E> + struct SSECheck{ + const static bool kPass = false; + }; + template<> + struct SSECheck<ScalarExp>{ + const static bool kPass = true; + }; + template<int dim> + struct SSECheck<Tensor<cpu,dim> >{ + const static bool kPass = true; + }; + + template<typename OP, typename TA, int etype> + struct SSECheck<UnaryMapExp<OP,TA,etype> >{ + const static bool kPass = SSECheck<TA>::kPass && sse2::SSEOp<OP>::kEnabled; + }; + template<typename OP, typename TA, typename TB, int etype> + struct SSECheck< BinaryMapExp<OP,TA,TB,etype> >{ + const static bool kPass = SSECheck<TA>::kPass && SSECheck<TB>::kPass && sse2::SSEOp<OP>::kEnabled; + }; + }; // namespace expr + namespace expr{ + // check if data is aligned and allow sse operation + template<int dim,typename E> + struct SSEAlignCheck{ + inline static bool Check( const E &exp ){ + return false; + } + }; + template<int dim> + struct SSEAlignCheck< dim, ScalarExp >{ + inline static bool Check( const ScalarExp &exp ){ + return true; + } + }; + template<int dim> + struct SSEAlignCheck< dim,Tensor<cpu,dim> >{ + inline static bool Check( const Tensor<cpu,dim> &t ){ + return sse2::CheckAlign( t.dptr ) && sse2::CheckAlign( t.shape.stride_ * sizeof( real_t ) ); + } + }; + template<int dim, typename OP, typename TA, int etype> + struct SSEAlignCheck< dim, UnaryMapExp<OP,TA,etype> >{ + inline static bool Check( const UnaryMapExp<OP,TA,etype> &t ){ + return SSEAlignCheck<dim,TA>::Check( t.src_); + } + }; + template<int dim, typename OP, typename TA, typename TB, int etype> + struct SSEAlignCheck< dim, BinaryMapExp<OP,TA,TB,etype> >{ + inline static bool Check( const BinaryMapExp<OP,TA,TB,etype> &t ){ + return SSEAlignCheck<dim,TA>::Check( t.lhs_ ) && + SSEAlignCheck<dim,TB>::Check( t.rhs_ ); + } + }; + }; 
// namespace expr + + /*! + * \brief use SSEPlan to compute result + */ + template<typename SV, typename E, int dim> + inline void MapSSEPlan(Tensor<cpu,dim> _dst, const expr::SSEPlan<E> &plan){ + Tensor<cpu,2> dst = _dst.FlatTo2D(); + const index_t xlen = sse2::LowerAlign( dst.shape[0], sizeof(real_t) ); + for ( index_t y = 0; y < dst.shape[1]; y ++ ) { + for( index_t x = 0; x < xlen; x += sse2::FVec<real_t>::kSize ){ + sse2::Saver<SV,real_t>::Save( &dst[y][x], plan.EvalSSE( y,x ) ); + } + for( index_t x = xlen; x < dst.shape[0]; x ++ ){ + SV::Save( dst[y][x], plan.Eval(y,x) ); + } + } + } +}; // namespace mshadow +#endif // MSHADOW_USE_SSE +#endif // MSHADOW_TENSOR_SSE_INL_HPP http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/neuralnet/base_layer.h ---------------------------------------------------------------------- diff --git a/include/neuralnet/base_layer.h b/include/neuralnet/base_layer.h new file mode 100644 index 0000000..863c223 --- /dev/null +++ b/include/neuralnet/base_layer.h @@ -0,0 +1,563 @@ +#ifndef INCLUDE_BASE_LAYER_H_ +#define INCLUDE_BASE_LAYER_H_ + +#include <vector> +#include <string> +#include <map> +#include <functional> +#include <utility> +#include <condition_variable> +#include <mutex> +#include <memory> +#include <chrono> +#include <algorithm> + +#include "proto/model.pb.h" +#include "utils/param.h" +#include "utils/common.h" +#include "utils/blob.h" + +using std::vector; +using std::shared_ptr; +using std::make_shared; +using std::string; +using std::map; + +namespace singa{ + +class Layer; +typedef shared_ptr<Layer> SLayer; +/** + * Base layer class. 
+ * Children should implement at least Layer::Setup, Layer::ComputeFeature(), + * Layer::ComputGradient() functions for backpropagation method; + * TODO(wangwei) implement children layers to support contrastive divergence, + * The identifier of each layer is the literal string of the class name without + * the suffix "Layer", which is used in layer registration and creation. + */ +class Layer { + public: + Layer(){} + /** + * simply save the proto configuation. + * most initializations are done by Setup(). + * @param layer_proto user defined layer configuration + */ + virtual void Init(const LayerProto &proto); + /** + * copy layer configuration from the other Layer, and set the shape. + */ + void Init(const Layer& other, const vector<int>& shape); + virtual ~Layer(){} + /** + * Marshal layer properties and data into google protobuf object + * (i.e., snapshot). + * Parameters are marshalled separately into another object (i.e., model). + * @param layer_proto + * @param copyData if true marshal data of DArray + */ + virtual void ToProto(LayerProto *layer_proto, bool copyData); + /** + * Setup layer properties. + * Setup the shapes for data and parameters, also setup some properties + * based on the layer configuration and connected src layers. + * @param srclayers layers connecting to this layer + */ + virtual void Setup(const LayerProto& proto, + const vector<SLayer>& srclayers)=0; + /** + * \copydoc Setup(const LayerProto&, const vector<SLayer>&) + */ + virtual void Setup(); + /** + * Setup the layer properties except shape. + * the shape is already set and passed in to set other properties. + * perperties are set according to shapes of itself and connected layers, and + * configuration. this should not change the current shape_( + * shape check is done outside the function). 
+ */ + virtual void SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers)=0; + /** + * \copybrief SetupAfterPartition(const LayerProto&, const vector<int> &, + * const vector<SLayer>& ). + */ + virtual void SetupAfterPartition(); + /** + * Layers that have paramters must overload this function. + * @return parameters associated with this layer + */ + virtual vector<shared_ptr<Param>> GetParams(){ + return vector<shared_ptr<Param>>(); + } + /** + * Compute features of this layer based on connected layers. + * Implement forward propagation for BP; TODO Implement both postive phase + * and negative phase for CD. + * @param srclayers layers connecting to this layer + */ + virtual void ComputeFeature(bool training, const vector<SLayer>& srclayers)=0; + /** + * \copybrief ComputeFeature(const vector<SLayer>& srclayers) + */ + virtual void ComputeFeature(bool training); + /** + * Compute gradients for parameters and connecting layers. + * Implement backward propagation for BP; TODO Calculate gradients for + * parameters for CD. + * @param srclayers layers connecting to this layer. + */ + virtual void ComputeGradient(const vector<SLayer>& srclayers)=0; + /** + * \copybrief ComputeGradient(const vector<SLayer>& srclayers) + */ + virtual void ComputeGradient(); + /** + * decide on which dimension to do the partitioning. + * @mode kLayer, kData, kNone (no partition) + * @return the partition dimension, -1 for no partition + */ + virtual int partition_dimension() const { + int ret=0; + if(partition_type()==kLayerPartition) + ret= 1; + else if(partition_type()==kNone) + ret= -1; + return ret; + } + + /** + * return connection type between two layers. + * Currently support two connections: kOneToOne, and kOneToAll. + * kOneToOne indicates the dst neuron depends on only one neuron from src + * layer. kOneToAll indicates the dst neuron depends on all neurons from src + * layer. TODO support kOneToMany. 
+ */ + virtual ConnectionType connection_type(int k) const { + CHECK_LT(k, srclayers_.size()); + return kOneToOne; + } + /** + * return partition type of this layer. + * E.g., kNone, kLayer or kData + */ + virtual PartitionType partition_type() const { + return layer_proto_.partition_type(); + } + /** + * location id is the execution unit (i.e., thread from the working group) ID. + */ + virtual void set_locationid(int id){ + layer_proto_.set_locationid(id); + } + virtual int locationid() const { + return layer_proto_.locationid(); + } + /** + * partition id is the ID of the layer in the original layer. + */ + virtual void set_partitionid(int id){ + layer_proto_.set_partitionid(id); + } + virtual int partitiionid() const { + return layer_proto_.partitionid(); + } + virtual void set_name(string name){ + name_=name; + layer_proto_.set_name(name); + } + virtual const string type() const { + return layer_proto_.type(); + } + /** + * Return name of this layer + */ + const std::string &name() const { + return layer_proto_.name(); + } + const vector<int>& shape(const Layer* layer=nullptr) const{ + return data(layer).shape(); + } + + /** + * @return a const ref for Blob storing neuron values of this layer for BP + */ + virtual const Blob<float>& data(const Layer* from=nullptr) const { + return data_; + } + virtual Blob<float>* mutable_data(const Layer* from=nullptr){ + return &data_; + } + + virtual const Blob<float>& grad(const Layer* from=nullptr) const { + return grad_; + } + /** + * @return a pointer to storing neuron grads of this layer for BP + */ + virtual Blob<float>* mutable_grad(const Layer* from=nullptr) { + return &grad_; + } + + /** + * return LayerS that connected to this layer + */ + virtual const vector< SLayer> srclayers() const { + return srclayers_; + } + /** + * return LayerS that this layer connected to + */ + virtual const vector<SLayer> dstlayers() const { + return dstlayers_; + } + + virtual const int srclayers_size() const { + return 
srclayers_.size(); + } + virtual const int dstlayers_size() const { + return dstlayers_.size(); + } + virtual void ClearDstLayers() { + dstlayers_.clear(); + } + virtual void ClearSrcLayers() { + srclayers_.clear(); + } + + virtual void AddSrcLayer(SLayer src){ + srclayers_.push_back(src); + } + virtual void AddDstLayer(SLayer dst){ + dstlayers_.push_back(dst); + } + + virtual bool is_datalayer() const { + return false; + } + virtual bool is_parserlayer() const { + return false; + } + virtual bool is_losslayer() const { + return false; + } + virtual bool is_bridgesrclayer() const { + return false; + } + virtual bool is_bridgedstlayer() const { + return false; + } +protected: + string name_; + //vector<shared_ptr<SyncedMem>> memblobs_; + Blob<float> data_, grad_; + // DArray pos_, neg_;//for CD + LayerProto layer_proto_; + vector<SLayer> srclayers_, dstlayers_; +}; + +/** + * For sending data to layer on other threads which may resident on other nodes + * due to layer/data partition. + */ +class BridgeSrcLayer: public Layer { + public: + virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers); + virtual void SetupAfterPartition(); + virtual void SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers){} + + virtual void ComputeFeature(bool training, const vector<SLayer>& srclayers); + virtual void ComputeGradient(const vector<SLayer>& srclayers); + virtual bool is_bridgesrclayer() const { + return true; + } + + virtual void set_ready(bool a) { + ready_=a; + } + virtual bool ready() const { + return ready_; + } + protected: + bool ready_; +}; +/** + * For recv data from layer on other threads which may resident on other nodes + * due to layer/data partiton + */ +class BridgeDstLayer: public Layer { + public: + virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers); + virtual void SetupAfterPartition(); + virtual void SetupAfterPartition(const LayerProto& proto, + const 
vector<int> &shape, + const vector<SLayer>& srclayers){} + + virtual void ComputeFeature(bool training, const vector<SLayer>& srclayers); + virtual void ComputeGradient(const vector<SLayer>& srclayers); + virtual bool is_bridgedstlayer() const { + return true; + } + virtual void set_ready(bool a) { + ready_=a; + } + virtual bool ready() const { + return ready_; + } + protected: + bool ready_; +}; + +/** + * Concate src layers on one dimension + */ +class ConcateLayer: public Layer { + public: + virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers); + virtual void SetupAfterPartition(); + virtual void SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers){} + + virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers); + virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers); +}; + + +/** + * base layer for prefetching records from local Shard, HDFS, lmdb, etc. + * cannot be partitioned, always returns kNone for partition type. 
+ */ + +class DataLayer: public Layer{ + public: + virtual void ComputeFeature(bool training, const vector<SLayer>& srclayers)=0; + virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers)=0; + virtual bool is_datalayer() const { + return true; + } + virtual void ComputeGradient(const vector<SLayer>& srclayers){}; + virtual const vector<Record>& records() const { + return records_; + } + virtual void Setup(){ + vector<SLayer> dummy; + Setup(layer_proto_,dummy); + has_set_=true; + } + virtual void SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers){} + + virtual void SetupAfterPartition(){ + if(!has_set_) + Setup(); + } + virtual PartitionType partition_type () const { + return kNone; + } + + virtual int batchsize() const { + return layer_proto_.data_param().batchsize(); + } + virtual const Record& sample() const { + return sample_; + } + + virtual Blob<float>* mutable_data(const Layer* layer=nullptr) { + return nullptr; + } + virtual Blob<float>* mutable_grad(const Layer* layer=nullptr) { + return nullptr; + } + void set_prefetch(bool prefetch){ + prefetch_=prefetch; + } + + virtual void ComputeFeature(bool training) { + if(!prefetch_) + ComputeFeature(training, srclayers_); + } + + virtual void Prefetching(bool training){ + CHECK(prefetch_); + ComputeFeature(training, srclayers_); + } + + protected: + bool has_set_; + bool prefetch_; + int random_skip_, batchsize_; + Record sample_; + vector<Record> records_; +}; + +/** + * Slice this layer into multiple dst layers on one dimension + */ +class SliceLayer: public Layer { + public: + virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers); + virtual void SetupAfterPartition(); + virtual void SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers){} + + + virtual const Blob<float>& data(const Layer* layer=nullptr) const; + virtual const Blob<float>& grad(const Layer* 
layer=nullptr) const; + virtual Blob<float>* mutable_data(const Layer* layer=nullptr); + virtual Blob<float>* mutable_grad(const Layer* layer=nullptr); + virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers); + virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers); + + protected: + int SliceID(const Layer* layer) const; + vector<Blob<float>> datavec_, gradvec_; +}; + +/** + * Replciate this layer into multiple dst layers + */ +class SplitLayer: public Layer { + public: + virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers); + virtual void SetupAfterPartition(); + virtual void SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers){} + + virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers); + virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers); +}; + +/** + * Loss layer to calculate loss and other metrics, e.g., precison. + */ +class LossLayer: public Layer{ + public: + virtual void Setup(const LayerProto& proto, + const vector<SLayer>& srclayers)=0; + + virtual void SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers)=0; + virtual Blob<float>* mutable_grad(const Layer* layer=nullptr){ + return nullptr; + } + virtual const Blob<float>& grad(const Layer* from=nullptr) const { + CHECK(false)<<"Loss layer has not gradient blob"; + return grad_; + } + virtual bool is_losslayer() const { + return true; + } + + virtual const Blob<float>& metric() const { + return metric_; + } + protected: + Blob<float> metric_; +}; + +/** + * parse the input records into Blobs. + */ +class ParserLayer: public Layer { + public: + virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers)=0; + /** + * Parse records from DataLayer into blob. 
+ * This function is called by + * ComputeFeature(bool, const vector<SLayer>& srclayers) or Prefetch(bool). + */ + virtual void ParseRecords(bool training, const vector<Record>& records, Blob<float>* blob)=0; + virtual bool is_parserlayer() const { + return true; + } + /** + * Dummy function. ParserLayer does not compute gradients. + */ + virtual void ComputeGradient(const vector<SLayer>& srclayers){}; + virtual void Setup(){ + Setup(layer_proto_,srclayers_); + has_set_=true; + ready_=true; + prefetch_=false; + } + virtual void SetupAfterPartition(){ + if(!has_set_) + Setup(); + } + virtual void SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers){} + + virtual PartitionType partition_type () const{ + return kNone; + } + virtual Blob<float>* mutable_grad(const Layer* layer=nullptr) { + return nullptr; + } + virtual const Blob<float>& grad(const Layer* from=nullptr) const { + CHECK(false)<<"Parser layer has not gradient blob"; + return grad_; + } + + virtual void ComputeFeature(bool training, const vector<SLayer>& srclayers){ + if(!prefetch_){ + DataLayer* datalayer=static_cast<DataLayer*>(srclayers[0].get()); + ParseRecords(training, datalayer->records(), &data_); + }else{ + std::unique_lock<std::mutex> lck(mtx_); + while(!ready_) cv_.wait(lck); + data_.CopyFrom(prefetch_data_); + ready_=false; + cv_.notify_all(); + } + } + /** + * prefetching is transparent to parsing logics. + * users implement parsing logics in ParseRecords + * worker/training algorithm calls this function to do prefetching in a + * separate thread. Records are in fact parsed into prefetch_data_, and later + * copied into data_. 
+ */ + void Prefetching(bool training){ + std::unique_lock<std::mutex> lck(mtx_); + while(ready_) cv_.wait(lck); + //data_.Swap(prefetch_data_); + DataLayer* datalayer=static_cast<DataLayer*>(srclayers_[0].get()); + ParseRecords(training, datalayer->records(), &prefetch_data_); + ready_=true; + cv_.notify_all(); + } + + /** + * must be called before calling ComputeFeature(bool) if Prefetching runs in a + * separate thread + */ + void set_prefetch(bool prefetch) { + if(prefetch){ + if(prefetch_data_.count()==0) + prefetch_data_.ReshapeLike(data_); + ready_=false; + } + prefetch_=prefetch; + } + + private: + std::mutex mtx_; + std::condition_variable cv_; + bool ready_; + bool has_set_; + bool prefetch_; + //!< prefetch_data_ is invisible to layer logics, i.e., parsing. + Blob<float> prefetch_data_; +}; +} // singa + +#endif // INCLUDE_BASE_LAYER_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/neuralnet/layer.h ---------------------------------------------------------------------- diff --git a/include/neuralnet/layer.h b/include/neuralnet/layer.h new file mode 100644 index 0000000..263d249 --- /dev/null +++ b/include/neuralnet/layer.h @@ -0,0 +1,297 @@ +#ifndef INCLUDE_NET_LAYER_H_ +#define INCLUDE_NET_LAYER_H_ + +#include <vector> +#include <string> +#include <map> +#include <functional> +#include <utility> +#include <memory> +#include <chrono> +#include <random> +#include <lmdb.h> + +#include "proto/model.pb.h" +#include "utils/data_shard.h" +#include "neuralnet/base_layer.h" + + +/** + * \file this file includes the declarations neuron layer classes that conduct + * the transformation of features. + */ +namespace singa { + +/** + * Convolution layer. 
+ */ +class ConvolutionLayer: public Layer { + public: + virtual void Setup(const LayerProto& proto, + const vector<SLayer>& srclayers); + + /** + * need to reset some properties (e.g., weight matrix) according to + * shapes (after partition, e.g., partition is done against channel dimension) + */ + virtual void SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers); + + virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers); + virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers); + virtual vector<shared_ptr<Param>> GetParams() { + return vector<shared_ptr<Param>>{weight_, bias_}; + } + virtual ConnectionType connection_type(int k) const { + CHECK_LT(k, srclayers_.size()); + return kOneToAll; + } + protected: + int kernel_, pad_, stride_ ; + int batchsize_, channels_, height_,width_; + int col_height_, col_width_, conv_height_, conv_width_, num_filters_; + shared_ptr<Param> weight_, bias_; + Blob<float> col_data_, col_grad_; +}; + +class DropoutLayer: public Layer { + public: + virtual void Setup(const LayerProto& proto, + const vector<SLayer>& srclayers); + + virtual void SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers); + + virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers); + virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers); + protected: + // drop probability + float pdrop_; + /* record which neuron is dropped, required for back propagating gradients, + * if mask[i]=0, then the i-th neuron is dropped. 
+ */ + Blob<float> mask_; +}; + +/** + * fully connected layer + */ +class InnerProductLayer: public Layer { + public: + virtual void Setup(const LayerProto& proto, + const vector<SLayer>& srclayers); + + /** + * need to reset weight matrix in case of LayerPartition + */ + virtual void SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers); + virtual ConnectionType connection_type(int k) const { + CHECK_LT(k, srclayers_.size()); + return kOneToAll; + } + + virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers); + virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers); + //virtual void ToProto(LayerProto *layer_proto, bool copyData); + virtual vector<shared_ptr<Param>> GetParams() { + return vector<shared_ptr<Param>>{weight_, bias_}; + } + + private: + //! dimension of the hidden layer + int hdim_; + //! dimension of the visible layer + int vdim_; + int batchsize_; + shared_ptr<Param> weight_, bias_; +}; + +class LabelLayer: public ParserLayer { + public: + virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers); + virtual void ParseRecords(bool training, const vector<Record>& records, + Blob<float>* blob); +}; + +class LRNLayer: public Layer { +/** + * Local Response Normalization edge + * b_i=a_i/x_i^beta + * x_i=knorm+alpha*\sum_{j=max(0,i-n/2}^{min(N,i+n/2}(a_j)^2 + * n is size of local response area. + * a_i, the activation (after ReLU) of a neuron convolved with the i-th kernel. 
+ * b_i, the neuron after normalization, N is the total num of kernels + */ + + public: + virtual void Setup(const LayerProto& proto, + const vector<SLayer>& srclayers); + + virtual void SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers); + + + virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers); + virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers); + protected: + //! shape of the bottom layer feature + int batchsize_, channels_, height_, width_; + //! size local response (neighbor) area + int lsize_; + //! hyper-parameter + float alpha_, beta_, knorm_; + Blob<float> norm_; +}; + +class MnistImageLayer: public ParserLayer { + public: + virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers); + virtual void ParseRecords(bool training, const vector<Record>& records, + Blob<float>* blob); + + protected: + // height and width of the image after deformation + // kernel size for elastic distortion + // n^2 images are processed as a batch for elastic distortion + // conv height and conv width + // gauss kernel values, displacements, column image and tmp buffer + //float* gauss_, *displacementx_, *displacementy_, *colimg_, *tmpimg_; + float gamma_, beta_, sigma_, kernel_, alpha_, norm_a_, norm_b_; + int resize_, elastic_freq_; +}; + +class PoolingLayer: public Layer { + public: + virtual void Setup(const LayerProto& proto, + const vector<SLayer>& srclayers); + + virtual void SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers); + + + virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers); + virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers); + protected: + int kernel_, pad_, stride_; + int batchsize_,channels_, height_, width_, pooled_height_, pooled_width_; + PoolingProto_PoolMethod pool_; +}; + +class ReLULayer: public Layer { + 
public: + virtual void Setup(const LayerProto& proto, + const vector<SLayer>& srclayers); + + virtual void SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers); + + + virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers); + virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers); +}; + + +class SoftmaxLossLayer: public LossLayer { + /* + * connected from the label layer and the last fc layer + */ + public: + virtual void Setup(const LayerProto& proto, + const vector<SLayer>& srclayers); + + virtual void SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers); + /** + * softmax is not recommendeded for partition because it requires the whole + * src layer for normalization. + */ + virtual PartitionType partition_type() const { + if(layer_proto_.partition_type()==kLayerPartition) + return kNone; + else + return layer_proto_.partition_type(); + } + virtual ConnectionType connection_type(int k) const { + CHECK_LT(k, srclayers_.size()); + return kOneToAll; + } + + virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers); + virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers); + private: + int batchsize_; + int dim_; + float scale_; + int topk_; +}; + +class RGBImageLayer: public ParserLayer { + public: + virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers); + virtual void ParseRecords(bool training, const vector<Record>& records, + Blob<float>* blob); + + private: + float scale_; + int cropsize_; + bool mirror_; + Blob<float> mean_; +}; + +class ShardDataLayer: public DataLayer{ + public: + virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers); + virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers){}; + virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers); + private: 
+ shared_ptr<DataShard> shard_; +}; +class LMDBDataLayer: public DataLayer{ + public: + virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers); + virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers){}; + virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers); + void ConvertDatumToSingleLableImageRecord(const Datum& datum, + SingleLabelImageRecord* record); + + private: + MDB_env* mdb_env_; + MDB_dbi mdb_dbi_; + MDB_txn* mdb_txn_; + MDB_cursor* mdb_cursor_; + MDB_val mdb_key_, mdb_value_; +}; + +/** + * This layer apply Tan function to neuron activations. + * f(x)=A tanh(Bx) + * f'(x)=B/A (A*A-f(x)*f(x)) + */ +class TanhLayer: public Layer { + public: + virtual void Setup(const LayerProto& proto, + const vector<SLayer>& srclayers); + + virtual void SetupAfterPartition(const LayerProto& proto, + const vector<int> &shape, + const vector<SLayer>& srclayers); + + + virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers); + virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers); + private: + float outer_scale_, inner_scale_; +}; + + +} // namespace singa + +#endif // INCLUDE_NET_LAYER_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/neuralnet/neuralnet.h ---------------------------------------------------------------------- diff --git a/include/neuralnet/neuralnet.h b/include/neuralnet/neuralnet.h new file mode 100644 index 0000000..586a470 --- /dev/null +++ b/include/neuralnet/neuralnet.h @@ -0,0 +1,156 @@ +#ifndef INCLUDE_NET_NET_H_ +#define INCLUDE_NET_NET_H_ + +#include <glog/logging.h> +#include <vector> +#include <map> +#include <memory> + +#include "proto/model.pb.h" +#include "neuralnet/layer.h" +#include "utils/factory.h" +#include "utils/graph.h" + +using std::vector; +using std::string; +using std::map; +using std::shared_ptr; +namespace singa { +/** + * The neural network is constructed from user configured 
layers through google + * protocol buffer. TODO support constructing neural network by adding layers + * explicitly. E.g., users create layers and connect them manually in the code. + * + * Some layers, e.g., SplitLayer and BridgeSrcLayer/BridgeDstLayer will be added + * implicitly to partition the neural network. + */ +class NeuralNet { + public: + /** + * Register Layers + */ + static void RegisterLayers(); + /** + * Setup the neural network for training, test or validation. + * + * Parameters for test/validation net can share those from training after + * setup (done outside of this funcion). + * + * @param np proto for the neural network. + */ + static shared_ptr<NeuralNet> SetupNeuralNet(const NetProto& np, Phase phase); + + public: + /** + * construct the net structure from protocol buffer. + */ + NeuralNet(NetProto net_proto, int group_size=1); + /** + * construct a json string representing the neuralnet graph. + * The json string can be used by other graph engine to draw a figure for + * displaying the neuralnet structure. + */ + std::string ToString(); + /** + * Print Norm1 of data and grad of each Layer and parameter. + * @param net, neural network + */ + string DebugInfo(); + + /** + * to display the adjacency layers + */ + std::string ToAdjacency(); + /** + * Add layer explicitly used in manually programming/constructing neural net. + */ + void AddLayer(const LayerProto &layer_proto){}; + /** + * Add layer explicitly used in manually programming/constructing neural net. + */ + void AddLayer(const Layer* layer){}; + /** + * share weights from other neuralnet + */ + void ShareParams(shared_ptr<NeuralNet> other,int flag); + void ToProto(NetProto *net_proto, bool copyData=false); + const std::vector<shared_ptr<Layer>>& layers() { + return layers_; + } + /** + * return ParserLayer of the neuralnet. 
+ */ + const std::vector<ParserLayer*>& parserlayers() { + if(parserlayers_.size()==0){ + for(auto& layer: layers_) + if(layer->is_parserlayer()) + parserlayers_.push_back(static_cast<ParserLayer*>(layer.get())); + } + return parserlayers_; + } + const std::vector<LossLayer*>& losslayers() { + if(losslayers_.size()==0){ + for(auto& layer: layers_) + if(layer->is_losslayer()) + losslayers_.push_back(static_cast<LossLayer*>(layer.get())); + } + return losslayers_; + } + const std::vector<DataLayer*>& datalayers() { + if(datalayers_.size()==0){ + for(auto& layer: layers_) + if(layer->is_datalayer()) + datalayers_.push_back(static_cast<DataLayer*>(layer.get())); + } + return datalayers_; + } + const std::vector<shared_ptr<Param>> ¶ms()const { + return params_; + } + shared_ptr<Layer> name2layer(string name){ + if (name2layer_.find(name)!=name2layer_.end()) + return name2layer_[name]; + else return nullptr; + } + + shared_ptr<Param> paramid2param(int id) { + if(paramid2param_.size()==0){ + for(auto& layer: layers_){ + for(shared_ptr<Param> p: layer->GetParams()){ + paramid2param_[p->id()]=p; + } + } + } + return paramid2param_[id]; + } + + protected: + void ConstructNeuralNet(const NetProto &net_proto); + void PartitionNeuralNet(); + map<string, shared_ptr<Layer>> GetNameToLayer( + const vector<shared_ptr<Layer>>& layers); + Graph CreatePartitonedGraph(const vector<shared_ptr<Layer>>& layers, + const map<string, shared_ptr<Layer>>& name2layer); + + /** + * Partition each layer according its partition type and dimension. 
+ * @param layers original unpartitioned layers + */ + map<string, vector<shared_ptr<Layer>>> PartitionLayers( + const vector<shared_ptr<Layer>>& layers); + + protected: + vector<shared_ptr<Layer>> layers_; + vector<ParserLayer*> parserlayers_; + vector<LossLayer*> losslayers_; + vector<DataLayer*> datalayers_; + vector<shared_ptr<Param>> params_; + map<string, shared_ptr<Layer>> name2layer_; + map<int, shared_ptr<Param>> paramid2param_; + + map<string, LayerProto> name2layerproto_; + int group_size_; + Graph graph_; +}; +} // namespace singa +#endif // INCLUDE_NET_NET_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/trainer/pm_server.h ---------------------------------------------------------------------- diff --git a/include/trainer/pm_server.h b/include/trainer/pm_server.h new file mode 100644 index 0000000..b759844 --- /dev/null +++ b/include/trainer/pm_server.h @@ -0,0 +1,91 @@ +#ifndef INCLUDE_TRAINER_PM_SERVER_H_ +#define INCLUDE_TRAINER_PM_SERVER_H_ + +#include <czmq.h> +#include <memory> +#include <vector> +#include <map> +#include <string.h> +#include "proto/model.pb.h" +#include "utils/updater.h" +#include "utils/param.h" +#include "communication/msg.h" +#include "communication/socket.h" +using std::vector; +using std::string; +using std::shared_ptr; + +namespace singa{ + +/** + * Parameter manager at the server side. + * + * Repsond to worker's get/put/udpate request, and periodically syncing with + * other servers. + * + * Normally, the PMServer creates a response message for each request which + * will be sent back to the one who issued the request. However, if the request + * are not processed successfully, the original message will be returned. The + * sever does not know the returned message (response or the original message), + * it just sends it to the router. The router will decide to re-send the + * request to the server or send it to the worker. 
+ * + */ +class PMServer{ +public: + typedef std::map<int, shared_ptr<Param>> ParamShard; + + void Setup(int group_id, int server_id, shared_ptr<ParamShard> shard, + const UpdaterProto& proto); + + ~PMServer(); + + /** + * Process GET request. + * + * @return the orignal message or response message + */ + virtual Msg* HandleGet(Msg** msg); + + /** + * Process Update request. + * + * @return the orignal message or response message + */ + virtual Msg* HandleUpdate(Msg** msg); + + /** + * Process PUT request. + * + * @return the original message or response message. If we don't want need to + * acknowledge the put request, then return nullptr. + */ + virtual Msg* HandlePut(Msg **msg); + + /** + * TODO Process SYNC request. + */ + virtual Msg* HandleSyncRequest(Msg** msg); + + /** + * TODO Process SYNC response. + */ + virtual int HandleSyncResponse(Msg** msg); + + /** + * Scheduler for synchronizing server groups. + * + * TODO implement the Caffe's synchronization scheduler for data parallelism + */ + virtual bool SyncNow(); + + protected: + int group_id_, server_id_; + shared_ptr<ParamShard> shard_; + shared_ptr<Dealer> dealer_; + shared_ptr<Updater> updater_; +}; + +} // namespace singa + +#endif // INCLUDE_TRAINER_PM_SERVER_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/trainer/pm_worker.h ---------------------------------------------------------------------- diff --git a/include/trainer/pm_worker.h b/include/trainer/pm_worker.h new file mode 100644 index 0000000..198f5bd --- /dev/null +++ b/include/trainer/pm_worker.h @@ -0,0 +1,171 @@ +#ifndef INCLUDE_TRAINER_PM_WORKER_H_ +#define INCLUDE_TRAINER_PM_WORKER_H_ + +#include <memory> +#include <vector> +#include <map> +#include <string> +#include <atomic> +#include "utils/param.h" +#include "communication/msg.h" + +using std::string; +using std::vector; +using std::shared_ptr; +using std::map; + +namespace singa { + +/** + * Counters used to construct a parameter shard. 
+ * + * For each worker group: + * Every unique Param object is associated with a ParamCounter object whose + * param field points the to Param object itself. + * + * Param objects sharing the same values (due to data parallelism) are + * associated with the same ParamCounter whose param field also shares the + * same values. + * + * Usage: we need to aggregate gradients from all workers for the shared + * parameters before sending the update request. The nUpdate counter counts + * the number. + * + * TODO test with different physical architectures. + */ +class ParamCounter{ + public: + ParamCounter(shared_ptr<Param> p,int local, int owner): + nUpdate(0), nGet(0), nPut(0), nCollect(0), nLocal(local), nTotal(0), + owner_procs(owner), param(p){} + + /** + * Associate the counter to a Param object. + * + * @param p + * @param local 1 if this Param object is used by workers in this procs, 0 + * otherwise + * @param owner the procs id of the worker who ownes this Param object + */ + void AddParam(shared_ptr<Param> p, int local, int owner){ + nLocal+=local; + nTotal+=1; + if(owner_procs>-1) + owner_procs=owner; + if(nLocal>1){ + // TODO copy p->param; + } + } + std::atomic<int> nUpdate, nGet, nPut, nCollect; //!< all counters are atomic + + int nLocal; //!< # local workers uses the shared parameter + int nTotal; //!< # total workers uses the shared parameter + int owner_procs; //!< the procs id of the worker that owns the parameter + shared_ptr<Param> param; +}; + + +/** + * Parameter manager at the worker side. + */ +class PMWorker{ +public: + /** + * Workers from the same group resident in the same process share the same + * ParamShard which contains ParamCounters for Param objects used/updated by + * these worekrs. Shared Param objects are associated with the same + * ParamCounter. 
+ */ + typedef std::map<int, shared_ptr<ParamCounter>> ParamShard; + + + void Setup(int group_id, int worker_id, shared_ptr<ParamShard> shard); + + void set_id(int group_id, int worker_id){ + group_id_=group_id; + worker_id_=worker_id; + } + + /** + * @return server id where the parameter is maintained. + */ + virtual int Sharding(int param_id); + + /** + * Generate a request message to Get the parameter object. + */ + virtual Msg* Get(shared_ptr<Param> param, int step); + virtual Msg* Get(Msg** msg); + + /** + * Generate a request message to Update the parameter object. + */ + virtual Msg* Update(shared_ptr<Param> param, int step); + virtual Msg* Update(Msg** msg); + + /** + * Collect a Param object returned from server. + */ + virtual Msg* Collect(Msg**); + + /** + * Generate a request message to Put the parameter object. + */ + virtual Msg* Put(shared_ptr<Param> param, int step); + virtual Msg* Put(Msg** msg); + + protected: + int group_id_, worker_id_; + shared_ptr<ParamShard> shard_; +}; + +/** + * Testing worker functionality.The main thread reads the config file and set up the socket. + * + * Create the shared ParamShard, then starts worker thread which basically carries out the work. + * Each thread creates a PMClient object. + * + * The main thread then enter the loops to forward messages. + * + * Requests from the worker thread is prepend the paramId, which is stripped by the main thread + * before forwarding to the correct server. + * + * The 1st thread in Client 0 populates the servers with data (PUT request). Wait + * for a while before starting the client thread (which does get/update + * continuously). 
+class SingaClient { +public: + SingaClient(int worker_id, Topology &topology, vector<string> &hosts); + void StartClient(); + + int id() { + return id_; + } + ParamShard *param_shard() { + return param_shard_; + } + char *backend_endpoint() { + return backend_endpoint_; + } + +private: + int id_, local_id_, group_id_; + char backend_endpoint_[256]; + vector<char*> neighbors_; + ParamShard *param_shard_; + + int param_to_server_id(int paramId);//< mapping paramId to server ID +}; + +//Zthread function for the worker thread, in the global namespace. +//Basically a loop of: compute, get, update, compute, etc. +void ClientThread(void *args, zctx_t *ctx, void *pipe); + +vector<Param*> gen_random_params(); +void test_get(PMClient *client); +void test_update(PMClient *client, vector<Param*> params); +void test_collect(PMClient *client); + */ + +} // namespace singa +#endif // INCLUDE_TRAINER_PM_WORKER_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/trainer/server.h ---------------------------------------------------------------------- diff --git a/include/trainer/server.h b/include/trainer/server.h new file mode 100644 index 0000000..d113c7d --- /dev/null +++ b/include/trainer/server.h @@ -0,0 +1,22 @@ +#ifndef INCLUDE_TRAINER_SERVER_H_ +#define INCLUDE_TRAINER_SERVER_H_ +#include <memory> +#include "trainer/pm_server.h" +#include "communication/socket.h" + +using std::shared_ptr; +namespace singa { +class Server{ + public: + Server(int group_id, int server_id); + void Setup(const UpdaterProto& proto, shared_ptr<PMServer::ParamShard> shard, + shared_ptr<Dealer> dealer); + void Run(); + + protected: + int group_id_, server_id_; + shared_ptr<PMServer> pmserver_; + shared_ptr<Dealer> dealer_; +}; +} /* Server */ +#endif //INCLUDE_TRAINER_SERVER_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/trainer/trainer.h ---------------------------------------------------------------------- diff --git 
a/include/trainer/trainer.h b/include/trainer/trainer.h new file mode 100644 index 0000000..34d95f1 --- /dev/null +++ b/include/trainer/trainer.h @@ -0,0 +1,50 @@ +#ifndef INCLUDE_TRAINER_TRAINER_H_ +#define INCLUDE_TRAINER_TRAINER_H_ +#include "proto/cluster.pb.h" +#include "proto/model.pb.h" +#include "utils/updater.h" +#include "utils/param.h" +#include "utils/singleton.h" +#include "utils/factory.h" +#include "neuralnet/neuralnet.h" +#include "trainer/pm_worker.h" +#include "trainer/pm_server.h" +#include "trainer/worker.h" +#include "trainer/server.h" + +namespace singa { +/** + * Every running process has a training object which launches one or more + * worker (and server) threads. + * + * The main thread runs a loop to forward messages between workers and servers. + */ +class Trainer{ + public: + /** + * Start the training in one process + * + * @param modelproto + * @param clusterproto + */ + void Start(const ModelProto& modelproto, const ClusterProto& clusterproto, + int procs_id); + + // TODO add Resume() function to continue training from a previously stopped + // point. + + protected: + void Run(); + /** + * Register default implementations for all base classes used in the system, + * e.g., the Updater, BaseMsg, etc. + * + * All built-in layer implementations are + * registered here. + * For other base classes, use its base class name (string) as the key and the + * implementation class as the value, e.g., <"Updater" SGDUpdater>. 
+ */ + void RegisterDefaultClasses(const singa::ModelProto& proto); +}; +} /* singa */ +#endif // INCLUDE_TRAINER_TRAINER_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/trainer/worker.h ---------------------------------------------------------------------- diff --git a/include/trainer/worker.h b/include/trainer/worker.h new file mode 100644 index 0000000..609e7dc --- /dev/null +++ b/include/trainer/worker.h @@ -0,0 +1,218 @@ +#ifndef INCLUDE_TRAINER_WORKER_H_ +#define INCLUDE_TRAINER_WORKER_H_ +#include <map> +#include <exception> +#include "neuralnet/neuralnet.h" +#include "proto/model.pb.h" +#include "trainer/pm_worker.h" +#include "utils/cluster.h" +#include "communication/socket.h" +#include "communication/msg.h" + +namespace singa { +/** + * Collecting metrics, like accuracy, loss, etc. + */ +class Performance{ + public: + /** + * Collect from LossLayer of net. + */ + explicit Performance(shared_ptr<NeuralNet> net); + /** + * aggregate metrics from LossLayerS + */ + void Update(); + void Reset(); + string ToString(); + private: + vector<string> name_; + shared_ptr<NeuralNet> net_; + vector<vector<float>> metric_; + int counter_; //!< inc by 1 for every Update +}; + +/** + * The Worker class which runs the training algorithm. + * The first worker group will initialize parameters of the Net, + * and put them into the distributed memory/table. 
+ */ +class Worker { + public: + Worker(int group_id, int worker_id); + /** + * Virtual because Worker is a polymorphic base class (it declares pure + * virtual TrainOneBatch/TestOneBatch), so destroying a subclass object + * through a Worker pointer runs the subclass destructor. + */ + virtual ~Worker(){} + void Setup(const ModelProto& model, shared_ptr<NeuralNet> train_net, + shared_ptr<PMWorker::ParamShard> shard, shared_ptr<Dealer> layer_dealer, + shared_ptr<Dealer> param_dealer); + void set_test_net(shared_ptr<NeuralNet> test_net){ + test_net_=test_net; + } + void set_validation_net(shared_ptr<NeuralNet> val_net){ + validation_net_=val_net; + } + + int Put(shared_ptr<Param> param, int step); + int Get(shared_ptr<Param> param, int step); + int Update(shared_ptr<Param> param, int step); + int Collect(shared_ptr<Param> param, int step); + /** + * check validation/test firstly, then TrainOneBatch + * Performance collects performance for the whole neuralnet. + * Hence, no need to collect performance in every thread. + * Only the main thread will pass non-null perf. + */ + void RunOneBatch(int step, Performance* perf=nullptr); + /** + * Train one mini-batch. + * Test/Validation is done before training. + */ + virtual void TrainOneBatch(int step)=0; + /** + * Test/validate one mini-batch. + */ + virtual void TestOneBatch(shared_ptr<NeuralNet> net, int step, Phase phase)=0; + /** + * Test the performance of the learned model on validation or test dataset. + * Test is done by the first group. + * @param net neural network to test + * @param nsteps NOTE(review): presumably the number of mini-batches to test + * -- confirm against the implementation + * @param dispperf NOTE(review): presumably whether to display the collected + * performance -- confirm against the implementation + */ + void Test(shared_ptr<NeuralNet> net, int nsteps, bool dispperf); + + /** + * Main function of Worker. + * 1. Train the neuralnet step by step, test/validation is done periodically. + * 2. TODO Communicate with others, e.g., zookeeper, after every step. + */ + virtual void Run(); + + + /** + * Pull data from layers resident on other nodes due to Model Partition. + void Pull(zsock_t* pull, shared_ptr<NeuralNet> net); + */ + + /** + * Check whether it is time to display training info, e.g., loss and precision. 
+ */ + const bool DisplayNow(const int step) const { + return (modelproto_.display_frequency() > 0 + && step >= modelproto_.display_after_steps() + && ((step - modelproto_.display_after_steps()) + % modelproto_.display_frequency() == 0)); + } + + /** + * Debug info is displayed only when DisplayNow(step) holds, debug is enabled + * in the model config, and this worker belongs to group 0. + */ + const bool DisplayDebugInfo(const int step) const { + return DisplayNow(step)&&modelproto_.debug()&&group_id_==0; + } + + /** + * return true if the stop condition is satisfied, e.g., the maximum number + * of steps have been reached. + */ + const bool StopNow(const int step) const{ + return (step >= modelproto_.train_steps()); + } + /** + * Check whether it is time to do checkpoint. + * @param step the ::Train() has been called this num times. + */ + const bool CheckpointNow(const int step) const{ + return (group_id_==0 + && modelproto_.checkpoint_frequency() > 0 + && step >= modelproto_.checkpoint_after_steps() + && ((step - modelproto_.checkpoint_after_steps()) + % modelproto_.checkpoint_frequency() == 0)); + } + /** + * Check whether it is time to do test. + * @param step the ::Train() has been called this num times. + */ + const bool TestNow(const int step) const{ + return (group_id_==0 + && modelproto_.test_frequency() > 0 + && step >= modelproto_.test_after_steps() + && ((step - modelproto_.test_after_steps()) + % modelproto_.test_frequency() == 0)); + } + /** + * Check whether it is time to do validation. + * Declared const like the other schedule predicates, so it can also be + * called on a const Worker. + * @param step the ::Train() has been called step times. + */ + const bool ValidateNow(const int step) const { + return (group_id_==0 + && modelproto_.validation_frequency() > 0 + && step >= modelproto_.validation_after_steps() + && ((step - modelproto_.validation_after_steps()) + % modelproto_.validation_frequency() == 0)); + } + + + /** + * start training from scratch. + * setup training/test/validation neuralnets, then call Run(). 
+ void Start(ModelProto model); + */ + /** + * TODO Resume from snapshot + void Resume(); + */ + void ReceiveBlobs(shared_ptr<NeuralNet> net); + void SendBlob(); + protected: + int group_id_, worker_id_; + int step_; + ModelProto modelproto_; + shared_ptr<PMWorker> pmworker_; + shared_ptr<NeuralNet> train_net_, test_net_, validation_net_; + shared_ptr<Dealer> layer_dealer_, param_dealer_; + Poller layer_poller_, param_poller_; +}; + +class WorkerException: public std::exception{ + public: + /** + * Overrides std::exception::what(). The const qualifier is required for the + * signature to match the base class; without it this function only hides + * the base version and is skipped when called through std::exception. + */ + const char* what() const noexcept override { + return "Worker Exception"; + } +}; + + +/** + * Worker subclass implementing TrainOneBatch/TestOneBatch; it declares + * Forward and Backward passes over a NeuralNet (NOTE(review): presumably + * back-propagation training -- confirm against the implementation). + */ +class BPWorker: public Worker{ + public: + ~BPWorker(){} + BPWorker(int group_id, int worker_id):Worker(group_id, worker_id){} + virtual void TrainOneBatch(int step); + virtual void TestOneBatch(shared_ptr<NeuralNet> net, int step, Phase phase); + void Forward(shared_ptr<NeuralNet> net, int step, bool training); + void Backward(shared_ptr<NeuralNet> net, int step); + /** + * Profiling the time cost of training one batch. + string TimerInfo(){ + char buf[1024]; + float ticks=ticks_*1000; + float tf=tForward_/ticks, tb=tBackward_/ticks, + td=tSyncData_/ticks, tp=tSyncParam_/ticks; + float total=tf+tb+td+tp; + sprintf(buf, + "Total\t%6.2f\tforward\t%6.2f\tbackward\t%6.2f\t" + // syncdata\t%6.2f\tsyncparam\t%6.2f\n" + , total,tf,tb); + float gensync=Param::worker_gen_sync/ticks; + float handlesync=Param::worker_handle_sync/ticks; + sprintf(buf+strlen(buf), + "worker_gen_sync\t%6.2f\tworker_handle_sync\t%6.2f\n", + gensync, handlesync); + Param::worker_gen_sync=0; + Param::worker_handle_sync=0; + tForward_=0; + tBackward_=0; + tSyncData_=0; + tSyncParam_=0; + ticks_=0; + return string(buf); + } + */ +}; +} // namespace singa + +#endif // INCLUDE_TRAINER_WORKER_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/blob.h ---------------------------------------------------------------------- diff --git a/include/utils/blob.h b/include/utils/blob.h new file mode 100644 index 
0000000..08068eb --- /dev/null +++ b/include/utils/blob.h @@ -0,0 +1,166 @@ +/** + * The code is adapted from that of Caffe whose license is attached. + * + * COPYRIGHT + * All contributions by the University of California: + * Copyright (c) 2014, The Regents of the University of California (Regents) + * All rights reserved. + * All other contributions: + * Copyright (c) 2014, the respective contributors + * All rights reserved. + * Caffe uses a shared copyright model: each contributor holds copyright over + * their contributions to Caffe. The project versioning records all such + * contribution and copyright details. If a contributor wants to further mark + * their specific copyright on a particular contribution, they should indicate + * their copyright solely in the commit message of the change when it is + * committed. + * LICENSE + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * CONTRIBUTION AGREEMENT + * By contributing to the BVLC/caffe repository through pull-request, comment, + * or otherwise, the contributor releases their content to the + * license and copyright terms herein. + * + */ +#ifndef INCLUDE_UTILS_BLOB_ +#define INCLUDE_UTILS_BLOB_ +#include <cstdlib> +#include <memory> +#include <vector> +#include <glog/logging.h> +#include "proto/model.pb.h" +using std::shared_ptr; +using std::vector; + +#define NOT_IMPLEMENTED LOG(FATAL) << "Not implemented function" +/** + * Allocate size bytes of host memory with malloc and store the address in + * *ptr. The result of malloc is stored as is; no failure check is done here. + */ +inline void MallocHost(void** ptr, size_t size) { + *ptr = malloc(size); +} + +/** + * Release host memory with free. + */ +inline void FreeHost(void* ptr) { + free(ptr); +} + +/** + * @brief Manages memory allocation and synchronization between the host (CPU) + * and device (GPU). + * + * TODO(dox): more thorough description. 
+ */ +class SyncedMemory { + public: + SyncedMemory() + : cpu_ptr_(NULL), gpu_ptr_(NULL), size_(0), head_(UNINITIALIZED), + own_cpu_data_(false) {} + explicit SyncedMemory(size_t size) + : cpu_ptr_(NULL), gpu_ptr_(NULL), size_(size), head_(UNINITIALIZED), + own_cpu_data_(false) {} + ~SyncedMemory(); + const void* cpu_data(); + void set_cpu_data(void* data); + const void* gpu_data(); + void* mutable_cpu_data(); + void* mutable_gpu_data(); + enum SyncedHead { UNINITIALIZED, HEAD_AT_CPU, HEAD_AT_GPU, SYNCED }; + SyncedHead head() { return head_; } + size_t size() { return size_; } + + private: + void to_cpu(); + void to_gpu(); + void* cpu_ptr_; + void* gpu_ptr_; + size_t size_; + SyncedHead head_; + bool own_cpu_data_; + +}; // class SyncedMemory + + +template <typename Dtype> +class Blob { + public: + Blob(): count_(0), capacity_(0) {} + Blob(const vector<int>&shape); + /** + * @brief Change the dimensions of the blob, allocating new memory if + * necessary. + * + * This function can be called both to create an initial allocation + * of memory, and to adjust the dimensions of a top blob during Layer::Reshape + * or Layer::Forward. When changing the size of blob, memory will only be + * reallocated if sufficient memory does not already exist, and excess memory + * will never be freed. + * + * Note that reshaping an input blob and immediately calling Net::Backward is + * an error; either Net::Forward or Net::Reshape need to be called to + * propagate the new input shape to higher layers. + */ + void Reshape(const vector<int>& shape); + void ReshapeLike(const Blob& other); + const vector<int>& shape() const{ + return shape_; + } + inline int count() const { return count_; } + /** + * @brief Copy from a source Blob. 
+ * + * @param source the Blob to copy from + * @param reshape if false, require this Blob to be pre-shaped to the shape + * of source (and die otherwise); if true, Reshape this Blob to source's + * shape if necessary + */ + void CopyFrom(const Blob<Dtype>& source, bool reshape = false); + + /** + * @brief Return the shared_ptr to the underlying SyncedMemory; CHECK-fails + * if data_ is null. + */ + inline const shared_ptr<SyncedMemory>& data() const { + CHECK(data_); + return data_; + } + + const Dtype* cpu_data() const; + void set_cpu_data(Dtype* data); + const Dtype* gpu_data() const; + Dtype* mutable_cpu_data(); + Dtype* mutable_gpu_data(); + /* + void FromProto(const BlobProto& proto); + */ + void ToProto(singa::BlobProto* proto) const; + + /// @brief Compute the sum of absolute values (L1 norm) of the data. + Dtype asum_data() const; + Dtype sum_data() const; + + /** + * @brief Set the data_ shared_ptr to point to the SyncedMemory holding the + * data_ of Blob other -- useful in Layers which simply perform a copy + * in their Forward pass. + * + * This deallocates the SyncedMemory holding this Blob's data_, as + * shared_ptr calls its destructor when reset with the "=" operator. 
+ */ + void ShareData(const Blob& other); + void Swap(Blob& other); + shared_ptr<SyncedMemory> data_; + protected: + vector<int> shape_; + int count_; + int capacity_; +}; // class Blob + +#endif // INCLUDE_UTILS_BLOB_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/cluster.h ---------------------------------------------------------------------- diff --git a/include/utils/cluster.h b/include/utils/cluster.h new file mode 100644 index 0000000..4812987 --- /dev/null +++ b/include/utils/cluster.h @@ -0,0 +1,125 @@ +#ifndef INCLUDE_UTILS_CLUSTER_H_ +#define INCLUDE_UTILS_CLUSTER_H_ +#include <glog/logging.h> +#include <string> +#include <utility> +#include <memory> +#include <vector> +#include "proto/cluster.pb.h" + +using std::shared_ptr; +using std::string; +using std::vector; + +namespace singa { + +/** + * Cluster is a singleton object, which provides cluster configurations, + * e.g., the topology of the cluster. + * All IDs start from 0. + */ +class Cluster { + public: + static shared_ptr<Cluster> Get(); + static shared_ptr<Cluster> Get(const ClusterProto& cluster, int procs_id); + + const int nserver_groups()const{ return cluster_.nserver_groups(); } + const int nworker_groups()const { return cluster_.nworker_groups(); } + int nworkers_per_group()const {return cluster_.nworkers_per_group();} + int nservers_per_group()const {return cluster_.nservers_per_group();} + int nworkers_per_procs()const{return cluster_.nworkers_per_procs();} + int nservers_per_procs()const{return cluster_.nservers_per_procs();} + int nworker_groups_per_server_group() const { + return cluster_.nworker_groups()/cluster_.nserver_groups(); + } + + /** + * When servers and workers are separated, procs with id >= nworker_procs() + * have server threads; otherwise procs with id < nserver_procs() have them. + * + * @return true if the calling procs has server threads, otherwise false + */ + bool has_server()const { + if(server_worker_separate()){ + CHECK_LT(procs_id_, nprocs()); + return procs_id_>=nworker_procs(); + }else + return procs_id_<nserver_procs(); + } + /** + * @return true if the calling procs has worker 
threads. + */ + bool has_worker()const { + if(server_worker_separate()){ + return procs_id_<nworker_procs(); + }else + return procs_id_<nprocs(); + } + /** + * @return global procs id, which starts from 0. + */ + int procs_id()const {return procs_id_;} + bool server_worker_separate() const { + return cluster_.server_worker_separate(); + } + int nworker_procs() const { + return nworker_groups()*nworkers_per_group()/nworkers_per_procs(); + } + int nserver_procs() const { + return nserver_groups()*nservers_per_group()/nservers_per_procs(); + } + int nprocs() const { + return cluster_.nprocs(); + } + + const string endpoint() const { + return endpoint(procs_id()); + } + /** + * @return endpoint of the router of a procs with the specified id + */ + const string endpoint(int procs_id) const { + CHECK_LT(procs_id, nprocs()); + CHECK_GE(procs_id, 0); + return endpoints_.at(procs_id); + } + const string workspace() const {return cluster_.workspace();} + const string vis_folder() const { + return cluster_.workspace()+"/visualization"; + } + /** + * @return workspace() + "/log" if log_dir is set in the cluster config, + * otherwise an empty string. The path separator matches vis_folder(), so the + * result is a sub-folder of the workspace rather than a sibling path whose + * name merely ends with "log". + */ + const string log_folder() const { + if(cluster_.has_log_dir()){ + return cluster_.workspace()+"/log"; + }else + return ""; + } + + const int stub_timeout() const { + return cluster_.stub_timeout(); + } + const int worker_timeout() const { + return cluster_.worker_timeout(); + } + const int server_timeout() const { + return cluster_.server_timeout(); + } + + /** + * bandwidth MB/s + float bandwidth() const { + return cluster_.bandwidth(); + } + */ + + private: + Cluster(const ClusterProto &cluster, int procs_id) ; + void SetupFolders(const ClusterProto &cluster); + + private: + int procs_id_; + std::vector<std::string> endpoints_; + // cluster config proto + ClusterProto cluster_; + // make this class a singleton + static shared_ptr<Cluster> instance_; +}; +} // namespace singa + +#endif // INCLUDE_UTILS_CLUSTER_H_
