[
https://issues.apache.org/jira/browse/FLINK-13655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16903660#comment-16903660
]
LiJun commented on FLINK-13655:
-------------------------------
I tested this issue against Flink 1.8.1 and could not reproduce the
problem, so I think it is fixed in the latest version.
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> terminated due to an exception
> --------------------------------------------------------------------------------------------------
>
> Key: FLINK-13655
> URL: https://issues.apache.org/jira/browse/FLINK-13655
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System
> Affects Versions: 1.6.3
> Environment: <properties>
>   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>   <flink.version>1.5.6</flink.version>
>   <slf4j.version>1.7.7</slf4j.version>
>   <log4j.version>1.2.17</log4j.version>
>   <scala.binary.version>2.11</scala.binary.version>
>   <scala.version>2.11.12</scala.version>
> </properties>
> parameters.setBoolean("recursive.file.enumeration", true)
> Reporter: LiJun
> Priority: Minor
> Labels: KryoSerializer
>
> Symptom:
> The Flink program can successfully read and process a single ORC file from
> HDFS, whether the given path is the file's parent folder or the file
> itself. However, when I put several such files together in the same folder
> and the program reads that folder, the following error always occurs.
> val configHadoop = new org.apache.hadoop.conf.Configuration()
> configHadoop.set("HADOOP_USER_NAME", "user")
> configHadoop.set("fs.defaultFS", "xx.xxx.xx.xx")
> val env = ExecutionEnvironment.getExecutionEnvironment
> val bTableEnv = TableEnvironment.getTableEnvironment(env)
> val orcTableSource = OrcTableSource.builder()
>   // path to ORC file(s). NOTE: By default, directories are recursively scanned.
>   .path(inPath)
>   // schema of ORC files
>   .forOrcSchema("struct<storage_time:String,storage_source_msg:String,rev_name:String,where2:String,rev_phone:String,bind_email:String,ord_pin:String,ord_tm:String>")
>   // Hadoop configuration
>   .withConfiguration(configHadoop)
>   // build OrcTableSource
>   .build()
>
> ------------The following is stack info --------------------
> Root exception
> Timestamp: 2019-08-08, 20:15:05
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
> (GroupReduce at
> com.jd.risk.flink.analysis.framework.core.EventOfflineServiceFrameWork.startService(EventOfflineServiceFrameWork.scala:41))
> -> Map (Key Extractor)' , caused an error: Error obtaining the sorted input:
> Thread 'SortMerger spilling thread' terminated due to an exception: Index:
> 97, Size: 17
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger spilling thread' terminated due to an exception: Index:
> 97, Size: 17
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
> at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1108)
> at
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473)
> ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> terminated due to an exception: Index: 97, Size: 17
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:831)
> Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 17
> at java.util.ArrayList.rangeCheck(ArrayList.java:653)
> at java.util.ArrayList.get(ArrayList.java:429)
> at
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:335)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:350)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:99)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:84)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:99)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:99)
> at
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1375)
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:827)
> CHAIN GroupReduce (GroupReduce at
> com.jd.risk.flink.analysis.framework.core.EventOfflineServiceFrameWork.startService(EventOfflineServiceFrameWork.scala:41))
> -> Map (Key Extractor) (305/480)
> Timestamp: 2019-08-08, 20:15:05 Location:
> LF-BCC-POD0-172-21-60-234.hadoop.jd.local:15837
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)