Hi JB,

We finally got past the issue! It was caused by InputSplit
serialization/deserialization. During deserialization, the
EsInputSplit/CqlInputSplit object could not be reconstructed, because
EsInputSplit and CqlInputSplit are protected classes and hence are not
accessible at run time when the object is constructed.

We changed SerializableSplit's serialization and deserialization to use
ObjectWritable methods for writing and reading objects. With this
implementation there is no longer any need to call
Class.forName("org.elasticsearch.hadoop.mr.EsInputFormat.EsInputSplit"),
and likewise for CqlInputSplit. One thing to note: both InputSplit
implementations (EsInputSplit and CqlInputSplit) extend the abstract
org.apache.hadoop.mapreduce.InputSplit and also implement the deprecated
org.apache.hadoop.mapred.InputSplit interface. That interface extends the
Writable interface.
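
To illustrate the idea, here is a minimal JDK-only sketch, not the actual
HadoopInputFormatIO code. MiniWritable and DemoSplit are hypothetical
stand-ins for Hadoop's Writable and for a format-specific split such as
EsInputSplit; in the real change, ObjectWritable plays the role of the
hand-written class-name handling and reflective construction shown here.
The wrapper is Externalizable, records the split's class name, and lets the
split write and read its own fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.lang.reflect.Constructor;

public class SplitRoundTrip {

    /** Hypothetical stand-in for org.apache.hadoop.io.Writable. */
    interface MiniWritable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    /** Hypothetical stand-in for a split class that is not publicly constructible. */
    protected static class DemoSplit implements MiniWritable {
        private String host;
        private long length;

        private DemoSplit() {} // non-public: created reflectively on read

        static DemoSplit of(String host, long length) {
            DemoSplit s = new DemoSplit();
            s.host = host;
            s.length = length;
            return s;
        }

        String host() { return host; }
        long length() { return length; }

        @Override public void write(DataOutput out) throws IOException {
            out.writeUTF(host);
            out.writeLong(length);
        }

        @Override public void readFields(DataInput in) throws IOException {
            host = in.readUTF();
            length = in.readLong();
        }
    }

    /** Java-serializable wrapper around a split that only knows how to write itself. */
    public static class SerializableSplit implements Externalizable {
        private MiniWritable split;

        public SerializableSplit() {} // required by Externalizable

        public SerializableSplit(MiniWritable split) { this.split = split; }

        public MiniWritable get() { return split; }

        @Override public void writeExternal(ObjectOutput out) throws IOException {
            // getName() returns the binary name, e.g. "SplitRoundTrip$DemoSplit".
            out.writeUTF(split.getClass().getName());
            split.write(out); // ObjectOutput is also a DataOutput
        }

        @Override public void readExternal(ObjectInput in)
                throws IOException, ClassNotFoundException {
            Class<?> cls = Class.forName(in.readUTF());
            try {
                Constructor<?> ctor = cls.getDeclaredConstructor();
                ctor.setAccessible(true); // bypass the access check on the constructor
                split = (MiniWritable) ctor.newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IOException("Cannot instantiate " + cls.getName(), e);
            }
            split.readFields(in); // ObjectInput is also a DataInput
        }
    }

    /** Serializes and deserializes the wrapper with plain Java serialization. */
    public static SerializableSplit roundTrip(SerializableSplit s)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(s);
        }
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SerializableSplit) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SerializableSplit copy = roundTrip(new SerializableSplit(DemoSplit.of("node-1", 42L)));
        DemoSplit restored = (DemoSplit) copy.get();
        System.out.println(restored.host() + " " + restored.length());
    }
}
```

The point of the design is that the wrapper never needs compile-time access
to the concrete split class: it only needs the class name and the split's
own write/readFields methods.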

We followed an approach similar to the one Spark's HadoopRDD uses to
serialize splits.
Thanks for your interest.

-Dipti



-----Original Message-----
From: Jean-Baptiste Onofré [mailto:[email protected]]
Sent: Monday, February 20, 2017 1:54 AM
To: [email protected]
Subject: Re: IT issues faced for HadoopInputFormat IO on Spark and Dataflow 
runners

OK, I just checked and it's part of the PR.

I'm checking out locally to check. I will let you know.

Regards
JB

On 02/19/2017 08:33 PM, Jean-Baptiste Onofré wrote:
> Hi Dipti,
>
> regarding Elasticsearch, it seems that the classloader doesn't contain
> the EsInputSplit. Using spark-submit, you have to provide the
> corresponding jar as package.
>
> For Cassandra, it seems more something about coder/serialization.
>
> Are the tests part of the PR (in order for me to take a look) ?
>
> Regards
> JB
>
> On 02/19/2017 06:14 PM, Dipti Kulkarni wrote:
>> Hi folks,
>>
>> I am currently working on running HadoopInputFormatIO's Cassandra IT
>> and Elasticsearch IT on the Spark and Dataflow runners. However, in
>> both ITs I am facing issues with specific classes not being found at
>> run time, and only when I use the Dataflow runner profile or the Spark
>> runner profile. The failures occur during deserialization. Without the
>> profiles, the ITs work fine on the direct runner. Here are the
>> specific details:
>>
>> Cassandra and Elastic IT execution using Spark pipeline options:
>>
>> Cassandra IT execution using Spark runner:
>> Command used:
>> mvn -e -Pspark-runner install -pl sdks/java/io/hadoop-input-format
>> -DskipITs=false -DintegrationTestPipelineOptions='[ "--project=
>> sincere-nirvana-150207", "--tempRoot= gs://sisk-test/staging ",
>> "--runner=org.apache.beam.runners.spark.TestSparkRunner","--serverIp=
>> 104.154.16.243", "--serverPort=9160","--userName=beamuser"
>> ,"--password=test123"]'
>> -Denforcer.skip=true -Dcheckstyle.skip=true -Dtest=HIFIOCassandraIT
>>
>> Exception:
>> testHIFReadForCassandraQuery(org.apache.beam.sdk.io.hadoop.inputforma
>> t.integration.tests.HIFIOCassandraIT)
>>
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.IllegalStateException: unread block data
>>         at
>> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(S
>> parkPipelineResult.java:73)
>>
>>         at
>> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(Spa
>> rkPipelineResult.java:113)
>>
>>         at
>> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(Spa
>> rkPipelineResult.java:102)
>>
>>         at
>> org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.jav
>> a:115)
>>
>>         at
>> org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.jav
>> a:64)
>>
>>         at org.apache.beam.sdk.Pipeline.run(Pipeline.java:210)
>>         at
>> org.apache.beam.sdk.io.hadoop.inputformat.integration.tests.HIFIOCass
>> andraIT.testHIFReadForCassandraQuery(HIFIOCassandraIT.java:157)
>>
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.
>> java:62)
>>
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces
>> sorImpl.java:43)
>>
>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>         at
>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(Framework
>> Method.java:50)
>>
>>         at
>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCal
>> lable.java:12)
>>
>>         at
>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMe
>> thod.java:47)
>>
>>         at
>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMet
>> hod.java:17)
>>
>>         at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>>         at
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRun
>> ner.java:78)
>>
>>         at
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRun
>> ner.java:57)
>>
>>         at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>>         at
>> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>>         at
>> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>>         at
>> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>>         at
>> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>>         at
>> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.
>> java:26)
>>
>>         at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>>         at org.junit.runners.Suite.runChild(Suite.java:128)
>>         at org.junit.runners.Suite.runChild(Suite.java:27)
>>         at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>>         at
>> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>>         at
>> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>>         at
>> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>>         at
>> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>>         at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>>         at
>> org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55)
>>         at
>> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAnd
>> Run(JUnitCoreWrapper.java:137)
>>
>>         at
>> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUn
>> itCoreWrapper.java:107)
>>
>>         at
>> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCor
>> eWrapper.java:83)
>>
>>         at
>> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCor
>> eWrapper.java:75)
>>
>>         at
>> org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCor
>> eProvider.java:161)
>>
>>         at
>> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameCla
>> ssLoader(ForkedBooter.java:290)
>>
>>         at
>> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(Fork
>> edBooter.java:242)
>>
>>         at
>> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:
>> 121)
>> Caused by: java.lang.IllegalStateException: unread block data
>>         at
>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(Objec
>> tInputStream.java:2726)
>>
>>         at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>>         at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
>>         at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
>>         at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
>>         at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>>         at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>>         at
>> org.apache.spark.serializer.JavaDeserializationStream.readObject(Java
>> Serializer.scala:76)
>>
>>         at
>> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSe
>> rializer.scala:115)
>>
>>         at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:207)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.
>> java:1142)
>>
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor
>> .java:617)
>>
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> Elasticsearch IT execution using Spark runner:
>> Command used:
>> mvn -e -Pspark-runner install -pl sdks/java/io/hadoop-input-format
>> -DskipITs=false -DintegrationTestPipelineOptions='[ "--project=
>> sincere-nirvana-150207", "--tempRoot= gs://sisk-test/staging ",
>> "--runner=org.apache.beam.runners.spark.TestSparkRunner","--serverIp=
>> 104.198.29.210", "--serverPort=9200","--userName=beamuser"
>> ,"--password=test123"]'
>> -Denforcer.skip=true -Dcheckstyle.skip=true -Dtest=HIFIOElasticIT
>>
>> Exception:
>> Caused by: java.lang.ClassNotFoundException:
>> org.elasticsearch.hadoop.mr.EsInputFormat.EsInputSplit
>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>         at java.lang.Class.forName0(Native Method)
>>         at java.lang.Class.forName(Class.java:264)
>>         at
>> org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO$HadoopI
>> nputFormatBoundedSource$SerializableSplit.readExternal(HadoopInputFor
>> matIO.java:973)
>>
>>         at
>> java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2062)
>>         at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2011)
>>         at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>>         at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
>>         at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
>>         at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
>>         at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>>         at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
>>         at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
>>         at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
>>         at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>>         at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
>>         at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
>>         at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
>>         at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>>         at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>>         at
>> org.apache.spark.serializer.JavaDeserializationStream.readObject(Java
>> Serializer.scala:76)
>>
>>         at
>> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSe
>> rializer.scala:115)
>>
>>         at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:207)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.
>> java:1142)
>>
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor
>> .java:617)
>>
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> I get the same set of exceptions on the Dataflow runner too.
>>
>> Here are some things that I have tried to address this issue:
>> 1) I thought that this issue was likely due to dependency conflicts
>> introduced by adding the beam-runners-spark and spark-streaming_2.10
>> dependencies to the pom.xml.
>> 2) The beam-runners-spark dependency pulls in dependencies such as
>> avro-mapred and hadoop-mapreduce-client-core with Hadoop version 2.2.0.
>> Hence, I tried excluding these from the beam-runners-spark dependency
>> and then executed the IT, but that didn't resolve the issue.
>> 3) The spark-streaming_2.10 dependency pulls in dependencies such as
>> hadoop-client, which I tried to exclude from the pom. This ensured
>> that no Hadoop 2.2.0 dependency is added in the pom.xml. We are
>> using Hadoop version 2.7.0 in HadoopInputFormatIO.
>> 4) I compared the dependency trees created before and after adding the
>> Spark runner. The only difference was related to the Hadoop client
>> dependencies, which I later tried to exclude, but that didn't resolve
>> the issue.
>> 5) I also tried changing the Spark dependency scopes from runtime to
>> provided/compile.
>>
>> Observations:
>>
>> *         This issue doesn't seem to be due to a dependency
>> conflict in the pom.xml, because if I execute the ITs using
>> DirectRunner with the same set of dependencies in the pom.xml, the
>> tests proceed without any exceptions.
>>
>> *         With the Spark runner, the exceptions in both ITs seem to
>> originate at
>> 'org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)'.
>> They occur only when we specify TestSparkRunner as the runner in
>> integrationTestPipelineOptions, and the same kind of exception is
>> observed in both the Cassandra and Elasticsearch ITs.
>>
>> *         With DirectRunner, the EsInputSplit class is found in the
>> Elasticsearch case and the test runs successfully.
>>
>> Below are some of the questions I am trying to answer:
>> 1) For TestSparkRunner, do we need to specify a classpath or something
>> similar so that the jars get loaded?
>> 2) Do we need to specify sparkMaster in the pipeline options? We
>> tried using URLs, but the URL could not be parsed.
>> 3) Is any additional input needed in the options?
>>
>> Since the same issues appear on the Dataflow runner, the issue may not
>> be runner specific. Any input is greatly appreciated.
>>
>> -Dipti
>>
>>
>> DISCLAIMER
>> ==========
>> This e-mail may contain privileged and confidential information which
>> is the property of Persistent Systems Ltd. It is intended only for
>> the use of the individual or entity to which it is addressed. If you
>> are not the intended recipient, you are not authorized to read,
>> retain, copy, print, distribute or use this message. If you have
>> received this communication in error, please notify the sender and
>> delete all copies of this message. Persistent Systems Ltd. does not
>> accept any liability for virus infected mails.
>>
>>
>

--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com

