[
https://issues.apache.org/jira/browse/HUDI-5842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jonathan Vexler updated HUDI-5842:
----------------------------------
Description:
In TestJsonKafkaSource If you try to do more than just count the number of
records you get a null pointer exception.
[https://github.com/apache/hudi/blob/812950bc9ead4c28763b907dc5a1840162f35337/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java]
is a permalink to show what schema it fails on because I am updating the tests
to use a different schema for now. You can trigger the exception by adding
{code:java}
for (Row r : fetch2.getBatch().get().collectAsList()) {
for (StructField f : r.schema().fields()) {
System.out.println(f.name() + ":" + r.get(r.fieldIndex(f.name())));
}
} {code}
to the end of TestJsonKafkaSource.testJsonKafkaSourceWithDefaultUpperCap (using
the schema from the permalink)
The exception is
{code:java}
java.lang.NullPointerException
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.createExternalRow_0_0$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown
Source)
at
org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collectFromPlan$1.apply(Dataset.scala:3391)
at
org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collectFromPlan$1.apply(Dataset.scala:3388)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3388)
at
org.apache.spark.sql.Dataset$$anonfun$collectAsList$1.apply(Dataset.scala:2800)
at
org.apache.spark.sql.Dataset$$anonfun$collectAsList$1.apply(Dataset.scala:2799)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3369)
at
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3368)
at org.apache.spark.sql.Dataset.collectAsList(Dataset.scala:2799)
at
org.apache.hudi.utilities.sources.TestJsonKafkaSource.testJsonKafkaSourceWithDefaultUpperCap(TestJsonKafkaSource.java:128)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
at
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at
org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at
org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
at
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
at
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
at
org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at
org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
at
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
at
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
at
com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
at
com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
at
com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
at
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
at
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54) {code}
The exception will not occur if you get rid of the fields:
{code:java}
{
"name" : "nation",
"type" : "bytes"
}
{
"name" : "current_date",
"type" : {
"type" : "int",
"logicalType" : "date"
}
}
{
"name" : "height",
"type" : {
"type" : "fixed",
"name" : "abc",
"size" : 5,
"logicalType" : "decimal",
"precision" : 10,
"scale": 6
}
} {code}
from the source.avsc, the string in HoodieTestDataGenerator
public static final String EXTRA_TYPE_SCHEMA
and don't set the values in
{code:java}
private void generateExtraSchemaValues(GenericRecord rec){code}
The exception will occur if any combination of those three is added back to
schema and generator.
The exception will also not occur if you change this line
[https://github.com/apache/hudi/blob/812950bc9ead4c28763b907dc5a1840162f35337/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java#L124]
from
{code:java}
r.getBatch().map(rdd ->
source.getSparkSession().read().schema(dataType).json(rdd)).orElse(null)) {code}
to
{code:java}
r.getBatch().map(rdd ->
source.getSparkSession().read().json(rdd)).orElse(null)) {code}
was:
In TestJsonKafkaSource If you try to do more than just count the number of
records you get a null pointer exception.
[https://github.com/apache/hudi/blob/812950bc9ead4c28763b907dc5a1840162f35337/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java]
is a permalink to show what schema it fails on because I am updating the tests
to use a different schema for now. You can trigger the exception by adding
{code:java}
for (Row r : fetch2.getBatch().get().collectAsList()) {
for (StructField f : r.schema().fields()) {
System.out.println(f.name() + ":" + r.get(r.fieldIndex(f.name())));
}
} {code}
to the end of TestJsonKafkaSource.testJsonKafkaSourceWithDefaultUpperCap (using
the schema from the permalink)
The exception is
{code:java}
java.lang.NullPointerException
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.createExternalRow_0_0$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown
Source)
at
org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collectFromPlan$1.apply(Dataset.scala:3391)
at
org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collectFromPlan$1.apply(Dataset.scala:3388)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3388)
at
org.apache.spark.sql.Dataset$$anonfun$collectAsList$1.apply(Dataset.scala:2800)
at
org.apache.spark.sql.Dataset$$anonfun$collectAsList$1.apply(Dataset.scala:2799)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3369)
at
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3368)
at org.apache.spark.sql.Dataset.collectAsList(Dataset.scala:2799)
at
org.apache.hudi.utilities.sources.TestJsonKafkaSource.testJsonKafkaSourceWithDefaultUpperCap(TestJsonKafkaSource.java:128)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
at
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at
org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at
org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
at
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
at
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
at
org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at
org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
at
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
at
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
at
com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
at
com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
at
com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
at
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
at
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54) {code}
The exception will not occur if you get rid of the fields:
{code:java}
{
"name" : "nation",
"type" : "bytes"
}
{
"name" : "current_date",
"type" : {
"type" : "int",
"logicalType" : "date"
}
}
{
"name" : "height",
"type" : {
"type" : "fixed",
"name" : "abc",
"size" : 5,
"logicalType" : "decimal",
"precision" : 10,
"scale": 6
}
} {code}
from the source.avsc, the string in HoodieTestDataGenerator
public static final String EXTRA_TYPE_SCHEMA
and don't set the values in
{code:java}
private void generateExtraSchemaValues(GenericRecord rec){code}
The exception will occur if any combination of those three is added back to
schema and generator.
The exception will also not occur if you change this line
[https://github.com/apache/hudi/blob/812950bc9ead4c28763b907dc5a1840162f35337/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java#L124]
from
{code:java}
r.getBatch().map(rdd ->
source.getSparkSession().read().schema(dataType).json(rdd)).orElse(null)) {code}
to
{code:java}
r.getBatch().map(rdd ->
source.getSparkSession().read().json(rdd)).orElse(null)) {code}
> Json to Dataset<Row> conversion might be broken
> -----------------------------------------------
>
> Key: HUDI-5842
> URL: https://issues.apache.org/jira/browse/HUDI-5842
> Project: Apache Hudi
> Issue Type: Bug
> Components: deltastreamer
> Reporter: Jonathan Vexler
> Priority: Major
>
> In TestJsonKafkaSource If you try to do more than just count the number of
> records you get a null pointer exception.
> [https://github.com/apache/hudi/blob/812950bc9ead4c28763b907dc5a1840162f35337/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java]
> is a permalink to show what schema it fails on because I am updating the
> tests to use a different schema for now. You can trigger the exception by
> adding
> {code:java}
> for (Row r : fetch2.getBatch().get().collectAsList()) {
> for (StructField f : r.schema().fields()) {
> System.out.println(f.name() + ":" + r.get(r.fieldIndex(f.name())));
> }
> } {code}
> to the end of TestJsonKafkaSource.testJsonKafkaSourceWithDefaultUpperCap
> (using the schema from the permalink)
> The exception is
> {code:java}
> java.lang.NullPointerException
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.createExternalRow_0_0$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown
> Source)
> at
> org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collectFromPlan$1.apply(Dataset.scala:3391)
> at
> org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collectFromPlan$1.apply(Dataset.scala:3388)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
> at
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3388)
> at
> org.apache.spark.sql.Dataset$$anonfun$collectAsList$1.apply(Dataset.scala:2800)
> at
> org.apache.spark.sql.Dataset$$anonfun$collectAsList$1.apply(Dataset.scala:2799)
> at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3369)
> at
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
> at
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3368)
> at org.apache.spark.sql.Dataset.collectAsList(Dataset.scala:2799)
> at
> org.apache.hudi.utilities.sources.TestJsonKafkaSource.testJsonKafkaSourceWithDefaultUpperCap(TestJsonKafkaSource.java:128)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
> at
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
> at
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
> at
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
> at
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
> at
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
> at
> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
> at
> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
> at
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
> at
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
> at
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
> at
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
> at
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
> at
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
> at
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
> at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
> at
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
> at
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
> at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
> at
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
> at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
> at java.util.ArrayList.forEach(ArrayList.java:1259)
> at
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
> at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
> at
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
> at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
> at java.util.ArrayList.forEach(ArrayList.java:1259)
> at
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
> at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
> at
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
> at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
> at
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
> at
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
> at
> org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
> at
> org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
> at
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
> at
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
> at
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
> at
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
> at
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
> at
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
> at
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
> at
> com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
> at
> com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
> at
> com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
> at
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
> at
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54) {code}
>
> The exception will not occur if you get rid of the fields:
> {code:java}
> {
> "name" : "nation",
> "type" : "bytes"
> }
> {
> "name" : "current_date",
> "type" : {
> "type" : "int",
> "logicalType" : "date"
> }
> }
> {
> "name" : "height",
> "type" : {
> "type" : "fixed",
> "name" : "abc",
> "size" : 5,
> "logicalType" : "decimal",
> "precision" : 10,
> "scale": 6
> }
> } {code}
> from the source.avsc, the string in HoodieTestDataGenerator
> public static final String EXTRA_TYPE_SCHEMA
> and don't set the values in
> {code:java}
> private void generateExtraSchemaValues(GenericRecord rec){code}
> The exception will occur if any combination of those three is added back to
> schema and generator.
>
>
> The exception will also not occur if you change this line
> [https://github.com/apache/hudi/blob/812950bc9ead4c28763b907dc5a1840162f35337/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java#L124]
> from
> {code:java}
> r.getBatch().map(rdd ->
> source.getSparkSession().read().schema(dataType).json(rdd)).orElse(null))
> {code}
> to
> {code:java}
> r.getBatch().map(rdd ->
> source.getSparkSession().read().json(rdd)).orElse(null)) {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)