This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 723c004dcdf82073568dc1a758d8d7453a6af954
Author: Etienne Chauchot <[email protected]>
AuthorDate: Thu Sep 5 10:13:05 2019 +0200

    Add try catch around reflection call in lazy init of beam coder
---
 .../structuredstreaming/translation/helpers/EncoderHelpers.java | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index 56097b7..05595f1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -388,12 +388,16 @@ case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: B
 ctx.addImmutableStateIfNotExists(coderClass.getName(), beamCoderInstance, func(v1 -> {
 /*
 CODE GENERATED
+ try {
 v1 = coderClass.class.getDeclaredConstructor().newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));
+ }
 */
 List<String> parts = new ArrayList<>();
- parts.add("");
+ parts.add("try {");
 parts.add(" = ");
- parts.add(".class.getDeclaredConstructor().newInstance();");
+ parts.add(".class.getDeclaredConstructor().newInstance();} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}");
 StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
 List<Object> args = new ArrayList<>();
 args.add(v1);
