OK, I just checked and it's part of the PR. I'm checking it out locally to take a look. I will let you know.
Regards
JB

On 02/19/2017 08:33 PM, Jean-Baptiste Onofré wrote:
Hi Dipti,

Regarding Elasticsearch, it seems that the classloader doesn't contain the EsInputSplit class. Using spark-submit, you have to provide the corresponding jar as a package (a spark-submit sketch is included after the observations in the quoted mail below).

For Cassandra, it seems to be more of a coder/serialization issue.

Are the tests part of the PR (in order for me to take a look)?

Regards
JB

On 02/19/2017 06:14 PM, Dipti Kulkarni wrote:

Hi folks,

I am currently working on running HadoopInputFormatIO's Cassandra IT and Elasticsearch IT on the Spark and Dataflow runners. However, I am facing issues in both ITs: specific classes are not found at run time, but only when I use the Dataflow runner profile or the Spark runner profile. The failures occur during deserialization. Without these profiles, the ITs work fine on the direct runner. Here are the specific details:

Cassandra and Elasticsearch IT execution using Spark pipeline options:

Cassandra IT execution using the Spark runner:

Command used:

mvn -e -Pspark-runner install -pl sdks/java/io/hadoop-input-format -DskipITs=false -DintegrationTestPipelineOptions='["--project=sincere-nirvana-150207", "--tempRoot=gs://sisk-test/staging", "--runner=org.apache.beam.runners.spark.TestSparkRunner", "--serverIp=104.154.16.243", "--serverPort=9160", "--userName=beamuser", "--password=test123"]' -Denforcer.skip=true -Dcheckstyle.skip=true -Dtest=HIFIOCassandraIT

Exception:

testHIFReadForCassandraQuery(org.apache.beam.sdk.io.hadoop.inputformat.integration.tests.HIFIOCassandraIT)
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: unread block data
    at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:73)
    at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
    at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
    at org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:115)
    at org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:64)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:210)
    at org.apache.beam.sdk.io.hadoop.inputformat.integration.tests.HIFIOCassandraIT.testHIFReadForCassandraQuery(HIFIOCassandraIT.java:157)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runners.Suite.runChild(Suite.java:128)
    at org.junit.runners.Suite.runChild(Suite.java:27)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55)
    at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137)
    at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:107)
    at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:83)
    at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:75)
    at org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:161)
    at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:290)
    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:242)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:121)
Caused by: java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2726)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:207)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Elasticsearch IT execution using the Spark runner:

Command used:

mvn -e -Pspark-runner install -pl sdks/java/io/hadoop-input-format -DskipITs=false -DintegrationTestPipelineOptions='["--project=sincere-nirvana-150207", "--tempRoot=gs://sisk-test/staging", "--runner=org.apache.beam.runners.spark.TestSparkRunner", "--serverIp=104.198.29.210", "--serverPort=9200", "--userName=beamuser", "--password=test123"]' -Denforcer.skip=true -Dcheckstyle.skip=true -Dtest=HIFIOElasticIT

Exception:

Caused by: java.lang.ClassNotFoundException: org.elasticsearch.hadoop.mr.EsInputFormat.EsInputSplit
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO$HadoopInputFormatBoundedSource$SerializableSplit.readExternal(HadoopInputFormatIO.java:973)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2062)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2011)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:207)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I get the same set of exceptions on the Dataflow runner too.

Here are some things that I have tried to address this issue:
1) I suspected the issue was caused by dependency conflicts after adding the beam-runners-spark and spark-streaming_2.10 dependencies to the pom.xml.
2) The beam-runners-spark dependency pulls in dependencies such as avro-mapred and hadoop-mapreduce-client-core at Hadoop version 2.2.0. I tried excluding these from beam-runners-spark and then executed the IT, but that did not resolve the issue.
3) The spark-streaming_2.10 dependency pulls in hadoop-client, which I also tried to exclude from the pom. This ensured that no Hadoop 2.2.0 dependency remained in the pom.xml; we are using Hadoop version 2.7.0 in HadoopInputFormatIO.
4) I compared the dependency trees before and after adding the Spark runner (a sketch of this comparison is included after the observations below). The only difference was in the Hadoop client dependencies, which I later tried to exclude, but that did not resolve the issue either.
5) I also tried changing the Spark dependency scopes from runtime to provided/compile.

Observations:
* The issue doesn't seem to be caused by a dependency conflict in the pom.xml, because if I execute the ITs with the DirectRunner using the same set of dependencies, the tests proceed without any exceptions.
* With the Spark runner, the exceptions in both ITs originate at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115). The exception appears only when TestSparkRunner is specified as the runner in integrationTestPipelineOptions, and the same kind of exception is observed in both the Cassandra and Elasticsearch ITs.
* With the DirectRunner, the EsInputSplit class is found in the Elasticsearch case and the test runs successfully.
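As a concrete illustration of the spark-submit approach JB mentions above: ship the elasticsearch-hadoop connector to the executors so EsInputSplit is on their classpath. This is a minimal sketch only; the connector version, master URL, main class, and jar names are placeholders, not values from this thread:

    # Sketch: resolve the connector from Maven Central at submit time
    # (the elasticsearch-hadoop version below is a placeholder; match it
    # to the version the hadoop-input-format module compiles against).
    spark-submit \
      --master spark://<host>:7077 \
      --packages org.elasticsearch:elasticsearch-hadoop:5.0.0 \
      --class com.example.MyBeamPipeline \
      my-pipeline-bundled.jar --runner=SparkRunner

    # Or pass a jar that is already on disk explicitly:
    spark-submit \
      --master spark://<host>:7077 \
      --jars /path/to/elasticsearch-hadoop-5.0.0.jar \
      --class com.example.MyBeamPipeline \
      my-pipeline-bundled.jar --runner=SparkRunner

Both --packages and --jars make the connector classes available to the executor classloaders, which is where the ClassNotFoundException above is thrown.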
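For the dependency-tree comparison in step 4, one way to do it (a sketch; the output file names are arbitrary):

    # Tree without the Spark runner profile:
    mvn dependency:tree -pl sdks/java/io/hadoop-input-format > tree-direct.txt
    # Tree with the Spark runner profile:
    mvn dependency:tree -pl sdks/java/io/hadoop-input-format -Pspark-runner > tree-spark.txt
    # Diff the two to spot version shifts (e.g. Hadoop 2.7.0 vs 2.2.0):
    diff tree-direct.txt tree-spark.txt
    # Narrow the tree to the suspect Hadoop artifacts:
    mvn dependency:tree -pl sdks/java/io/hadoop-input-format -Pspark-runner -Dincludes=org.apache.hadoop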
Below are some of the queries I am trying to find answers to:
1) For TestSparkRunner, do we need to specify a classpath or something similar so that the jars get loaded?
2) Do we need to specify sparkMaster in the pipeline options? We tried using URLs, but they could not be parsed. (A sketch of passing sparkMaster is included at the end of this mail.)
3) Is there any additional input to be given in the options?

Since the same issues appear on the Dataflow runner, the issue may not be runner specific. Any inputs greatly appreciated.

-Dipti
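Regarding query 2: the Spark runner exposes a sparkMaster pipeline option (default local[4]). A sketch of passing it through integrationTestPipelineOptions, with the other pipeline options elided:

    # Sketch only: the remaining pipeline options from the commands above
    # (--project, --tempRoot, --serverIp, ...) would go in the same array.
    mvn -e -Pspark-runner install -pl sdks/java/io/hadoop-input-format \
      -DskipITs=false -Dtest=HIFIOCassandraIT \
      -DintegrationTestPipelineOptions='[
        "--runner=org.apache.beam.runners.spark.TestSparkRunner",
        "--sparkMaster=local[4]"
      ]'

A spark://<host>:<port> URL would go in the same --sparkMaster field when targeting a standalone cluster instead of the embedded local master.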
--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net

Talend - http://www.talend.com
