[
https://issues.apache.org/jira/browse/SPARK-18598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hamish Morgan updated SPARK-18598:
----------------------------------
Description:
Most operations of {{org.apache.spark.sql.Dataset}} throw
{{java.lang.AssertionError}} when the {{Dataset}} was created with an Java bean
{{Encoder}}, where the bean has more accessors than properties.
The following until test demonstrates the steps to replicate:
{code}
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.Test;
import org.xml.sax.SAXException;
import java.io.IOException;
import static java.util.Collections.singletonList;
public class SparkBeanEncoderTest {
public static class TestBean2 {
private String name;
public void setName(String name) {
this.name = name;
}
public String getName() {
return name;
}
public String getName2() {
return name.toLowerCase();
}
}
@Test
public void testCreateDatasetFromBeanFailure() throws IOException,
SAXException {
SparkSession spark = SparkSession
.builder()
.master("local")
.getOrCreate();
TestBean2 bean = new TestBean2();
bean.setName("testing123");
Encoder<TestBean2> encoder = Encoders.bean(TestBean2.class);
Dataset<TestBean2> dataset = spark.createDataset(singletonList(bean),
encoder);
dataset.show();
spark.stop();
}
}
{code}
Running the above produces the following output:
{code}
16/11/27 14:00:04 INFO SparkContext: Running Spark version 2.0.2
16/11/27 14:00:04 WARN NativeCodeLoader: Unable to load native-hadoop library
for your platform... using builtin-java classes where applicable
16/11/27 14:00:04 WARN Utils: Your hostname, XXXX resolves to a loopback
address: 127.0.1.1; using 192.168.1.68 instead (on interface eth0)
16/11/27 14:00:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another
address
16/11/27 14:00:04 INFO SecurityManager: Changing view acls to: XXXX
16/11/27 14:00:04 INFO SecurityManager: Changing modify acls to: XXXX
16/11/27 14:00:04 INFO SecurityManager: Changing view acls groups to:
16/11/27 14:00:04 INFO SecurityManager: Changing modify acls groups to:
16/11/27 14:00:04 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(XXXX); groups
with view permissions: Set(); users with modify permissions: Set(XXXX); groups
with modify permissions: Set()
16/11/27 14:00:05 INFO Utils: Successfully started service 'sparkDriver' on
port 34688.
16/11/27 14:00:05 INFO SparkEnv: Registering MapOutputTracker
16/11/27 14:00:05 INFO SparkEnv: Registering BlockManagerMaster
16/11/27 14:00:05 INFO DiskBlockManager: Created local directory at
/tmp/blockmgr-0ae3a00f-eb46-4be2-8ece-1873f3db1a29
16/11/27 14:00:05 INFO MemoryStore: MemoryStore started with capacity 3.0 GB
16/11/27 14:00:05 INFO SparkEnv: Registering OutputCommitCoordinator
16/11/27 14:00:05 INFO Utils: Successfully started service 'SparkUI' on port
4040.
16/11/27 14:00:05 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at
http://192.168.1.68:4040
16/11/27 14:00:05 INFO Executor: Starting executor ID driver on host localhost
16/11/27 14:00:05 INFO Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 42688.
16/11/27 14:00:05 INFO NettyBlockTransferService: Server created on
192.168.1.68:42688
16/11/27 14:00:05 INFO BlockManagerMaster: Registering BlockManager
BlockManagerId(driver, 192.168.1.68, 42688)
16/11/27 14:00:05 INFO BlockManagerMasterEndpoint: Registering block manager
192.168.1.68:42688 with 3.0 GB RAM, BlockManagerId(driver, 192.168.1.68, 42688)
16/11/27 14:00:05 INFO BlockManagerMaster: Registered BlockManager
BlockManagerId(driver, 192.168.1.68, 42688)
16/11/27 14:00:05 WARN SparkContext: Use an existing SparkContext, some
configuration may not take effect.
16/11/27 14:00:05 INFO SharedState: Warehouse path is
'file:/home/hamish/git/language-identifier/wikidump/spark-warehouse'.
16/11/27 14:00:05 INFO CodeGenerator: Code generated in 166.762154 ms
16/11/27 14:00:06 INFO CodeGenerator: Code generated in 6.144958 ms
java.lang.AssertionError: index (1) should < 1
at
org.apache.spark.sql.catalyst.expressions.UnsafeRow.assertIndexIsValid(UnsafeRow.java:133)
at
org.apache.spark.sql.catalyst.expressions.UnsafeRow.isNullAt(UnsafeRow.java:352)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
Source)
at
org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$1.apply(LocalTableScanExec.scala:38)
at
org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$1.apply(LocalTableScanExec.scala:38)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.spark.sql.execution.LocalTableScanExec.<init>(LocalTableScanExec.scala:38)
at
org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:393)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:60)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:60)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:61)
at
org.apache.spark.sql.execution.SparkPlanner.plan(SparkPlanner.scala:47)
at
org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1$$anonfun$apply$1.applyOrElse(SparkPlanner.scala:51)
at
org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1$$anonfun$apply$1.applyOrElse(SparkPlanner.scala:48)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
at
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
at
org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1.apply(SparkPlanner.scala:48)
at
org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1.apply(SparkPlanner.scala:48)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:78)
at
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:76)
at
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
at
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2572)
at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
at
SparkBeanEncoderTest.testCreateDatasetFromBeanFailure(SparkBeanEncoderTest.java:47)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
at
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
at
com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
16/11/27 14:00:06 INFO SparkContext: Invoking stop() from shutdown hook
16/11/27 14:00:06 INFO SparkUI: Stopped Spark web UI at http://192.168.1.68:4040
16/11/27 14:00:06 INFO MapOutputTrackerMasterEndpoint:
MapOutputTrackerMasterEndpoint stopped!
16/11/27 14:00:06 INFO MemoryStore: MemoryStore cleared
16/11/27 14:00:06 INFO BlockManager: BlockManager stopped
16/11/27 14:00:06 INFO BlockManagerMaster: BlockManagerMaster stopped
16/11/27 14:00:06 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
OutputCommitCoordinator stopped!
16/11/27 14:00:06 INFO SparkContext: Successfully stopped SparkContext
16/11/27 14:00:06 INFO ShutdownHookManager: Shutdown hook called
16/11/27 14:00:06 INFO ShutdownHookManager: Deleting directory
/tmp/spark-bad08a28-51bb-4295-a1e3-691d4679a56c
{code}
The problem seems to be caused by an inconsistency in the way bean properties
are inspected in {{org.apache.spark.sql.catalyst.JavaTypeInference}}; sometimes
filtered by the existence of accessors and mutators, sometimes not. This
inconsistency percolates back to the
{{org.apache.spark.sql.catalyst.encoders.ExpressionEncoder}}, where the
serializer has a different field count from the schema.
Desired behaviour here is debatable, but I'm pretty sure AssertionErrors are
always a bug. One simple fix would be to introduce a check so it fails faster,
and with a more helpful message. Personally, I'd quite like it just work, even
when there are too many accessors. To that end I've written a fix,
which I shall PR shortly.
was:
Most operations of {{org.apache.spark.sql.Dataset}} throw
{{java.lang.AssertionError}} when the {{Dataset}} was created with an Java bean
{{Encoder}}, where the bean has more accessors that properties.
The following until test demonstrates the steps to replicate:
{code}
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.Test;
import org.xml.sax.SAXException;
import java.io.IOException;
import static java.util.Collections.singletonList;
public class SparkBeanEncoderTest {
public static class TestBean2 {
private String name;
public void setName(String name) {
this.name = name;
}
public String getName() {
return name;
}
public String getName2() {
return name.toLowerCase();
}
}
@Test
public void testCreateDatasetFromBeanFailure() throws IOException,
SAXException {
SparkSession spark = SparkSession
.builder()
.master("local")
.getOrCreate();
TestBean2 bean = new TestBean2();
bean.setName("testing123");
Encoder<TestBean2> encoder = Encoders.bean(TestBean2.class);
Dataset<TestBean2> dataset = spark.createDataset(singletonList(bean),
encoder);
dataset.show();
spark.stop();
}
}
{code}
Running the above produces the following output:
{code}
16/11/27 14:00:04 INFO SparkContext: Running Spark version 2.0.2
16/11/27 14:00:04 WARN NativeCodeLoader: Unable to load native-hadoop library
for your platform... using builtin-java classes where applicable
16/11/27 14:00:04 WARN Utils: Your hostname, XXXX resolves to a loopback
address: 127.0.1.1; using 192.168.1.68 instead (on interface eth0)
16/11/27 14:00:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another
address
16/11/27 14:00:04 INFO SecurityManager: Changing view acls to: XXXX
16/11/27 14:00:04 INFO SecurityManager: Changing modify acls to: XXXX
16/11/27 14:00:04 INFO SecurityManager: Changing view acls groups to:
16/11/27 14:00:04 INFO SecurityManager: Changing modify acls groups to:
16/11/27 14:00:04 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(XXXX); groups
with view permissions: Set(); users with modify permissions: Set(XXXX); groups
with modify permissions: Set()
16/11/27 14:00:05 INFO Utils: Successfully started service 'sparkDriver' on
port 34688.
16/11/27 14:00:05 INFO SparkEnv: Registering MapOutputTracker
16/11/27 14:00:05 INFO SparkEnv: Registering BlockManagerMaster
16/11/27 14:00:05 INFO DiskBlockManager: Created local directory at
/tmp/blockmgr-0ae3a00f-eb46-4be2-8ece-1873f3db1a29
16/11/27 14:00:05 INFO MemoryStore: MemoryStore started with capacity 3.0 GB
16/11/27 14:00:05 INFO SparkEnv: Registering OutputCommitCoordinator
16/11/27 14:00:05 INFO Utils: Successfully started service 'SparkUI' on port
4040.
16/11/27 14:00:05 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at
http://192.168.1.68:4040
16/11/27 14:00:05 INFO Executor: Starting executor ID driver on host localhost
16/11/27 14:00:05 INFO Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 42688.
16/11/27 14:00:05 INFO NettyBlockTransferService: Server created on
192.168.1.68:42688
16/11/27 14:00:05 INFO BlockManagerMaster: Registering BlockManager
BlockManagerId(driver, 192.168.1.68, 42688)
16/11/27 14:00:05 INFO BlockManagerMasterEndpoint: Registering block manager
192.168.1.68:42688 with 3.0 GB RAM, BlockManagerId(driver, 192.168.1.68, 42688)
16/11/27 14:00:05 INFO BlockManagerMaster: Registered BlockManager
BlockManagerId(driver, 192.168.1.68, 42688)
16/11/27 14:00:05 WARN SparkContext: Use an existing SparkContext, some
configuration may not take effect.
16/11/27 14:00:05 INFO SharedState: Warehouse path is
'file:/home/hamish/git/language-identifier/wikidump/spark-warehouse'.
16/11/27 14:00:05 INFO CodeGenerator: Code generated in 166.762154 ms
16/11/27 14:00:06 INFO CodeGenerator: Code generated in 6.144958 ms
java.lang.AssertionError: index (1) should < 1
at
org.apache.spark.sql.catalyst.expressions.UnsafeRow.assertIndexIsValid(UnsafeRow.java:133)
at
org.apache.spark.sql.catalyst.expressions.UnsafeRow.isNullAt(UnsafeRow.java:352)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
Source)
at
org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$1.apply(LocalTableScanExec.scala:38)
at
org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$1.apply(LocalTableScanExec.scala:38)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.spark.sql.execution.LocalTableScanExec.<init>(LocalTableScanExec.scala:38)
at
org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:393)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:60)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:60)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:61)
at
org.apache.spark.sql.execution.SparkPlanner.plan(SparkPlanner.scala:47)
at
org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1$$anonfun$apply$1.applyOrElse(SparkPlanner.scala:51)
at
org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1$$anonfun$apply$1.applyOrElse(SparkPlanner.scala:48)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
at
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
at
org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1.apply(SparkPlanner.scala:48)
at
org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1.apply(SparkPlanner.scala:48)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:78)
at
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:76)
at
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
at
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2572)
at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
at
SparkBeanEncoderTest.testCreateDatasetFromBeanFailure(SparkBeanEncoderTest.java:47)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
at
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
at
com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
16/11/27 14:00:06 INFO SparkContext: Invoking stop() from shutdown hook
16/11/27 14:00:06 INFO SparkUI: Stopped Spark web UI at http://192.168.1.68:4040
16/11/27 14:00:06 INFO MapOutputTrackerMasterEndpoint:
MapOutputTrackerMasterEndpoint stopped!
16/11/27 14:00:06 INFO MemoryStore: MemoryStore cleared
16/11/27 14:00:06 INFO BlockManager: BlockManager stopped
16/11/27 14:00:06 INFO BlockManagerMaster: BlockManagerMaster stopped
16/11/27 14:00:06 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
OutputCommitCoordinator stopped!
16/11/27 14:00:06 INFO SparkContext: Successfully stopped SparkContext
16/11/27 14:00:06 INFO ShutdownHookManager: Shutdown hook called
16/11/27 14:00:06 INFO ShutdownHookManager: Deleting directory
/tmp/spark-bad08a28-51bb-4295-a1e3-691d4679a56c
{code}
The problem seems to be caused by an inconsistency in the way bean properties
are inspected in {{org.apache.spark.sql.catalyst.JavaTypeInference}}; sometimes
filtered by the existence of accessors and mutators, sometimes not. This
inconsistency percolates back to the
{{org.apache.spark.sql.catalyst.encoders.ExpressionEncoder}}, where the
serializer has a different field count from the schema.
Desired behaviour here is debatable, but I'm pretty sure AssertionErrors are
always a bug. One simple fix would be to introduce a check so it fails faster,
and with a more helpful message. Personally, I'd quite like it just work, even
when there are too many accessors. To that end I've written a fix,
which I shall PR shortly.
> Encoding a Java Bean with extra accessors, produces inconsistent Dataset,
> resulting in AssertionError
> -----------------------------------------------------------------------------------------------------
>
> Key: SPARK-18598
> URL: https://issues.apache.org/jira/browse/SPARK-18598
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.2
> Reporter: Hamish Morgan
> Priority: Minor
>
> Most operations of {{org.apache.spark.sql.Dataset}} throw
> {{java.lang.AssertionError}} when the {{Dataset}} was created with an Java
> bean {{Encoder}}, where the bean has more accessors than properties.
> The following until test demonstrates the steps to replicate:
> {code}
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Encoder;
> import org.apache.spark.sql.Encoders;
> import org.apache.spark.sql.SparkSession;
> import org.junit.Test;
> import org.xml.sax.SAXException;
> import java.io.IOException;
> import static java.util.Collections.singletonList;
> public class SparkBeanEncoderTest {
> public static class TestBean2 {
> private String name;
> public void setName(String name) {
> this.name = name;
> }
> public String getName() {
> return name;
> }
> public String getName2() {
> return name.toLowerCase();
> }
> }
> @Test
> public void testCreateDatasetFromBeanFailure() throws IOException,
> SAXException {
> SparkSession spark = SparkSession
> .builder()
> .master("local")
> .getOrCreate();
> TestBean2 bean = new TestBean2();
> bean.setName("testing123");
> Encoder<TestBean2> encoder = Encoders.bean(TestBean2.class);
> Dataset<TestBean2> dataset = spark.createDataset(singletonList(bean),
> encoder);
> dataset.show();
> spark.stop();
> }
> }
> {code}
> Running the above produces the following output:
> {code}
> 16/11/27 14:00:04 INFO SparkContext: Running Spark version 2.0.2
> 16/11/27 14:00:04 WARN NativeCodeLoader: Unable to load native-hadoop library
> for your platform... using builtin-java classes where applicable
> 16/11/27 14:00:04 WARN Utils: Your hostname, XXXX resolves to a loopback
> address: 127.0.1.1; using 192.168.1.68 instead (on interface eth0)
> 16/11/27 14:00:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
> another address
> 16/11/27 14:00:04 INFO SecurityManager: Changing view acls to: XXXX
> 16/11/27 14:00:04 INFO SecurityManager: Changing modify acls to: XXXX
> 16/11/27 14:00:04 INFO SecurityManager: Changing view acls groups to:
> 16/11/27 14:00:04 INFO SecurityManager: Changing modify acls groups to:
> 16/11/27 14:00:04 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(XXXX); groups
> with view permissions: Set(); users with modify permissions: Set(XXXX);
> groups with modify permissions: Set()
> 16/11/27 14:00:05 INFO Utils: Successfully started service 'sparkDriver' on
> port 34688.
> 16/11/27 14:00:05 INFO SparkEnv: Registering MapOutputTracker
> 16/11/27 14:00:05 INFO SparkEnv: Registering BlockManagerMaster
> 16/11/27 14:00:05 INFO DiskBlockManager: Created local directory at
> /tmp/blockmgr-0ae3a00f-eb46-4be2-8ece-1873f3db1a29
> 16/11/27 14:00:05 INFO MemoryStore: MemoryStore started with capacity 3.0 GB
> 16/11/27 14:00:05 INFO SparkEnv: Registering OutputCommitCoordinator
> 16/11/27 14:00:05 INFO Utils: Successfully started service 'SparkUI' on port
> 4040.
> 16/11/27 14:00:05 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at
> http://192.168.1.68:4040
> 16/11/27 14:00:05 INFO Executor: Starting executor ID driver on host localhost
> 16/11/27 14:00:05 INFO Utils: Successfully started service
> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 42688.
> 16/11/27 14:00:05 INFO NettyBlockTransferService: Server created on
> 192.168.1.68:42688
> 16/11/27 14:00:05 INFO BlockManagerMaster: Registering BlockManager
> BlockManagerId(driver, 192.168.1.68, 42688)
> 16/11/27 14:00:05 INFO BlockManagerMasterEndpoint: Registering block manager
> 192.168.1.68:42688 with 3.0 GB RAM, BlockManagerId(driver, 192.168.1.68,
> 42688)
> 16/11/27 14:00:05 INFO BlockManagerMaster: Registered BlockManager
> BlockManagerId(driver, 192.168.1.68, 42688)
> 16/11/27 14:00:05 WARN SparkContext: Use an existing SparkContext, some
> configuration may not take effect.
> 16/11/27 14:00:05 INFO SharedState: Warehouse path is
> 'file:/home/hamish/git/language-identifier/wikidump/spark-warehouse'.
> 16/11/27 14:00:05 INFO CodeGenerator: Code generated in 166.762154 ms
> 16/11/27 14:00:06 INFO CodeGenerator: Code generated in 6.144958 ms
> java.lang.AssertionError: index (1) should < 1
> at
> org.apache.spark.sql.catalyst.expressions.UnsafeRow.assertIndexIsValid(UnsafeRow.java:133)
> at
> org.apache.spark.sql.catalyst.expressions.UnsafeRow.isNullAt(UnsafeRow.java:352)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
> Source)
> at
> org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$1.apply(LocalTableScanExec.scala:38)
> at
> org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$1.apply(LocalTableScanExec.scala:38)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.spark.sql.execution.LocalTableScanExec.<init>(LocalTableScanExec.scala:38)
> at
> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:393)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:60)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:60)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:61)
> at
> org.apache.spark.sql.execution.SparkPlanner.plan(SparkPlanner.scala:47)
> at
> org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1$$anonfun$apply$1.applyOrElse(SparkPlanner.scala:51)
> at
> org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1$$anonfun$apply$1.applyOrElse(SparkPlanner.scala:48)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
> at
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
> at
> org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1.apply(SparkPlanner.scala:48)
> at
> org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1.apply(SparkPlanner.scala:48)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at
> org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:78)
> at
> org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:76)
> at
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
> at
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
> at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2572)
> at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
> at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
> at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
> at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
> at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
> at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
> at
> SparkBeanEncoderTest.testCreateDatasetFromBeanFailure(SparkBeanEncoderTest.java:47)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
> at
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
> at
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
> 16/11/27 14:00:06 INFO SparkContext: Invoking stop() from shutdown hook
> 16/11/27 14:00:06 INFO SparkUI: Stopped Spark web UI at
> http://192.168.1.68:4040
> 16/11/27 14:00:06 INFO MapOutputTrackerMasterEndpoint:
> MapOutputTrackerMasterEndpoint stopped!
> 16/11/27 14:00:06 INFO MemoryStore: MemoryStore cleared
> 16/11/27 14:00:06 INFO BlockManager: BlockManager stopped
> 16/11/27 14:00:06 INFO BlockManagerMaster: BlockManagerMaster stopped
> 16/11/27 14:00:06 INFO
> OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
> OutputCommitCoordinator stopped!
> 16/11/27 14:00:06 INFO SparkContext: Successfully stopped SparkContext
> 16/11/27 14:00:06 INFO ShutdownHookManager: Shutdown hook called
> 16/11/27 14:00:06 INFO ShutdownHookManager: Deleting directory
> /tmp/spark-bad08a28-51bb-4295-a1e3-691d4679a56c
> {code}
> The problem seems to be caused by an inconsistency in the way bean properties
> are inspected in {{org.apache.spark.sql.catalyst.JavaTypeInference}};
> sometimes filtered by the existence of accessors and mutators, sometimes not.
> This inconsistency percolates back to the
> {{org.apache.spark.sql.catalyst.encoders.ExpressionEncoder}}, where the
> serializer has a different field count from the schema.
> Desired behaviour here is debatable, but I'm pretty sure AssertionErrors are
> always a bug. One simple fix would be to introduce a check so it fails
> faster, and with a more helpful message. Personally, I'd quite like it just
> work, even when there are too many accessors. To that end I've written a fix,
> which I shall PR shortly.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]