[ https://issues.apache.org/jira/browse/FLINK-4719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15831458#comment-15831458 ]

Marc Hübner edited comment on FLINK-4719 at 1/20/17 10:40 AM:
--------------------------------------------------------------

I get a similar exception when spilling to disk while processing large amounts of Avro 
files. It might not be related, because at first Flink failed to use the Avro 
serializer at all. But even after enforcing the Avro serializer, it still failed to 
deserialize after spilling to disk, so I assume the problem is 
actually located in "SpillingAdaptiveSpanningRecordDeserializer". I am using 
Scala and Flink 1.1.4.
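
To clarify, "enforcing the Avro serializer" refers to the ExecutionConfig flag, exactly as used in the job below:
{code:java}
// Force Avro-based serialization for POJO/generic types instead of Kryo.
env.getConfig.enableForceAvro()
{code}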

My code (I removed the import for the Document Avro class; the I/O utils 
read from HDFS):
{code:java}
import java.io.IOException

import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.solr.common.params.MapSolrParams
import org.apache.solr.update.processor.TextProfileSignature
import org.slf4j.LoggerFactory

object DeduplicationFilterJob {
  private val LOG = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]) {
    if (args.length != 1) {
      printUsage()
      System.exit(-1)
    }
    try {
      LOG.info("Loading configuration from file " + args(0))
      val conf = ParameterTool.fromPropertiesFile(args(0)).getConfiguration
      val exitVal: Int = run(conf)
      System.exit(exitVal)
    } catch {
      case e: IOException =>
        LOG.error("Could not load job configuration", e)
        printUsage()
    }
  }

  private def printUsage() = {
    println(s"Usage: java -cp [classpath] ${getClass.getName} config")
    println("config - a valid FS path to a config/properties file")
  }

  private def run(conf: Configuration): Int = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.getConfig.setGlobalJobParameters(conf)
    env.getConfig.enableForceAvro()

    if (conf.getInteger("env.parallelism", -1) > 0)
      env.setParallelism(conf.getInteger("env.parallelism", -1))

    val documents: DataSet[Document] =
      DatasetIOUtils.readDocuments(env, conf.getString("input.path", null), conf.getString("input.format", "avro"))
        .name("Read documents")

    val signatureDocumentPairs: DataSet[(String, Document)] = documents.map((document: Document) => {
      val contentSignature = new TextProfileSignature
      contentSignature.init(new MapSolrParams(new java.util.HashMap[String, String]()))
      contentSignature.add(document.getText)
      val signature = new String(contentSignature.getSignature, "UTF-8")
      (signature, document)
    }).name("Compute document signatures")

    val deduplicatedDocuments = signatureDocumentPairs
      .groupBy(0)
      .reduceGroup(_.next()._2)
      .name("Select first document per signature group")

    DatasetIOUtils.writeDocuments(
      conf.getString("output.path", null),
      conf.getString("output.format", "avro"),
      deduplicatedDocuments)

    val executionResult: JobExecutionResult = env.execute("deduplication filter")
    if (executionResult != null) 1 else -1
  }

}
{code}
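
For reference, a minimal sketch of what the omitted DatasetIOUtils could look like (hypothetical; it assumes the "avro" code path simply wraps Flink's AvroInputFormat/AvroOutputFormat, with hdfs:// paths resolved through Flink's file system abstraction):
{code:java}
import org.apache.flink.api.java.io.{AvroInputFormat, AvroOutputFormat}
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.Path

// Hypothetical stand-in for the omitted helper; the real utils may differ.
object DatasetIOUtils {

  def readDocuments(env: ExecutionEnvironment, path: String, format: String): DataSet[Document] = {
    require(format == "avro", s"unsupported input format: $format")
    // hdfs:// URIs are handled by Flink's file system abstraction.
    env.createInput(new AvroInputFormat[Document](new Path(path), classOf[Document]))
  }

  def writeDocuments(path: String, format: String, documents: DataSet[Document]): Unit = {
    require(format == "avro", s"unsupported output format: $format")
    documents.write(new AvroOutputFormat[Document](classOf[Document]), path)
  }
}
{code}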

The stack trace:
{code}
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:805)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:751)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:751)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The data preparation for task 'GroupReduce (Select first document per signature group)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: 795228258
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:462)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: 795228258
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1098)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: 795228258
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 795228258
        at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402)
        at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
        at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
        at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
        at org.apache.avro.reflect.ReflectDatumReader.readCollection(ReflectDatumReader.java:162)
        at org.apache.avro.reflect.ReflectDatumReader.readArray(ReflectDatumReader.java:123)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
        at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
        at org.apache.flink.api.java.typeutils.runtime.AvroSerializer.deserialize(AvroSerializer.java:123)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
        at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:155)
        at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:73)
        at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
        at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
{code}

The stack trace of an exception from before enforcing the Avro serializer:
{code}
java.lang.Exception: The data preparation for task 'GroupReduce (Select first document per signature group)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: s_business_space
Serialization trace:
attributes (de.dfki.lt.tap.ConceptMention)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:462)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: s_business_space
Serialization trace:
attributes (de.dfki.lt.tap.ConceptMention)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1098)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: s_business_space
Serialization trace:
attributes (de.dfki.lt.tap.ConceptMention)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: s_business_space
Serialization trace:
attributes (de.dfki.lt.tap.ConceptMention)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:246)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:256)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:89)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.ClassNotFoundException: s_business_space
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
        ... 20 more
2017-01-19 11:00:36,423 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Could not restart the job spree deduplication filter (b07462ca08254ddcdcad6eef2629c541).
java.lang.Exception: The data preparation for task 'GroupReduce (Select first document per signature group)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: s_business_space
Serialization trace:
attributes (de.dfki.lt.tap.ConceptMention)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:462)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: s_business_space
Serialization trace:
attributes (de.dfki.lt.tap.ConceptMention)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1098)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: s_business_space
Serialization trace:
attributes (de.dfki.lt.tap.ConceptMention)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: s_business_space
Serialization trace:
attributes (de.dfki.lt.tap.ConceptMention)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:246)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:256)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:89)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.ClassNotFoundException: s_business_space
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
        ... 20 more
2017-01-19 11:00:37,496 INFO  org.apache.flink.yarn.YarnJobManager                          - Stopping JobManager with final application status SUCCEEDED and diagnostics: Flink YARN Client requested shutdown
2017-01-19 11:00:37,497 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Shutting down cluster with status SUCCEEDED : Flink YARN Client requested shutdown
2017-01-19 11:00:37,499 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Unregistering application from the YARN Resource Manager
2017-01-19 11:00:37,509 INFO  org.apache.flink.yarn.YarnJobManager                          - Stopping JobManager akka.tcp://flink@10.13.2.6:40283/user/jobmanager.
2017-01-19 11:00:37,509 INFO  org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl         - Waiting for application to be successfully unregistered.
2017-01-19 11:00:37,612 INFO  org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Interrupted while waiting for queue
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:274)
2017-01-19 11:00:37,613 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - Opening proxy : abemasanao.unbelievable-machine.net:8041
2017-01-19 11:00:37,637 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - Opening proxy : acamas.unbelievable-machine.net:8041
2017-01-19 11:00:37,648 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - Opening proxy : baggaley.unbelievable-machine.net:8041
2017-01-19 11:00:37,661 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - Opening proxy : dido.unbelievable-machine.net:8041
2017-01-19 11:00:37,721 INFO  org.apache.flink.runtime.blob.BlobServer                      - Stopped BLOB server at 0.0.0.0:40514
2017-01-19 11:00:37,728 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
2017-01-19 11:00:37,729 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
2017-01-19 11:00:37,758 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
2017-01-19 11:00:37,775 INFO  org.apache.flink.yarn.YarnJobManager                          - Shutdown completed. Stopping JVM.
2017-01-19 11:00:37,777 INFO  org.apache.flink.runtime.webmonitor.WebRuntimeMonitor         - Removing web dashboard root cache directory /tmp/flink-web-4e068ddd-fe08-4115-8e98-794e7e5b83c6
2017-01-19 11:00:37,777 INFO  org.apache.flink.runtime.webmonitor.WebRuntimeMonitor         - Removing web dashboard jar upload directory /tmp/flink-web-upload-344a9e8c-a495-43cb-a8bb-e28388cb70f2
2017-01-19 11:00:37,778 INFO  org.apache.flink.runtime.webmonitor.StackTraceSampleCoordinator  - Shutting down stack trace sample coordinator.
{code}



> KryoSerializer random exception
> -------------------------------
>
>                 Key: FLINK-4719
>                 URL: https://issues.apache.org/jira/browse/FLINK-4719
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.1.1
>            Reporter: Flavio Pompermaier
>            Assignee: Nico Kruber
>              Labels: kryo, serialization
>
> There's a random exception that somehow involves the KryoSerializer when 
> using POJOs in Flink jobs reading large volumes of data.
> It is usually thrown in several places, e.g. (the exceptions reported here 
> may refer to previous versions of Flink...):
> {code}
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at createResult(IndexMappingExecutor.java:42)) -> Map (Map at main(Jsonizer.java:128))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>         at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>         at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>         ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: java.ttil.HashSet
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
>         at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>         at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:348)
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
> {code}
> {code}
> Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>     at java.util.ArrayList.elementData(ArrayList.java:418)
>     at java.util.ArrayList.get(ArrayList.java:431)
>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> {code}
> {code}
> java.lang.RuntimeException: Cannot instantiate class.
>       at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
>       at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>       at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>       at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException: it.okkam.flink.test.model.pojo.VdhicleEvent
>       at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>       at java.lang.Class.forName0(Native Method)
>       at java.lang.Class.forName(Class.java:348)
>       at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
>       ... 10 more
> {code}
> {code}
> com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>         at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>         at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>         at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>         at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> {code}
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>       at java.util.ArrayList.elementData(ArrayList.java:418)
>       at java.util.ArrayList.get(ArrayList.java:431)
>       at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>       at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>       at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>       at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>       at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
>       at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>       at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>       at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>       at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
>       at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
>       at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
>       at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:45)
>       at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130)
>       at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
>       at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
>       at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:64)
>       at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>       at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
