wangxujin1221 opened a new issue #3921:
URL: https://github.com/apache/iceberg/issues/3921


   We have an Iceberg table partitioned by a time field, with 200k records.
   
   The table information:
   ```
    # Detailed Table Information
    Database:            default
    OwnerType:           USER
    Owner:               wangxujin
    CreateTime:          Wed Jan 19 13:31:20 CST 2022
    LastAccessTime:      Fri Jan 23 05:47:18 CST 1970
    Retention:           2147483647
    Location:            hdfs://SERVICE-HADOOP-admin/hbp_root/bigdata/hive/warehouse/iceberg0119_day_repartition
    Table Type:          EXTERNAL_TABLE
    Table Parameters:
         EXTERNAL                TRUE
         engine.hive.enabled     true
         metadata_location       hdfs://SERVICE-HADOOP-admin/hbp_root/bigdata/hive/warehouse/iceberg0119_day_repartition/metadata/00010-5cd4b7b9-350d-4341-b129-3921cd17d204.metadata.json
         numFiles                73520
         numRows                 200000
         previous_metadata_location      hdfs://SERVICE-HADOOP-admin/hbp_root/bigdata/hive/warehouse/iceberg0119_day_repartition/metadata/00009-bed22c7b-1fd3-44f7-a66f-5bc7542bcd3d.metadata.json
         storage_handler         org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
         table_type              ICEBERG
         totalSize               2856930656
         transient_lastDdlTime   1642570280

    # Storage Information
    SerDe Library:       org.apache.iceberg.mr.hive.HiveIcebergSerDe
    InputFormat:         org.apache.iceberg.mr.hive.HiveIcebergInputFormat
    OutputFormat:        org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
    Compressed:          No
    Num Buckets:         0
    Bucket Columns:      []
    Sort Columns:        []
   ```
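   
   For reference, the listing above appears to be `DESCRIBE FORMATTED` output; a minimal sketch of reproducing it from Spark (assuming the Hive metastore that owns the table is also the Spark session catalog, and taking the table name from the HDFS location above; output formatting may differ slightly from the Hive CLI):
   ```
   import org.apache.spark.sql.SparkSession;
   
   public class DescribeIcebergTable {
       public static void main(String[] args) {
           SparkSession spark = SparkSession.builder()
                   .appName("describe-iceberg-table")
                   .enableHiveSupport()   // the table is registered in the Hive metastore
                   .getOrCreate();
   
           // Same kind of listing as Hive's DESCRIBE FORMATTED; table name assumed from the location
           spark.sql("DESCRIBE FORMATTED default.iceberg0119_day_repartition").show(200, false);
       }
   }
   ```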
   This table was written by the code below:
   ```
    // this UDF is used to convert a timestamp to a month ordinal for repartitioning
    sparkSession.udf().register(PARTITION_EXPR_FUNC_NAME, new IcebergPartitionFunc("month"), DataTypes.IntegerType);

    dataset = df.repartition(200, expr).sortWithinPartitions(partitionField);
    dataset.write()
           .options(options)
           .format(WRITE_FORMAT)
           .mode(SaveMode.Append)
           .save(null != namespace ? namespace + "." + tableName : tableName);

    // the UDF
    static class IcebergPartitionFunc implements UDF1<Date, Integer> {
        private final String partitionTimeUnit;

        public IcebergPartitionFunc(String partitionTimeUnit) {
            this.partitionTimeUnit = partitionTimeUnit;
        }

        @Override
        public Integer call(Date time) {
            // Iceberg's time transforms expect microseconds
            long l = TimeUnit.MILLISECONDS.toMicros(time.getTime());
            switch (partitionTimeUnit.toLowerCase()) {
                case "year":
                    return Timestamps.YEAR.apply(l);
                case "month":
                    return Timestamps.MONTH.apply(l);
                case "day":
                    return Timestamps.DAY.apply(l);
                case "hour":
                    return Timestamps.HOUR.apply(l);
                default:
                    throw new IllegalArgumentException("Transform " + partitionTimeUnit.toLowerCase() + " to integer is not supported yet.");
            }
        }
    }
   ```
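   
   If it helps to see what the "month" branch of the UDF computes without the Iceberg internals, here is a minimal, standalone sketch; it assumes the UTC-based semantics of Iceberg's time transforms (months elapsed since 1970-01), and the class and method names are made up for illustration:
   ```
   import java.time.Instant;
   import java.time.LocalDate;
   import java.time.ZoneOffset;
   import java.time.temporal.ChronoUnit;
   
   public class MonthOrdinalSketch {
       // months elapsed since 1970-01, evaluated in UTC
       static int monthOrdinal(long epochMillis) {
           LocalDate date = Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.UTC).toLocalDate();
           return (int) ChronoUnit.MONTHS.between(LocalDate.of(1970, 1, 1), date.withDayOfMonth(1));
       }
   
       public static void main(String[] args) {
           // 2012-07-10 (one of the partitions in the error log below) -> 510
           System.out.println(monthOrdinal(Instant.parse("2012-07-10T00:00:00Z").toEpochMilli()));
       }
   }
   ```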
   
   
   When I traverse the table as shown below, it always fails.
   ```
    Dataset<Row> dataset = connection.client.read().format("iceberg").load("default.test");
    dataset.foreachPartition(par -> {
        while (par.hasNext()) {  // this line always fails with an error
            try {
                OUTPUT_LIST.put(par.next());
            } catch (InterruptedException e) {
                logger.error("iceberg records caching failed in LinkedBlockingQueue.", e);
            }
        }
    });
   ```
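   
   For comparison, the same traversal with a plain SparkSession and no external queue looks roughly like this (a sketch only; `connection.client` above is our own wrapper, and `default.test` stays as the placeholder table name):
   ```
   import org.apache.spark.api.java.function.ForeachPartitionFunction;
   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;
   import org.apache.spark.sql.SparkSession;
   
   public class TraverseIcebergTable {
       public static void main(String[] args) {
           SparkSession spark = SparkSession.builder().appName("traverse-iceberg-table").getOrCreate();
   
           Dataset<Row> dataset = spark.read().format("iceberg").load("default.test");
   
           // Drain every partition iterator; the failure above happens inside hasNext()
           dataset.foreachPartition((ForeachPartitionFunction<Row>) rows -> {
               long count = 0;
               while (rows.hasNext()) {
                   rows.next();
                   count++;
               }
               System.out.println("rows in partition: " + count);
           });
       }
   }
   ```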
   The error log:
   ```
    ERROR - org.apache.iceberg.spark.source.BaseDataReader - Error reading file: hdfs://SERVICE-HADOOP-admin/hbp_root/bigdata/hive/warehouse/iceberg0119_day_repartition/data/timing_day=2012-07-10/00173-1316-8f19d27e-56a2-48c1-87a5-a9c0e234dbea-00177.parquet
    java.lang.NullPointerException
         at java.util.LinkedList$ListItr.next(LinkedList.java:893)
         at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.trimIdleSelectors(SocketIOWithTimeout.java:447)
         at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.get(SocketIOWithTimeout.java:417)
         at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:325)
         at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
         at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
         at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.readChannelFully(PacketReceiver.java:256)
         at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:207)
         at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
         at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
         at org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.readNextPacket(BlockReaderRemote.java:183)
         at org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.read(BlockReaderRemote.java:165)
         at org.apache.hadoop.hdfs.ByteBufferStrategy.readFromBlock(ReaderStrategy.java:180)
         at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:705)
         at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:766)
         at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:836)
         at org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:147)
         at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream$H2Reader.read(H2SeekableInputStream.java:81)
         at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream.readFully(H2SeekableInputStream.java:90)
         at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream.readFully(H2SeekableInputStream.java:75)
         at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader$ConsecutivePartList.readAll(ParquetFileReader.java:1704)
         at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:925)
         at org.apache.iceberg.parquet.ParquetReader$FileIterator.advance(ParquetReader.java:133)
         at org.apache.iceberg.parquet.ParquetReader$FileIterator.next(ParquetReader.java:112)
         at org.apache.iceberg.io.FilterIterator.advance(FilterIterator.java:66)
         at org.apache.iceberg.io.FilterIterator.hasNext(FilterIterator.java:50)
         at org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:87)
         at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49)
         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
         at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
         at net.butfly.albatis.iceberg.IcebergInput.lambda$dequeue$21f50dca$1(IcebergInput.java:69)
         at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$2.apply(Dataset.scala:2752)
         at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$2.apply(Dataset.scala:2752)
         at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
         at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
         at org.apache.spark.scheduler.Task.run(Task.scala:123)
         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
         at java.lang.Thread.run(Thread.java:748)
    14:20:52.543 ERROR - org.apache.iceberg.spark.source.BaseDataReader - Error reading file: hdfs://SERVICE-HADOOP-admin/hbp_root/bigdata/hive/warehouse/iceberg0119_day_repartition/data/timing_day=2012-10-27/00173-2108-b24110b7-b574-4943-84ec-2c549cb5323c-00277.parquet
    java.util.NoSuchElementException
         at java.util.LinkedList.removeLast(LinkedList.java:283)
         at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.get(SocketIOWithTimeout.java:414)
         at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:325)
         at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
         at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
         at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
         at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
         at java.io.FilterInputStream.read(FilterInputStream.java:83)
         at org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed(PBHelperClient.java:537)
         at org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.newBlockReader(BlockReaderRemote.java:407)
         at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.getRemoteBlockReader(BlockReaderFactory.java:848)
         at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:744)
         at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.build(BlockReaderFactory.java:379)
         at org.apache.hadoop.hdfs.DFSInputStream.getBlockReader(DFSInputStream.java:644)
         at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:575)
         at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:757)
         at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:836)
         at org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:147)
         at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream$H2Reader.read(H2SeekableInputStream.java:81)
         at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream.readFully(H2SeekableInputStream.java:90)
         at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream.readFully(H2SeekableInputStream.java:75)
         at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader$ConsecutivePartList.readAll(ParquetFileReader.java:1704)
         at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:925)
         at org.apache.iceberg.parquet.ParquetReader$FileIterator.advance(ParquetReader.java:133)
         at org.apache.iceberg.parquet.ParquetReader$FileIterator.next(ParquetReader.java:112)
         at org.apache.iceberg.io.FilterIterator.advance(FilterIterator.java:66)
         at org.apache.iceberg.io.FilterIterator.hasNext(FilterIterator.java:50)
         at org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:87)
         at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49)
         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
         at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
         at net.butfly.albatis.iceberg.IcebergInput.lambda$dequeue$21f50dca$1(IcebergInput.java:69)
         at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$2.apply(Dataset.scala:2752)
         at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$2.apply(Dataset.scala:2752)
         at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
         at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
         at org.apache.spark.scheduler.Task.run(Task.scala:123)
         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
         at java.lang.Thread.run(Thread.java:748)
   ```
   
   This error always occurs when I traverse this table and I don't know why. Any help would be appreciated!
   

