Hi Dipti,
Regarding Elasticsearch, it seems that the classloader doesn't contain the
EsInputSplit class. With spark-submit, you have to provide the
corresponding jar as a package.
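For example, something like this (the elasticsearch-hadoop version here is
only illustrative, pick the one matching your cluster):

spark-submit --packages org.elasticsearch:elasticsearch-hadoop:5.2.0 ...

or, if you have the jar locally, pass it with
spark-submit --jars /path/to/elasticsearch-hadoop.jar ...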
For Cassandra, it looks more like a coder/serialization issue.
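If the runner can't infer a coder for the Cassandra value type, explicitly
registering one may help. A minimal sketch, assuming a placeholder row class
(MyCassandraRow is hypothetical, and the registration method name varies
across Beam versions):

import java.io.Serializable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CoderRegistrationSketch {
  // Hypothetical value type read from Cassandra; stands in for the real row class.
  static class MyCassandraRow implements Serializable { }

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);
    // Register an explicit coder so the runner doesn't have to infer one;
    // newer Beam versions spell this registerCoderForClass(...).
    p.getCoderRegistry().registerCoder(MyCassandraRow.class,
        SerializableCoder.of(MyCassandraRow.class));
  }
}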
Are the tests part of the PR, so I can take a look?
Regards
JB
On 02/19/2017 06:14 PM, Dipti Kulkarni wrote:
Hi folks,
I am currently working on running HadoopInputFormatIO's Cassandra and
Elasticsearch ITs on the Spark and Dataflow runners. However, I am facing
issues in both ITs: specific classes are not found at run time, but only when
I use the Dataflow runner or Spark runner profile, and the failures occur
during deserialization. Without those profiles, the ITs work fine on the
direct runner. Here are the specific details:
Cassandra and Elasticsearch IT execution using Spark pipeline options:
Cassandra IT execution using Spark runner:
Command used:
mvn -e -Pspark-runner install -pl sdks/java/io/hadoop-input-format -DskipITs=false \
  -DintegrationTestPipelineOptions='["--project=sincere-nirvana-150207", "--tempRoot=gs://sisk-test/staging", "--runner=org.apache.beam.runners.spark.TestSparkRunner", "--serverIp=104.154.16.243", "--serverPort=9160", "--userName=beamuser", "--password=test123"]' \
  -Denforcer.skip=true -Dcheckstyle.skip=true -Dtest=HIFIOCassandraIT
Exception:
testHIFReadForCassandraQuery(org.apache.beam.sdk.io.hadoop.inputformat.integration.tests.HIFIOCassandraIT)
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: unread block data
    at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:73)
    at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
    at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
    at org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:115)
    at org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:64)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:210)
    at org.apache.beam.sdk.io.hadoop.inputformat.integration.tests.HIFIOCassandraIT.testHIFReadForCassandraQuery(HIFIOCassandraIT.java:157)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runners.Suite.runChild(Suite.java:128)
    at org.junit.runners.Suite.runChild(Suite.java:27)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55)
    at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137)
    at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:107)
    at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:83)
    at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:75)
    at org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:161)
    at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:290)
    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:242)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:121)
Caused by: java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2726)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:207)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Elasticsearch IT execution using Spark runner:
Command used:
mvn -e -Pspark-runner install -pl sdks/java/io/hadoop-input-format -DskipITs=false \
  -DintegrationTestPipelineOptions='["--project=sincere-nirvana-150207", "--tempRoot=gs://sisk-test/staging", "--runner=org.apache.beam.runners.spark.TestSparkRunner", "--serverIp=104.198.29.210", "--serverPort=9200", "--userName=beamuser", "--password=test123"]' \
  -Denforcer.skip=true -Dcheckstyle.skip=true -Dtest=HIFIOElasticIT
Exception:
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.hadoop.mr.EsInputFormat.EsInputSplit
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO$HadoopInputFormatBoundedSource$SerializableSplit.readExternal(HadoopInputFormatIO.java:973)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2062)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2011)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:207)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
I get the same set of exceptions on the Dataflow runner too.
Here are some things that I have tried to address this issue:
1) I suspected the issue was due to dependency conflicts introduced by adding
the beam-runners-spark and spark-streaming_2.10 dependencies to the pom.xml.
2) The beam-runners-spark dependency pulls in dependencies such as avro-mapred
and hadoop-mapreduce-client-core at Hadoop version 2.2.0. I therefore tried
excluding these from beam-runners-spark and re-ran the IT, but that didn't
resolve the issue.
3) The spark-streaming_2.10 dependency pulls in hadoop-client, which I also
tried to exclude from the pom. This ensured that no Hadoop 2.2.0 dependency
remained in the pom.xml; we are using Hadoop 2.7.0 in HadoopInputFormatIO.
4) I compared the dependency trees generated before and after adding the Spark
runner (see the command after this list). The only difference was in the
hadoop-client dependencies, which I later tried to exclude, but that didn't
resolve the issue either.
5) I also tried changing the Spark dependency scopes from runtime to
provided/compile.
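For reference, the dependency trees mentioned in (4) can be generated with the
standard Maven goal, e.g.:

mvn dependency:tree -Pspark-runner -pl sdks/java/io/hadoop-input-format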
Observations:
* The issue doesn't seem to be caused by a dependency conflict in the pom.xml,
because if I execute the ITs using the DirectRunner with the same set of
dependencies, the tests proceed without any exceptions.
* With the Spark runner, the exceptions in both ITs originate at
'org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)'.
The exception appears only when we specify TestSparkRunner as the runner in
integrationTestPipelineOptions, and the same kind of exception is observed in
both the Cassandra and Elasticsearch ITs.
* With the DirectRunner, the EsInputSplit class is found in the Elasticsearch
case and the test runs successfully.
Below are some questions I am trying to find answers to:
1) For TestSparkRunner, do we need to specify a classpath or something similar
so that the jars get loaded?
2) Do we need to specify sparkMaster in the pipeline options? We tried using
URLs, but it could not parse them (see the example after this list).
3) Is there any additional input to be given in the options?
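For illustration, the URL form we tried passing is along these lines (the host
is a placeholder), with "local[4]" being the usual local alternative:

"--sparkMaster=spark://<host>:7077"

added alongside the other options in integrationTestPipelineOptions.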
Since the same issues appear on the Dataflow runner, the issue may not be
runner-specific. Any input is greatly appreciated.
-Dipti
--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com