houkai opened a new issue #7450:  reporting bugs: pbegin_ <= pend_. Two thread 
conflicts.
URL: https://github.com/apache/incubator-mxnet/issues/7450
 
 
   ## Environment info
   Operating System: CentOS release 6.9 (Final)
   
   Compiler: gcc version 4.8.2 (GCC)
   
   Package used (Python/R/Scala/Julia): Python
   
   MXNet version: installed from source: mxnet (0.10.1)
   
   Python version and distribution: Python 2.7.9
   
   ## Error Message:
   [17:16:29] include/dmlc/././logging.h:308: [17:16:29] src/recordio.cc:126: 
Check failed: pbegin_ <= pend_ Invalid RecordIO Format
   
   ## Minimum reproducible example
   During training, I configure ImageRecordIter in
'mxnet/example/image-classification/common/data.py' as below:
   ```
       train = mx.io.ImageRecordIter(
           path_imgrec         = args.data_train,
           label_width         = 1,
           mean_r              = rgb_mean[0],
           mean_g              = rgb_mean[1],
           mean_b              = rgb_mean[2],
           scale               = args.scale,
           data_name           = 'data',
           label_name          = 'softmax_label',
           data_shape          = image_shape,
           batch_size          = args.batch_size,
           rand_crop           = args.random_crop,
           max_random_scale    = args.max_random_scale,
           pad                 = args.pad_size,
           fill_value          = 127,
           min_random_scale    = args.min_random_scale,
           max_aspect_ratio    = args.max_random_aspect_ratio,
           random_h            = args.max_random_h,
           random_s            = args.max_random_s,
           random_l            = args.max_random_l,
           max_rotate_angle    = args.max_random_rotate_angle,
           max_shear_ratio     = args.max_random_shear_ratio,
           rand_mirror         = args.random_mirror,
           preprocess_threads  = args.data_nthreads,
           shuffle_chunk_size  = 64, 
           shuffle_chunk_seed  = 0,
           shuffle             = True,
           num_parts           = nworker,
           part_index          = rank)
   ```
   I want to shuffle the data in every epoch (using random chunks).
   However, when I run fine-tune.py, I get the error: pbegin_ <= pend_ Invalid
RecordIO Format. Sometimes it succeeds and sometimes it fails.
   ```
   python -u fine-tune2.py --gpus '0,1,2,3' \
   --data-train=/home/liubang/mxnet/tools/txu_chen_wei_ban_lian/rec/ \
   --batch-size 128 --num-classes 44 --num-examples=528170 
--lr-step-epochs='2,4,6' \
   --num-epochs=15 --image-shape='3,224,224' --pretrained-model='resnext-101' \
   --load-epoch=0 --lr=3e-5 --lr-factor=0.5 --model-prefix=dress-resnext/tt \
   --data-nthreads=8
   ```
   The parameters in the command line are irrelevant.
   
   ## Analyze
   
   1. In iter_image_recordio_2.cc, the Init function creates source_:
   ```
       if (num_shuffle_parts > 1) {
         source_.reset(dmlc::InputSplitShuffle::Create(
             param_.path_imgrec.c_str(), param_.part_index,
             param_.num_parts, "recordio", num_shuffle_parts, 
param_.shuffle_chunk_seed));
       }
   ```
   This calls InputSplit::Create in io.cc (dmlc):
   ```
   #if DMLC_ENABLE_STD_THREAD
     if (spec.cache_file.length() == 0) {
       return new ThreadedInputSplit(split);
     } else {
       return new CachedInputSplit(split, spec.cache_file.c_str());
     }
   #else
     CHECK(spec.cache_file.length() == 0)
         << "to enable cached file, compile with c++11";
     return split;
   #endif
   ```
   If DMLC_ENABLE_STD_THREAD is 1, it returns a ThreadedInputSplit. Note: this
creates a thread that reads data from the recordio files. In
threaded_input_split.h:
   ```
     explicit ThreadedInputSplit(InputSplitBase *base)
         : buffer_size_(InputSplitBase::kBufferSize),
           base_(base), tmp_chunk_(NULL) {
       iter_.set_max_capacity(2);
       // initalize the iterator
       iter_.Init([this](InputSplitBase::Chunk **dptr) {
           if (*dptr == NULL) {
             *dptr = new InputSplitBase::Chunk(buffer_size_);
           }
           return (*dptr)->Load(base_, buffer_size_);
         },
         [base]() { base->BeforeFirst(); });
     }
   ```
   source_ is the object that contains fs_ (declared in InputSplitBase); fs_ is
the file pointer.
   2. In iter_image_recordio_2.cc, the ImageRecordIter2 class creates another
thread that gets data from source_.
   ```
       virtual void Init(const std::vector<std::pair<std::string, std::string> 
>& kwargs) {
         prefetch_param_.InitAllowUnknown(kwargs);
         parser_.Init(kwargs);
         // maximum prefetch threaded iter internal size
         const int kMaxPrefetchBuffer = 16;
         // init thread iter
         iter_.set_max_capacity(kMaxPrefetchBuffer);
         // init thread iter
         iter_.Init([this](DataBatch **dptr) {
             if (*dptr == nullptr) {
               *dptr = new DataBatch();
             }
             return parser_.ParseNext(*dptr);
             },
             [this]() { parser_.BeforeFirst(); });
       }
   ```
   This thread produces data and puts it into a buffer (of size 16); the next
function consumes data from that buffer.
   3. The two threads use the same fs_. The first thread reads 2 data items
(its buffer size is 2) and then stops, because no consumer reads from its
queue; but while it runs, the first thread modifies fs_.
   If the first thread has already read its 2 data items before the second
thread starts running (reading data), the first will not affect the second.
   However, if the first thread is still running when the second thread
begins, they conflict. As a result, the position in the file (fs_) computed by
the second thread is wrong.
   
   ## What have you tried to solve it?
   
   1. Disable the first thread in dmlc. Because I train on a single machine and
don't use cache_file, I return the split object early, before the threaded
wrapper is created. This solved the problem.
   The modified InputSplit::Create in io.cc:
   ```
   InputSplit* InputSplit::Create(const char *uri_,
                                  unsigned part,
                                  unsigned nsplit,
                                  const char *type) {
     using namespace std;
     using namespace dmlc::io;
     // allow cachefile in format path#cachefile
     io::URISpec spec(uri_, part, nsplit);
     if (!strcmp(spec.uri.c_str(), "stdin")) {
       return new SingleFileSplit(spec.uri.c_str());
     }
     CHECK(part < nsplit) << "invalid input parameter for InputSplit::Create";
     URI path(spec.uri.c_str());
     InputSplitBase *split = NULL;
     if (!strcmp(type, "text")) {
       split =  new LineSplitter(FileSystem::GetInstance(path),
                                 spec.uri.c_str(), part, nsplit);
     } else if (!strcmp(type, "recordio")) {
       split =  new RecordIOSplitter(FileSystem::GetInstance(path),
                                     spec.uri.c_str(), part, nsplit);
     } else {
       LOG(FATAL) << "unknown input split type " << type;
     }
   return split;
   /*#if DMLC_ENABLE_STD_THREAD
     if (spec.cache_file.length() == 0) {
       return new ThreadedInputSplit(split);
     } else {
       return new CachedInputSplit(split, spec.cache_file.c_str());
     }
   #else
     CHECK(spec.cache_file.length() == 0)
         << "to enable cached file, compile with c++11";
     return split;
   #endif*/
   }
   ```
   2. I don't know the impact of DMLC_ENABLE_STD_THREAD, so I kept
DMLC_ENABLE_STD_THREAD = 1.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to