Hi folks,

I am currently working on running HadoopInputFormatIO's Cassandra IT and
Elasticsearch IT on the Spark and Dataflow runners. However, I am facing issues
in both ITs with specific classes not being found at run time, and only when I
use the Dataflow runner profile or the Spark runner profile. The failures occur
during deserialization. Without the profiles, the ITs work fine on the direct
runner. Here are the specific details:

Cassandra and Elastic IT execution using Spark pipeline options:

Cassandra IT execution using Spark runner:
Command used:
mvn -e -Pspark-runner install -pl sdks/java/io/hadoop-input-format 
-DskipITs=false -DintegrationTestPipelineOptions='[ "--project= 
sincere-nirvana-150207", "--tempRoot= gs://sisk-test/staging ", 
"--runner=org.apache.beam.runners.spark.TestSparkRunner","--serverIp=104.154.16.243",
 "--serverPort=9160","--userName=beamuser" ,"--password=test123"]' 
-Denforcer.skip=true -Dcheckstyle.skip=true -Dtest=HIFIOCassandraIT

Exception:
testHIFReadForCassandraQuery(org.apache.beam.sdk.io.hadoop.inputformat.integration.tests.HIFIOCassandraIT)
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: unread block data
        at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:73)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
        at org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:115)
        at org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:64)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:210)
        at org.apache.beam.sdk.io.hadoop.inputformat.integration.tests.HIFIOCassandraIT.testHIFReadForCassandraQuery(HIFIOCassandraIT.java:157)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.junit.runners.Suite.runChild(Suite.java:128)
        at org.junit.runners.Suite.runChild(Suite.java:27)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55)
        at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137)
        at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:107)
        at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:83)
        at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:75)
        at org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:161)
        at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:290)
        at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:242)
        at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:121)
Caused by: java.lang.IllegalStateException: unread block data
        at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2726)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:207)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Elasticsearch IT execution using Spark runner:
Command used:
mvn -e -Pspark-runner install -pl sdks/java/io/hadoop-input-format 
-DskipITs=false -DintegrationTestPipelineOptions='[ "--project= 
sincere-nirvana-150207", "--tempRoot= gs://sisk-test/staging ", 
"--runner=org.apache.beam.runners.spark.TestSparkRunner","--serverIp=104.198.29.210",
 "--serverPort=9200","--userName=beamuser" ,"--password=test123"]' 
-Denforcer.skip=true -Dcheckstyle.skip=true -Dtest=HIFIOElasticIT

Exception:
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.hadoop.mr.EsInputFormat.EsInputSplit
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO$HadoopInputFormatBoundedSource$SerializableSplit.readExternal(HadoopInputFormatIO.java:973)
        at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2062)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2011)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:207)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

I get the same set of exceptions on the Dataflow runner too.

Here are some things that I have tried to address this issue:
1) I thought that this issue was likely due to dependency conflicts introduced
by adding the beam-runners-spark and spark-streaming_2.10 dependencies to the
pom.xml.
2) The beam-runners-spark dependency pulls in dependencies such as avro-mapred
and hadoop-mapreduce-client-core with Hadoop version 2.2.0. I therefore tried
excluding these from the beam-runners-spark dependency and then executed the
IT, but that didn't resolve the issue.
3) The spark-streaming_2.10 dependency pulls in dependencies such as
hadoop-client, which I tried to exclude in the pom. This ensured that no Hadoop
2.2.0 dependency is added through the pom.xml. We are using Hadoop 2.7.0 in
HadoopInputFormatIO.
4) I compared the dependency trees generated before and after adding the Spark
runner. The only difference was in the Hadoop client dependencies, which I
later tried to exclude, but that didn't resolve the issue.
5) I also tried changing the Spark dependency scopes from runtime to
provided/compile.
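
Since both stack traces fail inside java.io.ObjectInputStream, one further
check (not among the steps above) could be to round-trip the object in question
through plain Java serialization in the test JVM, which is roughly what
Spark's JavaSerializer does. The sketch below is only an assumption about how
such a check might look; the class and method names are made up for
illustration and are not part of Beam or Spark.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: not part of Beam, Spark, or HadoopInputFormatIO. */
public class SerializationRoundTrip {

    /**
     * Serializes the value to a byte array and reads it back with plain Java
     * serialization. Any failure is rethrown with the original exception as
     * the cause, so the underlying stack trace stays visible.
     */
    public static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Java serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in payload; in the IT this would be the source or split under test.
        ArrayList<String> original = new ArrayList<>(List.of("a", "b"));
        ArrayList<String> copy = roundTrip(original);
        System.out.println(original.equals(copy));
    }
}
```

If the same exceptions show up from a call like roundTrip(...) on the source
or split object, the problem can be reproduced without any runner involved.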

Observations:

*         This issue doesn't seem to be due to a dependency conflict in the
pom.xml, because if I execute the ITs using DirectRunner with the same set of
dependencies in the pom.xml, the tests proceed without any exceptions.

*         In the case of the Spark runner, the exceptions in both ITs seem to
originate at
'org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)'.
They occur only when we specify TestSparkRunner as the runner in
integrationTestPipelineOptions, and the same kind of exception is observed in
both the Cassandra and Elasticsearch ITs.

*         With DirectRunner, the EsInputSplit class is found in the
Elasticsearch case and the test runs successfully.
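
One detail that may be worth checking: the ClassNotFoundException above names
the class as org.elasticsearch.hadoop.mr.EsInputFormat.EsInputSplit, with a dot
before EsInputSplit. If EsInputSplit is a class nested inside EsInputFormat,
Class.forName expects its binary name (with '$'), not its canonical name. This
is only a guess at a possible cause, not a confirmed diagnosis. A minimal
sketch of the difference, using java.util.Map.Entry as a stand-in nested type
(the demo class and its method are illustrative names only):

```java
import java.util.Map;

/** Illustrative demo only; Map.Entry stands in for a nested InputSplit class. */
public class NestedClassNameDemo {

    /** Returns true if Class.forName can resolve the given name. */
    public static boolean canLoad(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Binary name, as returned by Class.getName(): uses '$' for nesting.
        String binaryName = Map.Entry.class.getName();
        // Canonical name, as returned by Class.getCanonicalName(): uses '.'.
        String canonicalName = Map.Entry.class.getCanonicalName();

        System.out.println(binaryName + " -> " + canLoad(binaryName));
        // prints "java.util.Map$Entry -> true"
        System.out.println(canonicalName + " -> " + canLoad(canonicalName));
        // prints "java.util.Map.Entry -> false"
    }
}
```

If SerializableSplit writes the split's class name in the dotted form, reading
it back with Class.forName would fail in this way on any runner that actually
serializes the splits.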

Below are some of the questions I am trying to find answers to:
1) For TestSparkRunner, do we need to specify a classpath or something similar
so that the jars get loaded?
2) Do we need to specify sparkMaster in the pipeline options? We tried using
URLs, but the URL could not be parsed.
3) Are there any additional inputs that need to be given in the options?

Since the same issues appear on the Dataflow runner, the issue may not be
runner specific. Any input is greatly appreciated.

-Dipti

