[ 
https://issues.apache.org/jira/browse/ARROW-1039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17658072#comment-17658072
 ] 

Rok Mihevc commented on ARROW-1039:
-----------------------------------

This issue has been migrated to [issue 
#16632|https://github.com/apache/arrow/issues/16632] on GitHub. Please see the 
[migration documentation|https://github.com/apache/arrow/issues/14542] for 
further details.

> Python: pyarrow.Filesystem.read_parquet causing error if nthreads>1
> -------------------------------------------------------------------
>
>                 Key: ARROW-1039
>                 URL: https://issues.apache.org/jira/browse/ARROW-1039
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>    Affects Versions: 0.3.0
>            Reporter: James Porritt
>            Assignee: Wes McKinney
>            Priority: Major
>             Fix For: 0.4.1
>
>
> Currently I have the code:
> {code}
> from pyarrow import HdfsClient  # assumed top-level export in pyarrow 0.3.x
> client = HdfsClient("hdfshost", 8020, "myuser", driver='libhdfs3')
> parquet_file = '/my/parquet/file'
> parquet = client.read_parquet(parquet_file, nthreads=1)
> df = parquet.to_pandas()
> {code}
> This works as expected. However, if I set nthreads=2, I get:
> {noformat}
> 2017-05-16 02:59:36.414677, p116977, th140677336123136, ERROR 
> InputStreamImpl: failed to read Block: [block pool ID: 
> BP-1695827161-10.87.14.23-1472240973777 block ID 1080474497_6733673] file 
> /my/parquet/file/part-00001-e32bec64-fdcc-4c42-a292-c837a081310b.snappy.parquet
>  from Datanode: hdfshost1(1.2.3.4), 
> RemoteBlockReader.cpp: 304: ChecksumException: RemoteBlockReader: checksum 
> not match for Block: [block pool ID: BP-1695827161-10.87.14.23-1472240973777 
> block ID 1080474497_6733673], on Datanode: hdfshost1(1.2.3.4)
>         @       Hdfs::Internal::RemoteBlockReader::verifyChecksum(int)
>         @       Hdfs::Internal::RemoteBlockReader::readNextPacket()
>         @       Hdfs::Internal::RemoteBlockReader::read(char*, int)
>         @       Hdfs::Internal::InputStreamImpl::readOneBlock(char*, int, 
> bool)
>         @       Hdfs::Internal::InputStreamImpl::readInternal(char*, int)
>         @       Hdfs::Internal::InputStreamImpl::read(char*, int)
>         @       hdfsRead
>         @       arrow::io::HdfsReadableFile::ReadAt(long, long, 
> std::shared_ptr<arrow::Buffer>*)
>         @       parquet::ArrowInputFile::ReadAt(long, long)
>         @       
> parquet::InMemoryInputStream::InMemoryInputStream(parquet::RandomAccessSource*,
>  long, long)
>         @       parquet::SerializedRowGroup::GetColumnPageReader(int)
>         @       parquet::RowGroupReader::Column(int)
>         @       parquet::arrow::AllRowGroupsIterator::Next()
>         @       parquet::arrow::ColumnReader::Impl::NextRowGroup()
>         @       parquet::arrow::ColumnReader::Impl::Impl(arrow::MemoryPool*, 
> std::unique_ptr<parquet::arrow::FileColumnIterator, 
> std::default_delete<parquet::arrow::FileColumnIterator> >)
>         @       parquet::arrow::FileReader::Impl::GetColumn(int, 
> std::unique_ptr<parquet::arrow::ColumnReader, 
> std::default_delete<parquet::arrow::ColumnReader> >*)
>         @       parquet::arrow::FileReader::Impl::ReadColumn(int, 
> std::shared_ptr<arrow::Array>*)
>         @       parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, 
> std::allocator<int> > const&, 
> std::shared_ptr<arrow::Table>*)::{lambda(int)#1}::operator()(int) const
>         @       std::thread::_Impl<std::_Bind_simple<arrow::Status 
> parquet::arrow::ParallelFor<parquet::arrow::FileReader::Impl::ReadTable(std::vector<int,
>  std::allocator<int> > const&, 
> std::shared_ptr<arrow::Table>*)::{lambda(int)#1}&>(int, int, 
> parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, 
> std::allocator<int> > const&, 
> std::shared_ptr<arrow::Table>*)::{lambda(int)#1}&)::{lambda()#1} ()> 
> >::_M_run()
>         @       Unknown
>         @       start_thread
>         @       clone
> , retry read again from another Datanode.
> 2017-05-16 02:59:36.414858, p116977, th140677336123136, INFO 
> IntputStreamImpl: Add invalid datanode hdfshost1(1.2.3.4) to failed datanodes 
> and try another datanode again for file 
> /my/parquet/file/part-00001-e32bec64-fdcc-4c42-a292-c837a081310b.snappy.parquet.
> 2017-05-16 02:59:36.424118, p116977, th140677702768384, ERROR 
> InputStreamImpl: failed to read Block: [block pool ID: 
> BP-1695827161-10.87.14.23-1472240973777 block ID 1080474497_6733673] file 
> /my/parquet/file/part-00001-e32bec64-fdcc-4c42-a292-c837a081310b.snappy.parquet
>  from Datanode: hdfshost2(1.2.3.5), 
> RemoteBlockReader.cpp: 205: HdfsIOException: RemoteBlockReader: failed to 
> read block header for Block: [block pool ID: 
> BP-1695827161-10.87.14.23-1472240973777 block ID 1080474497_6733673] from 
> Datanode: hdfshost1(1.2.3.4) .
>         @       Hdfs::Internal::RemoteBlockReader::readPacketHeader()
>         @       Hdfs::Internal::RemoteBlockReader::readNextPacket()
>         @       Hdfs::Internal::RemoteBlockReader::read(char*, int)
>         @       Hdfs::Internal::InputStreamImpl::readOneBlock(char*, int, 
> bool)
>         @       Hdfs::Internal::InputStreamImpl::readInternal(char*, int)
>         @       Hdfs::Internal::InputStreamImpl::read(char*, int)
>         @       hdfsRead
>         @       arrow::io::HdfsReadableFile::ReadAt(long, long, 
> std::shared_ptr<arrow::Buffer>*)
>         @       parquet::ArrowInputFile::ReadAt(long, long)
>         @       
> parquet::InMemoryInputStream::InMemoryInputStream(parquet::RandomAccessSource*,
>  long, long)
>         @       parquet::SerializedRowGroup::GetColumnPageReader(int)
>         @       parquet::RowGroupReader::Column(int)
>         @       parquet::arrow::AllRowGroupsIterator::Next()
>         @       parquet::arrow::ColumnReader::Impl::NextRowGroup()
>         @       parquet::arrow::ColumnReader::Impl::Impl(arrow::MemoryPool*, 
> std::unique_ptr<parquet::arrow::FileColumnIterator, 
> std::default_delete<parquet::arrow::FileColumnIterator> >)
>         @       parquet::arrow::FileReader::Impl::GetColumn(int, 
> std::unique_ptr<parquet::arrow::ColumnReader, 
> std::default_delete<parquet::arrow::ColumnReader> >*)
>         @       parquet::arrow::FileReader::Impl::ReadColumn(int, 
> std::shared_ptr<arrow::Array>*)
>         @       parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, 
> std::allocator<int> > const&, 
> std::shared_ptr<arrow::Table>*)::{lambda(int)#1}::operator()(int) const
>         @       std::thread::_Impl<std::_Bind_simple<arrow::Status 
> parquet::arrow::ParallelFor<parquet::arrow::FileReader::Impl::ReadTable(std::vector<int,
>  std::allocator<int> > const&, 
> std::shared_ptr<arrow::Table>*)::{lambda(int)#1}&>(int, int, 
> parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, 
> std::allocator<int> > const&, 
> std::shared_ptr<arrow::Table>*)::{lambda(int)#1}&)::{lambda()#1} ()> 
> >::_M_run()
>         @       Unknown
>         @       start_thread
>         @       clone
> Caused by
> PacketHeader.cpp: 109: HdfsIOException: Invalid PacketHeader, packetLen is 
> 1228668939, protoLen is 1290, buf size is 31
>         @       Hdfs::Internal::PacketHeader::readFields(char const*, 
> unsigned long)
>         @       Hdfs::Internal::RemoteBlockReader::readPacketHeader()
>         @       Hdfs::Internal::RemoteBlockReader::readNextPacket()
>         @       Hdfs::Internal::RemoteBlockReader::read(char*, int)
>         @       Hdfs::Internal::InputStreamImpl::readOneBlock(char*, int, 
> bool)
>         @       Hdfs::Internal::InputStreamImpl::readInternal(char*, int)
>         @       Hdfs::Internal::InputStreamImpl::read(char*, int)
>         @       hdfsRead
>         @       arrow::io::HdfsReadableFile::ReadAt(long, long, 
> std::shared_ptr<arrow::Buffer>*)
>         @       parquet::ArrowInputFile::ReadAt(long, long)
>         @       
> parquet::InMemoryInputStream::InMemoryInputStream(parquet::RandomAccessSource*,
>  long, long)
>         @       parquet::SerializedRowGroup::GetColumnPageReader(int)
>         @       parquet::RowGroupReader::Column(int)
>         @       parquet::arrow::AllRowGroupsIterator::Next()
>         @       parquet::arrow::ColumnReader::Impl::NextRowGroup()
>         @       parquet::arrow::ColumnReader::Impl::Impl(arrow::MemoryPool*, 
> std::unique_ptr<parquet::arrow::FileColumnIterator, 
> std::default_delete<parquet::arrow::FileColumnIterator> >)
>         @       parquet::arrow::FileReader::Impl::GetColumn(int, 
> std::unique_ptr<parquet::arrow::ColumnReader, 
> std::default_delete<parquet::arrow::ColumnReader> >*)
>         @       parquet::arrow::FileReader::Impl::ReadColumn(int, 
> std::shared_ptr<arrow::Array>*)
>         @       parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, 
> std::allocator<int> > const&, 
> std::shared_ptr<arrow::Table>*)::{lambda(int)#1}::operator()(int) const
>         @       std::thread::_Impl<std::_Bind_simple<arrow::Status 
> parquet::arrow::ParallelFor<parquet::arrow::FileReader::Impl::ReadTable(std::vector<int,
>  std::allocator<int> > const&, 
> std::shared_ptr<arrow::Table>*)::{lambda(int)#1}&>(int, int, 
> parquet::arrow::FileReader::Impl::ReadTable(std::vector<int, 
> std::allocator<int> > const&, 
> std::shared_ptr<arrow::Table>*)::{lambda(int)#1}&)::{lambda()#1} ()> 
> >::_M_run()
>         @       Unknown
>         @       start_thread
>         @       clone
> , retry read again from another Datanode.
> 2017-05-16 02:59:36.424266, p116977, th140677702768384, INFO 
> IntputStreamImpl: Add invalid datanode hdfshost2(1.2.3.5) to failed datanodes 
> and try another datanode again for file 
> /my/parquet/file/part-00001-e32bec64-fdcc-4c42-a292-c837a081310b.snappy.parquet.
> terminate called after throwing an instance of 'parquet::ParquetException'
>   what():  Unable to read column chunk data
> Aborted
> {noformat}
> With nthreads>2, I get:
> {noformat}
> Segmentation fault
> {noformat}
> I'm running this in a conda environment with:
> {noformat}
> pyarrow                   0.3.0.post          np112py27_1    conda-forge
> parquet-cpp               1.1.0pre                      2    conda-forge
> arrow-cpp                 0.3.0.post          np112py27_1    conda-forge
> libhdfs3                  2.2.31                        1  
> {noformat}
