wangxujin1221 opened a new issue #3921:
URL: https://github.com/apache/iceberg/issues/3921
We have an Iceberg table partitioned by a time field that contains 200k records.
The table information:
```
# Detailed Table Information
Database:            default
OwnerType:           USER
Owner:               wangxujin
CreateTime:          Wed Jan 19 13:31:20 CST 2022
LastAccessTime:      Fri Jan 23 05:47:18 CST 1970
Retention:           2147483647
Location:            hdfs://SERVICE-HADOOP-admin/hbp_root/bigdata/hive/warehouse/iceberg0119_day_repartition
Table Type:          EXTERNAL_TABLE
Table Parameters:
    EXTERNAL                    TRUE
    engine.hive.enabled         true
    metadata_location           hdfs://SERVICE-HADOOP-admin/hbp_root/bigdata/hive/warehouse/iceberg0119_day_repartition/metadata/00010-5cd4b7b9-350d-4341-b129-3921cd17d204.metadata.json
    numFiles                    73520
    numRows                     200000
    previous_metadata_location  hdfs://SERVICE-HADOOP-admin/hbp_root/bigdata/hive/warehouse/iceberg0119_day_repartition/metadata/00009-bed22c7b-1fd3-44f7-a66f-5bc7542bcd3d.metadata.json
    storage_handler             org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
    table_type                  ICEBERG
    totalSize                   2856930656
    transient_lastDdlTime       1642570280

# Storage Information
SerDe Library:       org.apache.iceberg.mr.hive.HiveIcebergSerDe
InputFormat:         org.apache.iceberg.mr.hive.HiveIcebergInputFormat
OutputFormat:        org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
Compressed:          No
Num Buckets:         0
Bucket Columns:      []
Sort Columns:        []
```
This table was written by the code below:
```
// this UDF is used to convert a timestamp to a month partition value
sparkSession.udf().register(PARTITION_EXPR_FUNC_NAME,
    new IcebergPartitionFunc("month"), DataTypes.IntegerType);

dataset = df.repartition(200, expr).sortWithinPartitions(partitionField);
dataset.write()
    .options(options)
    .format(WRITE_FORMAT)
    .mode(SaveMode.Append)
    .save(null != namespace ? namespace + "." + tableName : tableName);

// the UDF
static class IcebergPartitionFunc implements UDF1<Date, Integer> {
    private final String partitionTimeUnit;

    public IcebergPartitionFunc(String partitionTimeUnit) {
        this.partitionTimeUnit = partitionTimeUnit;
    }

    @Override
    public Integer call(Date time) {
        long micros = TimeUnit.MILLISECONDS.toMicros(time.getTime());
        switch (partitionTimeUnit.toLowerCase()) {
            case "year":
                return Timestamps.YEAR.apply(micros);
            case "month":
                return Timestamps.MONTH.apply(micros);
            case "day":
                return Timestamps.DAY.apply(micros);
            case "hour":
                return Timestamps.HOUR.apply(micros);
            default:
                throw new IllegalArgumentException(
                    "Unsupported transform " + partitionTimeUnit.toLowerCase() + " to integer.");
        }
    }
}
```
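For context, the `expr` and `partitionField` variables are not defined in the snippet above. Below is a minimal sketch of how the registered UDF is presumably wired into the repartition expression; the column name `timing_day` and the variable names are assumptions for illustration, not taken from the original code:
```
// Hypothetical wiring of the registered UDF into the repartition expression.
// "timing_day" is an assumed column name; the real source column is not shown above.
import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Column;

String partitionField = "timing_day";
Column expr = callUDF(PARTITION_EXPR_FUNC_NAME, col(partitionField));

// Rows with the same computed month value are hashed into the same Spark
// partition, and each task sorts by the partition column before appending.
Dataset<Row> dataset = df.repartition(200, expr).sortWithinPartitions(partitionField);
```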
When I traverse the table as shown below, it always fails.
```
Dataset<Row> dataset = connection.client.read().format("iceberg").load("default.test");
dataset.foreachPartition(par -> {
    while (par.hasNext()) { // this line always fails with an error
        try {
            OUTPUT_LIST.put(par.next());
        } catch (InterruptedException e) {
            logger.error("iceberg records caching failed in LinkedBlockingQueue.", e);
        }
    }
});
```
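To narrow down whether the failure comes from the Iceberg/Parquet read path itself or from the per-row queueing, a stripped-down scan of the same table can be run first. This is only a debugging sketch; `spark` is assumed to be the same SparkSession used above:
```
// Plain scan of the same table with no per-row queueing; count() forces every
// data file to be read. If this also fails, the error is in the read path
// itself rather than in the LinkedBlockingQueue handling.
// "spark" is an assumed SparkSession reference.
Dataset<Row> plain = spark.read().format("iceberg").load("default.test");
long rows = plain.count();
System.out.println("scanned rows: " + rows);
```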
The error log:
```
ERROR - org.apache.iceberg.spark.source.BaseDataReader - Error reading file: hdfs://SERVICE-HADOOP-admin/hbp_root/bigdata/hive/warehouse/iceberg0119_day_repartition/data/timing_day=2012-07-10/00173-1316-8f19d27e-56a2-48c1-87a5-a9c0e234dbea-00177.parquet
java.lang.NullPointerException
    at java.util.LinkedList$ListItr.next(LinkedList.java:893)
    at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.trimIdleSelectors(SocketIOWithTimeout.java:447)
    at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.get(SocketIOWithTimeout.java:417)
    at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:325)
    at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.readChannelFully(PacketReceiver.java:256)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:207)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
    at org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.readNextPacket(BlockReaderRemote.java:183)
    at org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.read(BlockReaderRemote.java:165)
    at org.apache.hadoop.hdfs.ByteBufferStrategy.readFromBlock(ReaderStrategy.java:180)
    at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:705)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:766)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:836)
    at org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:147)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream$H2Reader.read(H2SeekableInputStream.java:81)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream.readFully(H2SeekableInputStream.java:90)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream.readFully(H2SeekableInputStream.java:75)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader$ConsecutivePartList.readAll(ParquetFileReader.java:1704)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:925)
    at org.apache.iceberg.parquet.ParquetReader$FileIterator.advance(ParquetReader.java:133)
    at org.apache.iceberg.parquet.ParquetReader$FileIterator.next(ParquetReader.java:112)
    at org.apache.iceberg.io.FilterIterator.advance(FilterIterator.java:66)
    at org.apache.iceberg.io.FilterIterator.hasNext(FilterIterator.java:50)
    at org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:87)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
    at net.butfly.albatis.iceberg.IcebergInput.lambda$dequeue$21f50dca$1(IcebergInput.java:69)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$2.apply(Dataset.scala:2752)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$2.apply(Dataset.scala:2752)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
14:20:52.543 ERROR - org.apache.iceberg.spark.source.BaseDataReader - Error reading file: hdfs://SERVICE-HADOOP-admin/hbp_root/bigdata/hive/warehouse/iceberg0119_day_repartition/data/timing_day=2012-10-27/00173-2108-b24110b7-b574-4943-84ec-2c549cb5323c-00277.parquet
java.util.NoSuchElementException
    at java.util.LinkedList.removeLast(LinkedList.java:283)
    at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.get(SocketIOWithTimeout.java:414)
    at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:325)
    at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed(PBHelperClient.java:537)
    at org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.newBlockReader(BlockReaderRemote.java:407)
    at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.getRemoteBlockReader(BlockReaderFactory.java:848)
    at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:744)
    at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.build(BlockReaderFactory.java:379)
    at org.apache.hadoop.hdfs.DFSInputStream.getBlockReader(DFSInputStream.java:644)
    at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:575)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:757)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:836)
    at org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:147)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream$H2Reader.read(H2SeekableInputStream.java:81)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream.readFully(H2SeekableInputStream.java:90)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream.readFully(H2SeekableInputStream.java:75)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader$ConsecutivePartList.readAll(ParquetFileReader.java:1704)
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:925)
    at org.apache.iceberg.parquet.ParquetReader$FileIterator.advance(ParquetReader.java:133)
    at org.apache.iceberg.parquet.ParquetReader$FileIterator.next(ParquetReader.java:112)
    at org.apache.iceberg.io.FilterIterator.advance(FilterIterator.java:66)
    at org.apache.iceberg.io.FilterIterator.hasNext(FilterIterator.java:50)
    at org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:87)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
    at net.butfly.albatis.iceberg.IcebergInput.lambda$dequeue$21f50dca$1(IcebergInput.java:69)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$2.apply(Dataset.scala:2752)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$2.apply(Dataset.scala:2752)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
This error always occurs when I traverse this table, and I don't know why.
Any help would be appreciated!