[ https://issues.apache.org/jira/browse/FLINK-29480 ]
Salva deleted comment on FLINK-29480:
-------------------------------
was (Author: JIRAUSER287051):
[~martijnvisser] This is the error I'm getting when I run
`mvn -DskipTests clean package -e`:
{code:java}
[ERROR] Failed to execute goal
org.apache.maven.plugins:maven-shade-plugin:3.1.1:shade (shade-flink) on
project flink-table-planner_2.12: Error creating shaded jar: Problem shading
JAR
/Users/salvalcantara/Projects/transformations/flink/flink-table/flink-table-planner/target/flink-table-planner_2.12-1.17-SNAPSHOT.jar
entry
org/apache/calcite/sql/validate/SqlValidatorImpl$NavigationExpander.class:
org.apache.maven.plugin.MojoExecutionException: Error in ASM processing class
org/apache/calcite/sql/validate/SqlValidatorImpl$NavigationExpander.class: 19
-> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal
org.apache.maven.plugins:maven-shade-plugin:3.1.1:shade (shade-flink) on
project flink-table-planner_2.12: Error creating shaded jar: Problem shading
JAR
/Users/salvalcantara/Projects/transformations/flink/flink-table/flink-table-planner/target/flink-table-planner_2.12-1.17-SNAPSHOT.jar
entry
org/apache/calcite/sql/validate/SqlValidatorImpl$NavigationExpander.class:
org.apache.maven.plugin.MojoExecutionException: Error in ASM processing class
org/apache/calcite/sql/validate/SqlValidatorImpl$NavigationExpander.class
at org.apache.maven.lifecycle.internal.MojoExecutor.execute
(MojoExecutor.java:215)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute
(MojoExecutor.java:156)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute
(MojoExecutor.java:148)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
(LifecycleModuleBuilder.java:117)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
(LifecycleModuleBuilder.java:81)
at
org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build
(SingleThreadedBuilder.java:56)
at org.apache.maven.lifecycle.internal.LifecycleStarter.execute
(LifecycleStarter.java:128)
at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:305)
at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
at org.apache.maven.cli.MavenCli.execute (MavenCli.java:956)
at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:288)
at org.apache.maven.cli.MavenCli.main (MavenCli.java:192)
at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke
(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke
(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:498)
at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced
(Launcher.java:282)
at org.codehaus.plexus.classworlds.launcher.Launcher.launch
(Launcher.java:225)
at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode
(Launcher.java:406)
at org.codehaus.plexus.classworlds.launcher.Launcher.main
(Launcher.java:347)
Caused by: org.apache.maven.plugin.MojoExecutionException: Error creating
shaded jar: Problem shading JAR
/Users/salvalcantara/Projects/transformations/flink/flink-table/flink-table-planner/target/flink-table-planner_2.12-1.17-SNAPSHOT.jar
entry
org/apache/calcite/sql/validate/SqlValidatorImpl$NavigationExpander.class:
org.apache.maven.plugin.MojoExecutionException: Error in ASM processing class
org/apache/calcite/sql/validate/SqlValidatorImpl$NavigationExpander.class
at org.apache.maven.plugins.shade.mojo.ShadeMojo.execute
(ShadeMojo.java:546)
at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo
(DefaultBuildPluginManager.java:137)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute
(MojoExecutor.java:210)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute
(MojoExecutor.java:156)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute
(MojoExecutor.java:148)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
(LifecycleModuleBuilder.java:117)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
(LifecycleModuleBuilder.java:81)
at
org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build
(SingleThreadedBuilder.java:56)
at org.apache.maven.lifecycle.internal.LifecycleStarter.execute
(LifecycleStarter.java:128)
at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:305)
at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
at org.apache.maven.cli.MavenCli.execute (MavenCli.java:956)
at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:288)
at org.apache.maven.cli.MavenCli.main (MavenCli.java:192)
at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke
(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke
(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:498)
at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced
(Launcher.java:282)
at org.codehaus.plexus.classworlds.launcher.Launcher.launch
(Launcher.java:225)
at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode
(Launcher.java:406)
at org.codehaus.plexus.classworlds.launcher.Launcher.main
(Launcher.java:347)
Caused by: java.io.IOException: Problem shading JAR
/Users/salvalcantara/Projects/transformations/flink/flink-table/flink-table-planner/target/flink-table-planner_2.12-1.17-SNAPSHOT.jar
entry
org/apache/calcite/sql/validate/SqlValidatorImpl$NavigationExpander.class:
org.apache.maven.plugin.MojoExecutionException: Error in ASM processing class
org/apache/calcite/sql/validate/SqlValidatorImpl$NavigationExpander.class
at org.apache.maven.plugins.shade.DefaultShader.shadeJars
(DefaultShader.java:197)
at org.apache.maven.plugins.shade.DefaultShader.shade
(DefaultShader.java:106)
at org.apache.maven.plugins.shade.mojo.ShadeMojo.execute
(ShadeMojo.java:442)
at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo
(DefaultBuildPluginManager.java:137)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute
(MojoExecutor.java:210)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute
(MojoExecutor.java:156)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute
(MojoExecutor.java:148)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
(LifecycleModuleBuilder.java:117)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
(LifecycleModuleBuilder.java:81)
at
org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build
(SingleThreadedBuilder.java:56)
at org.apache.maven.lifecycle.internal.LifecycleStarter.execute
(LifecycleStarter.java:128)
at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:305)
at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
at org.apache.maven.cli.MavenCli.execute (MavenCli.java:956)
at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:288)
at org.apache.maven.cli.MavenCli.main (MavenCli.java:192)
at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke
(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke
(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:498)
at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced
(Launcher.java:282)
at org.codehaus.plexus.classworlds.launcher.Launcher.launch
(Launcher.java:225)
at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode
(Launcher.java:406)
at org.codehaus.plexus.classworlds.launcher.Launcher.main
(Launcher.java:347)
Caused by: org.apache.maven.plugin.MojoExecutionException: Error in ASM
processing class
org/apache/calcite/sql/validate/SqlValidatorImpl$NavigationExpander.class
at org.apache.maven.plugins.shade.DefaultShader.addRemappedClass
(DefaultShader.java:470)
at org.apache.maven.plugins.shade.DefaultShader.shadeSingleJar
(DefaultShader.java:237)
at org.apache.maven.plugins.shade.DefaultShader.shadeJars
(DefaultShader.java:192)
at org.apache.maven.plugins.shade.DefaultShader.shade
(DefaultShader.java:106)
at org.apache.maven.plugins.shade.mojo.ShadeMojo.execute
(ShadeMojo.java:442)
at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo
(DefaultBuildPluginManager.java:137)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute
(MojoExecutor.java:210)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute
(MojoExecutor.java:156)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute
(MojoExecutor.java:148)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
(LifecycleModuleBuilder.java:117)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
(LifecycleModuleBuilder.java:81)
at
org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build
(SingleThreadedBuilder.java:56)
at org.apache.maven.lifecycle.internal.LifecycleStarter.execute
(LifecycleStarter.java:128)
at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:305)
at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
at org.apache.maven.cli.MavenCli.execute (MavenCli.java:956)
at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:288)
at org.apache.maven.cli.MavenCli.main (MavenCli.java:192)
at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke
(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke
(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:498)
at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced
(Launcher.java:282)
at org.codehaus.plexus.classworlds.launcher.Launcher.launch
(Launcher.java:225)
at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode
(Launcher.java:406)
at org.codehaus.plexus.classworlds.launcher.Launcher.main
(Launcher.java:347)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 19
at org.objectweb.asm.ClassReader.readLabel (ClassReader.java:2355)
at org.objectweb.asm.ClassReader.createLabel (ClassReader.java:2373)
at org.objectweb.asm.ClassReader.readTypeAnnotations (ClassReader.java:1781)
at org.objectweb.asm.ClassReader.readCode (ClassReader.java:1293)
at org.objectweb.asm.ClassReader.readMethod (ClassReader.java:1126)
at org.objectweb.asm.ClassReader.accept (ClassReader.java:698)
at org.objectweb.asm.ClassReader.accept (ClassReader.java:500)
at org.apache.maven.plugins.shade.DefaultShader.addRemappedClass
(DefaultShader.java:466)
at org.apache.maven.plugins.shade.DefaultShader.shadeSingleJar
(DefaultShader.java:237)
at org.apache.maven.plugins.shade.DefaultShader.shadeJars
(DefaultShader.java:192)
at org.apache.maven.plugins.shade.DefaultShader.shade
(DefaultShader.java:106)
at org.apache.maven.plugins.shade.mojo.ShadeMojo.execute
(ShadeMojo.java:442)
at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo
(DefaultBuildPluginManager.java:137)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute
(MojoExecutor.java:210)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute
(MojoExecutor.java:156)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute
(MojoExecutor.java:148)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
(LifecycleModuleBuilder.java:117)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject
(LifecycleModuleBuilder.java:81)
at
org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build
(SingleThreadedBuilder.java:56)
at org.apache.maven.lifecycle.internal.LifecycleStarter.execute
(LifecycleStarter.java:128)
at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:305)
at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
at org.apache.maven.cli.MavenCli.execute (MavenCli.java:956)
at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:288)
at org.apache.maven.cli.MavenCli.main (MavenCli.java:192)
at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke
(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke
(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:498)
at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced
(Launcher.java:282)
at org.codehaus.plexus.classworlds.launcher.Launcher.launch
(Launcher.java:225)
at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode
(Launcher.java:406)
at org.codehaus.plexus.classworlds.launcher.Launcher.main
(Launcher.java:347)
[ERROR]
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please
read the following articles:
[ERROR] [Help 1]
http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR] mvn <args> -rf :flink-table-planner_2.12
{code}
> Skip invalid messages when writing
> ----------------------------------
>
> Key: FLINK-29480
> URL: https://issues.apache.org/jira/browse/FLINK-29480
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Reporter: Salva
> Assignee: Salva
> Priority: Minor
> Labels: pull-request-available
> Attachments: Screenshot 2022-10-28 at 13.48.12.png
>
>
> As reported in [1], it seems that it's not possible to skip invalid messages
> when writing. More specifically, if there is an error serializing messages,
> there is no option for skipping them, and the Flink job then enters a crash loop.
> In particular, the `write` method of the `KafkaWriter` looks like this:
> {code:java}
> @Override
> public void write(IN element, Context context) throws IOException {
>     final ProducerRecord<byte[], byte[]> record =
>             recordSerializer.serialize(element, ...);
>     currentProducer.send(record, deliveryCallback); // line 200
>     numRecordsSendCounter.inc();
> }
> {code}
> So, if you make your `serialize` method return `null`, this is what you get
> at runtime:
> {code:java}
> java.lang.NullPointerException
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
>     at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
>     at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
> {code}
> What I propose is to modify the KafkaWriter [2, 3] like this:
> {code:java}
> @Override
> public void write(IN element, Context context) throws IOException {
>     final ProducerRecord<byte[], byte[]> record =
>             recordSerializer.serialize(element, ...);
>     if (record != null) { // skip null records (check to be added)
>         currentProducer.send(record, deliveryCallback);
>         numRecordsSendCounter.inc();
>     }
> }
> {code}
> This would at least give users a chance to skip those messages and move on to
> the next ones.
> Obviously, one could prepend the sink with a flatMap operator for filtering
> out invalid messages, but:
> # It looks weird that one has to prepend an operator for "making sure" that
> the serializer will not fail right after. Wouldn't it be simpler to skip the
> null records directly in order to avoid this pre-check? [4]
> # It appears to be a simple change.
> # It brings consistency/symmetry with the reading case [4, 5].
> To expand on point 3, by looking at `KafkaDeserializationSchema`:
> {code:java}
> T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
>
> default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
>         throws Exception {
>     T deserialized = deserialize(message);
>     if (deserialized != null) { // skip null records (check already exists)
>         out.collect(deserialized);
>     }
> }
> {code}
> one can simply return `null` in the overridden `deserialize` method in order
> to skip any message that fails to be deserialized. Similarly, if one uses the
> `KafkaRecordDeserializationSchema` interface instead:
> {code:java}
> void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out)
>         throws IOException;
> {code}
> then it's also possible not to invoke `out.collect(...)` on null records. To
> me, it looks strange that the same flexibility is not given in the writing
> case.
> *References*
> [1] [https://lists.apache.org/thread/ykmy4llovrrrzlvz0ng3x5yosskjg70h]
> [2] [https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#port-kafkasink-to-new-unified-sink-api-flip-143]
> [3] [https://github.com/apache/flink/blob/f0fe85a50920da2b7d7da815db0a924940522e28/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197]
> [4] [https://lists.apache.org/thread/pllv5dqq27xkvj6p3lj91vcz409pw38d]
> [5] [https://stackoverflow.com/questions/55538736/how-to-skip-corrupted-messages-in-flink]
>
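
The null check proposed in the issue description above can be illustrated with a minimal, self-contained sketch. It does not use the Flink or Kafka APIs: `RecordSerializer`, `SkippingWriter`, and `serializeIfNumeric` are hypothetical stand-ins for the record serializer, the `KafkaWriter`, and a user-defined `serialize` method, and an in-memory list plays the role of the producer. The skipped-record counter is also an assumption, added only to make the behaviour observable.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class NullSkippingWriterSketch {

    /** Hypothetical stand-in for a record serializer: returning null means "skip this element". */
    interface RecordSerializer<IN, OUT> {
        OUT serialize(IN element);
    }

    /** Hypothetical stand-in for the writer; the list plays the role of the producer. */
    static final class SkippingWriter<IN, OUT> {
        private final RecordSerializer<IN, OUT> serializer;
        private final List<OUT> sent = new ArrayList<>();
        private long skipped = 0;

        SkippingWriter(RecordSerializer<IN, OUT> serializer) {
            this.serializer = serializer;
        }

        void write(IN element) {
            OUT record = serializer.serialize(element);
            if (record == null) {
                // Skip null records instead of handing them to the producer.
                skipped++;
                return;
            }
            sent.add(record);
        }

        int sentCount() {
            return sent.size();
        }

        long skippedCount() {
            return skipped;
        }
    }

    /** Example serializer (an assumption): only strings that parse as integers are valid. */
    static byte[] serializeIfNumeric(String element) {
        try {
            Integer.parseInt(element);
            return element.getBytes(StandardCharsets.UTF_8);
        } catch (NumberFormatException e) {
            return null; // signal "invalid, please skip"
        }
    }

    public static void main(String[] args) {
        SkippingWriter<String, byte[]> writer =
                new SkippingWriter<String, byte[]>(NullSkippingWriterSketch::serializeIfNumeric);
        for (String element : new String[] {"1", "not-a-number", "2"}) {
            writer.write(element);
        }
        // prints "sent=2 skipped=1"
        System.out.println("sent=" + writer.sentCount() + " skipped=" + writer.skippedCount());
    }
}
```

Counting skipped records separately, rather than dropping them silently, is one possible design choice; a real connector might prefer a metric or a log line so that skipped messages remain visible to operators.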
--
This message was sent by Atlassian Jira
(v8.20.10#820010)