SINGA-195 Channel for sending training statistics Merge branch PR#166
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/b167dfa5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/b167dfa5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/b167dfa5 Branch: refs/heads/master Commit: b167dfa5bbf6a84af87a4db0f3946659fb62c4ca Parents: 21e4b2d a4fc4ea Author: Wei Wang <[email protected]> Authored: Mon Jun 13 17:50:20 2016 +0800 Committer: Wei Wang <[email protected]> Committed: Mon Jun 13 17:53:29 2016 +0800 ---------------------------------------------------------------------- include/singa/utils/channel.h | 85 ++++++++++++++++++++++++++++++ src/utils/channel.cc | 104 +++++++++++++++++++++++++++++++++++++ test/singa/test_channel.cc | 39 ++++++++++++++ 3 files changed, 228 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b167dfa5/src/utils/channel.cc ---------------------------------------------------------------------- diff --cc src/utils/channel.cc index 0000000,95daed6..588a11a mode 000000,100644..100644 --- a/src/utils/channel.cc +++ b/src/utils/channel.cc @@@ -1,0 -1,104 +1,104 @@@ + /************************************************************ + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + *************************************************************/ + + #include "singa/utils/channel.h" + + #include "singa/utils/logging.h" + #include "singa/utils/singleton.h" + + namespace singa { + + ChannelManager::~ChannelManager() { + for (auto it : name2ptr_) { + if (it.second != nullptr) delete (it.second); + } + } + + void ChannelManager::Init() { + // do nothing here + } + + void ChannelManager::SetDefaultDir(const char* dir) { + if (dir != nullptr) { + dir_ = dir; + if (dir[dir_.length() - 1] != '/') dir_ += '/'; + } + } + + Channel* ChannelManager::GetInstance(const std::string& channel) { + // find the channel + if (name2ptr_.find(channel) == name2ptr_.end()) { + // create new channel + Channel* chn = new Channel(channel); + chn->SetDestFilePath(dir_ + channel); + chn->EnableDestFile(true); + name2ptr_[channel] = chn; + } + return name2ptr_[channel]; + } + + Channel::Channel(const std::string& name) { name_ = name; } + + Channel::~Channel() { + if (os_.is_open()) os_.close(); + } + + void Channel::SetDestFilePath(const std::string& file) { + // file is append only + if (os_.is_open()) os_.close(); + { - ifstream fin(file.c_str()); ++ std::ifstream fin(file.c_str()); + if (fin.good()) + LOG(WARNING) << "Messages will be appended to an existed file: " << file; + } + os_.open(file.c_str(), std::ios::app); + if (os_.is_open() == false) + LOG(WARNING) << "Cannot open channel file (" << file << ")"; + } + + void Channel::Send(const std::string& message) { + if (stderr_) fprintf(stderr, "%s\n", message.c_str()); + if (file_ && os_.is_open()) os_ << message << "\n"; + // TODO(wangwei) flush + } + + void Channel::Send(const google::protobuf::Message& message) { + if (stderr_) fprintf(stderr, "%s\n", message.DebugString().c_str()); + if (file_ && os_.is_open()) message.SerializeToOstream(&os_); + // TODO(wangwei) flush + } + + void InitChannel(const char* argv) { + ChannelManager* mng = Singleton<ChannelManager>().Instance(); + mng->Init(); + } + + void SetChannelDirectory(const char* path) { + ChannelManager* mng = Singleton<ChannelManager>().Instance(); + mng->SetDefaultDir(path); + } + + Channel* GetChannel(const std::string& channel_name) { + ChannelManager* mng = Singleton<ChannelManager>().Instance(); + return mng->GetInstance(channel_name); + } + + } // namespace singa
