Hi JB,
We finally got past the issue! It was caused by InputSplit
serialization/deserialization. During deserialization, the
EsInputSplit/CqlInputSplit object could not be reconstructed because the
EsInputSplit and CqlInputSplit classes are protected, and hence they are not
accessible at runtime when the object is being constructed.
We changed the SerializableSplit serialization and deserialization to use the
ObjectWritable methods for writing and reading the wrapped object. With this
implementation there is no longer any need to call
Class.forName("org.elasticsearch.hadoop.mr.EsInputFormat.EsInputSplit"),
and likewise for CqlInputSplit. One thing to note: both of these InputSplit
implementations (EsInputSplit and CqlInputSplit) extend the abstract
org.apache.hadoop.mapreduce.InputSplit and also implement the deprecated
org.apache.hadoop.mapred.InputSplit interface, which in turn extends Writable.
We followed an approach similar to the one Spark's HadoopRDD uses to
serialize splits.
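For reference, here is a rough sketch of what the ObjectWritable-based approach
looks like (simplified; the field names and the Configuration handling shown
here are illustrative and may differ from the exact code in the PR). Because
both splits are Writable through the deprecated mapred.InputSplit interface,
ObjectWritable can record the concrete split class and rebuild the instance
through Hadoop's reflection utilities, which can instantiate the protected
classes:

  // Sketch of SerializableSplit using ObjectWritable in its Externalizable hooks,
  // so deserialization no longer needs a hard-coded Class.forName(...) call.
  import java.io.Externalizable;
  import java.io.IOException;
  import java.io.ObjectInput;
  import java.io.ObjectOutput;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.io.ObjectWritable;
  import org.apache.hadoop.mapreduce.InputSplit;

  public class SerializableSplit implements Externalizable {
    private InputSplit split;

    public SerializableSplit() {}  // no-arg constructor required by Externalizable

    public SerializableSplit(InputSplit split) {
      this.split = split;
    }

    public InputSplit getSplit() {
      return split;
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
      // ObjectWritable writes the concrete class name plus the split's own
      // Writable payload (EsInputSplit/CqlInputSplit are Writable via
      // org.apache.hadoop.mapred.InputSplit).
      new ObjectWritable(split).write(out);
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException {
      ObjectWritable ow = new ObjectWritable();
      // A Configuration is needed so ObjectWritable can resolve and instantiate
      // the concrete split class reflectively; the real code may pass in the
      // job's Configuration instead of creating a fresh one here.
      ow.setConf(new Configuration());
      ow.readFields(in);
      split = (InputSplit) ow.get();
    }
  }

This mirrors how Spark's HadoopRDD wraps its splits for Java serialization.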
Thanks for your interest.
-Dipti
-----Original Message-----
From: Jean-Baptiste Onofré [mailto:[email protected]]
Sent: Monday, February 20, 2017 1:54 AM
To: [email protected]
Subject: Re: IT issues faced for HadoopInputFormat IO on Spark and Dataflow
runners
OK, I just checked and it's part of the PR.
I'm checking it out locally to verify. I will let you know.
Regards
JB
On 02/19/2017 08:33 PM, Jean-Baptiste Onofré wrote:
> Hi Dipti,
>
> regarding Elasticsearch, it seems that the classloader doesn't contain
> the EsInputSplit class. Using spark-submit, you have to provide the
> corresponding jar as a package.
>
> For Cassandra, it seems to be more of a coder/serialization issue.
>
> Are the tests part of the PR (in order for me to take a look)?
>
> Regards
> JB
>
> On 02/19/2017 06:14 PM, Dipti Kulkarni wrote:
>> Hi folks,
>>
>> I am currently working on running HadoopInputFormatIO's Cassandra IT
>> and Elasticsearch IT on the Spark and Dataflow runners. However, I am
>> facing issues in both ITs: specific classes are not found at runtime,
>> but only when I use the Dataflow runner or Spark runner profile. The
>> failures occur during the deserialization process. Without these
>> profiles, the ITs run fine on the direct runner. Here are the
>> specific details:
>>
>> Cassandra and Elasticsearch IT execution using Spark pipeline options:
>>
>> Cassandra IT execution using the Spark runner:
>> Command used:
>> mvn -e -Pspark-runner install -pl sdks/java/io/hadoop-input-format -DskipITs=false
>> -DintegrationTestPipelineOptions='[ "--project= sincere-nirvana-150207",
>> "--tempRoot= gs://sisk-test/staging ",
>> "--runner=org.apache.beam.runners.spark.TestSparkRunner", "--serverIp= 104.154.16.243",
>> "--serverPort=9160", "--userName=beamuser", "--password=test123"]'
>> -Denforcer.skip=true -Dcheckstyle.skip=true -Dtest=HIFIOCassandraIT
>>
>> Exception:
>> testHIFReadForCassandraQuery(org.apache.beam.sdk.io.hadoop.inputformat.integration.tests.HIFIOCassandraIT)
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: unread block data
>>   at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:73)
>>   at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
>>   at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
>>   at org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:115)
>>   at org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:64)
>>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:210)
>>   at org.apache.beam.sdk.io.hadoop.inputformat.integration.tests.HIFIOCassandraIT.testHIFReadForCassandraQuery(HIFIOCassandraIT.java:157)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>   at java.lang.reflect.Method.invoke(Method.java:498)
>>   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>>   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>>   at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>>   at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>>   at org.junit.runners.Suite.runChild(Suite.java:128)
>>   at org.junit.runners.Suite.runChild(Suite.java:27)
>>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>>   at org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55)
>>   at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137)
>>   at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:107)
>>   at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:83)
>>   at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:75)
>>   at org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:161)
>>   at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:290)
>>   at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:242)
>>   at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:121)
>> Caused by: java.lang.IllegalStateException: unread block data
>>   at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2726)
>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>>   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
>>   at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:207)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   at java.lang.Thread.run(Thread.java:745)
>>
>> Elasticsearch IT execution using the Spark runner:
>> Command used:
>> mvn -e -Pspark-runner install -pl sdks/java/io/hadoop-input-format -DskipITs=false
>> -DintegrationTestPipelineOptions='[ "--project= sincere-nirvana-150207",
>> "--tempRoot= gs://sisk-test/staging ",
>> "--runner=org.apache.beam.runners.spark.TestSparkRunner", "--serverIp= 104.198.29.210",
>> "--serverPort=9200", "--userName=beamuser", "--password=test123"]'
>> -Denforcer.skip=true -Dcheckstyle.skip=true -Dtest=HIFIOElasticIT
>>
>> Exception:
>> Caused by: java.lang.ClassNotFoundException: org.elasticsearch.hadoop.mr.EsInputFormat.EsInputSplit
>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>   at java.lang.Class.forName0(Native Method)
>>   at java.lang.Class.forName(Class.java:264)
>>   at org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO$HadoopInputFormatBoundedSource$SerializableSplit.readExternal(HadoopInputFormatIO.java:973)
>>   at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2062)
>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2011)
>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>>   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
>>   at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:207)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   at java.lang.Thread.run(Thread.java:745)
>>
>> I get the same set of exceptions on the Dataflow runner too.
>>
>> Here are some things that I have tried to address this issue:
>> 1) I suspected the issue was likely due to dependency conflicts after
>> adding the beam-runners-spark and spark-streaming_2.10 dependencies to
>> the pom.xml.
>> 2) The beam-runners-spark dependency pulls in dependencies such as
>> avro-mapred and hadoop-mapreduce-client-core with Hadoop version 2.2.0.
>> I therefore tried excluding these transitive dependencies from
>> beam-runners-spark and re-ran the IT, but that didn't resolve the issue.
>> 3) The spark-streaming_2.10 dependency pulls in hadoop-client, which I
>> also tried to exclude from the pom (a sketch of the kind of exclusion we
>> used is shown after this list). This ensured that no Hadoop 2.2.0
>> dependency remained in the pom.xml; HadoopInputFormatIO uses Hadoop 2.7.0.
>> 4) I compared the dependency trees before and after adding the Spark
>> runner. The only difference was in the Hadoop client dependencies, which
>> I later tried to exclude, but that didn't resolve the issue either.
>> 5) I also tried changing the Spark dependency scopes from runtime to
>> provided/compile.
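>>
>> For reference, the kind of exclusion we tried looks roughly like this
>> (coordinates and the version property are illustrative, not the exact
>> entries from our pom.xml):
>>
>>   <dependency>
>>     <groupId>org.apache.spark</groupId>
>>     <artifactId>spark-streaming_2.10</artifactId>
>>     <version>${spark.version}</version>
>>     <scope>runtime</scope>
>>     <exclusions>
>>       <!-- Illustrative: keep Spark's transitive Hadoop 2.2.0 client off the
>>            classpath, since HadoopInputFormatIO is built against Hadoop 2.7.0 -->
>>       <exclusion>
>>         <groupId>org.apache.hadoop</groupId>
>>         <artifactId>hadoop-client</artifactId>
>>       </exclusion>
>>     </exclusions>
>>   </dependency>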
>>
>> Observations:
>>
>> * This issue doesn't seem to be due to a dependency conflict in the
>> pom.xml, because if I execute the ITs using the DirectRunner with the
>> same set of dependencies in the pom.xml, the tests proceed without any
>> exceptions.
>>
>> * In the case of the Spark runner, the exceptions in both ITs appear to
>> originate from 'at
>> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)'.
>> This exception occurs only when TestSparkRunner is specified as the
>> runner in integrationTestPipelineOptions, and the same kind of exception
>> is observed in both the Cassandra and Elasticsearch ITs.
>>
>> * With the DirectRunner, the EsInputSplit class is found in the
>> Elasticsearch case and the test runs successfully.
>>
>> Below are some of the questions I am trying to find answers to:
>> 1) For TestSparkRunner, do we need to specify a classpath or something
>> similar so that the jars get loaded?
>> 2) Do we need to specify sparkMaster in the pipeline options? We tried
>> using URLs, but it could not parse the URL.
>> 3) Is there any additional input to be given in the options?
>>
>> Since the same issues appear on the Dataflow runner, the issue may not
>> be runner specific. Any inputs are greatly appreciated.
>>
>> -Dipti
>>
>>
>>
>>
>
--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com