Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/5995#discussion_r189195014
--- Diff:
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java
---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}.
+ *
+ * @param <T> type of record it produces
+ */
+public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
+
+ private static final long serialVersionUID = -884738268437806062L;
+
+ /** Provider for schema coder. Used for initializing in each task. */
+ private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
+
+ /** Coder used for reading schema from incoming stream. */
+ private transient SchemaCoder schemaCoder;
+
+ /**
+ * Creates Avro deserialization schema that reads schema from input stream using provided {@link SchemaCoder}.
+ *
+ * @param recordClazz         class to which deserialize. Should be either
+ *                            {@link SpecificRecord} or {@link GenericRecord}.
+ * @param reader              reader's Avro schema. Should be provided if recordClazz is
+ *                            {@link GenericRecord}
+ * @param schemaCoderProvider schema provider that allows instantiation of {@link SchemaCoder} that will be used for
+ *                            schema reading
+ */
+ protected RegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader,
+ SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+ super(recordClazz, reader);
+ this.schemaCoderProvider = schemaCoderProvider;
+ this.schemaCoder = schemaCoderProvider.get();
+ }
+
+ @Override
+ public T deserialize(byte[] message) {
+ // read record
+ try {
+ checkAvroInitialized();
+ getInputStream().setBuffer(message);
+ Schema writerSchema = schemaCoder.readSchema(getInputStream());
+ Schema readerSchema = getReaderSchema();
+
+ GenericDatumReader<T> datumReader = getDatumReader();
+
+ datumReader.setSchema(writerSchema);
+ datumReader.setExpected(readerSchema);
+
+ return datumReader.read(null, getDecoder());
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to deserialize Row.", e);
--- End diff --
The method `deserialize()` can throw an IOException. That got dropped from
the signature, and exceptions are now wrapped into a RuntimeException. That
makes exception stack traces more complicated, and hides the fact that "there
is a possible exceptional case to handle" from the consumers of that code.
I think this suggests a general rule: whenever using `RuntimeException`,
take a step back, look at the exception structure and signatures, and see if
something is not declared well.
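As a minimal sketch of the alternative the comment suggests (hypothetical class and method names, not Flink's actual API): keep the checked `IOException` in the signature instead of wrapping it, so callers see the exceptional case directly.

```java
import java.io.IOException;

// Hypothetical standalone example, not Flink code: the checked IOException
// stays declared on deserialize(), so consumers must handle the possible
// failure explicitly and stack traces keep the original exception type.
public class DeserializeSketch {

	// Declares IOException rather than wrapping it in RuntimeException.
	static int deserialize(byte[] message) throws IOException {
		if (message == null || message.length < 4) {
			// Fail with the original checked exception type; no wrapper.
			throw new IOException("Message too short to contain a record.");
		}
		// Toy "decoding": read a big-endian int from the first four bytes.
		return ((message[0] & 0xFF) << 24)
				| ((message[1] & 0xFF) << 16)
				| ((message[2] & 0xFF) << 8)
				| (message[3] & 0xFF);
	}

	public static void main(String[] args) throws IOException {
		System.out.println(deserialize(new byte[] {0, 0, 0, 42}));
	}
}
```

The caller either handles the `IOException` or propagates it, which is exactly the "possible exceptional case" the wrapping hid.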
---