tzulitai commented on a change in pull request #7580: [FLINK-11436] Fix Java deserialization failure of the AvroSerializer
URL: https://github.com/apache/flink/pull/7580#discussion_r251268218
 
 

 ##########
 File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ##########
 @@ -381,4 +383,76 @@ public AvroSchemaSerializerConfigSnapshot() {
                }
 
        }
+
+       // -------- backwards compatibility with 1.6 -----------
+
+       private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+               /*
+               During the Flink 1.7 release, serialVersionUID was bumped to 2L (it was 1L before).
+               Although the AvroSerializer (along with its snapshot class) was migrated to the new serialization
+               abstraction (and is therefore free of Java serialization), some composite serializers were not migrated
+               and were still written with Java serialization. If one of their nested serializers was an AvroSerializer,
+               deserialization would fail because of the mismatched serialVersionUID. Unfortunately, it is not possible to
+               revert the serialVersionUID back to 1L, because users might already have snapshots written with 2L.
+               To overcome this, we first need to make sure that the AvroSerializer is Java-deserialized with
+               FailureTolerantObjectInputStream, and then we determine the serialized layout by looking at the fields.
+
+               From: https://docs.oracle.com/javase/8/docs/platform/serialization/spec/class.html#a5421
+               -------------------------------------------------------------------------------------------------------------
+               The descriptors for primitive typed fields are written first
+               sorted by field name followed by descriptors for the object typed fields sorted by field name.
+               The names are sorted using String.compareTo.
+               -------------------------------------------------------------------------------------------------------------
+
+               pre 1.7         field order:    [schemaString,          type]
+               post 1.7        field order:    [previousSchema,        schema,         type]
+
+               We use the first field to distinguish between the two layouts.
+               In the pre-1.7 layout, the first field (schemaString) is either null or a String, whereas in the
+               post-1.7 layout the first field (previousSchema) is never null and is always a SerializableAvroSchema.
+               The first field alone is therefore enough to determine the version.
+
+               This logic should stay here for as long as we support Flink 1.6 (along with Java-serialized
+               TypeSerializers).
+               */
+               final Object firstField = in.readObject();
+
+               if (firstField == null) {
+                       // first field can only be NULL in the old layout.
+                       readOldLayout(null, in);
+               }
+               else if (firstField instanceof String) {
+                       // first field is a String only in the old layout
+                       readOldLayout((String) firstField, in);
+               }
+               else if (firstField instanceof SerializableAvroSchema) {
+                       readCurrentLayout((SerializableAvroSchema) firstField, in);
+               }
+               else {
+                       throw new IllegalStateException("Failed to Java-Deserialize an AvroSerializer instance. " +
+                               "Was expecting a first field to be either a String or SerializableAvroSchema, but got: " +
+                               "" + firstField.getClass());
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readOldLayout(@Nullable String schemaString, ObjectInputStream in)
+                       throws IOException, ClassNotFoundException {
+
+               Schema schema = AvroFactory.parseSchemaString(schemaString);
+               Class<T> type = (Class<T>) in.readObject();
+
+               this.previousSchema = new SerializableAvroSchema();
+               this.schema = new SerializableAvroSchema(schema);
+               this.type = type;
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readCurrentLayout(SerializableAvroSchema previousSchema, ObjectInputStream in)
+                       throws IOException, ClassNotFoundException {
+
+               this.previousSchema = previousSchema;
+               this.schema = (SerializableAvroSchema) in.readObject();
+               this.type =  (Class<T>) in.readObject();
 
 Review comment:
   nit: extra spaces after `=`
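
   For context, the first-field dispatch in the hunk above can be tried in isolation. The following is only a minimal sketch under stated assumptions, not the Flink code: `LayoutSniffingDemo`, `SchemaHolder` (a stand-in for `SerializableAvroSchema`), `DemoSerializer`, and the `writeOldLayout` flag are all hypothetical names, and the two layouts are written by hand in `writeObject` rather than produced by two versions of the same class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LayoutSniffingDemo {

    /** Hypothetical stand-in for SerializableAvroSchema. */
    static final class SchemaHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        final String json; // null means "no schema"

        SchemaHolder(String json) {
            this.json = json;
        }
    }

    /** Hypothetical serializer whose readObject accepts two stream layouts. */
    static final class DemoSerializer implements Serializable {
        private static final long serialVersionUID = 1L;

        // All fields are transient because this demo writes the stream by hand.
        transient SchemaHolder previousSchema;
        transient SchemaHolder schema;
        transient Class<?> type;
        transient boolean writeOldLayout;

        DemoSerializer(String schemaJson, Class<?> type, boolean writeOldLayout) {
            this.previousSchema = new SchemaHolder(null);
            this.schema = new SchemaHolder(schemaJson);
            this.type = type;
            this.writeOldLayout = writeOldLayout;
        }

        private void writeObject(ObjectOutputStream out) throws IOException {
            if (writeOldLayout) {
                // Old layout: [schemaString, type]
                out.writeObject(schema.json);
                out.writeObject(type);
            } else {
                // Current layout: [previousSchema, schema, type]
                out.writeObject(previousSchema);
                out.writeObject(schema);
                out.writeObject(type);
            }
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            // The runtime type of the first object decides which layout follows.
            Object first = in.readObject();
            if (first == null || first instanceof String) {
                previousSchema = new SchemaHolder(null);
                schema = new SchemaHolder((String) first);
                type = (Class<?>) in.readObject();
            } else if (first instanceof SchemaHolder) {
                previousSchema = (SchemaHolder) first;
                schema = (SchemaHolder) in.readObject();
                type = (Class<?>) in.readObject();
            } else {
                throw new IllegalStateException("Unexpected first field: " + first.getClass());
            }
        }
    }

    /** Serializes the given instance to a byte array and reads it back. */
    static DemoSerializer roundTrip(DemoSerializer s) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(s);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (DemoSerializer) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        String json = "{\"type\":\"string\"}";
        DemoSerializer fromOld = roundTrip(new DemoSerializer(json, String.class, true));
        DemoSerializer fromNew = roundTrip(new DemoSerializer(json, String.class, false));
        System.out.println(fromOld.schema.json.equals(fromNew.schema.json));
        System.out.println(fromOld.type == fromNew.type);
    }
}
```

   Both layouts should restore the same schema string and type; an old-layout stream with a null schema string takes the same branch as a `String` first field, which mirrors the two `readOldLayout` calls in the hunk.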

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
