[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238935#comment-16238935 ]
ASF GitHub Bot commented on FLINK-6022:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/4943#discussion_r148928504
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
@@ -153,111 +146,215 @@ public T deserialize(T reuse, DataInputView source) throws IOException {
        return this.reader.read(reuse, this.decoder);
    }
+   // ------------------------------------------------------------------------
+   //  Copying
+   // ------------------------------------------------------------------------
+
    @Override
-   public void copy(DataInputView source, DataOutputView target) throws IOException {
+   public T copy(T from) {
        checkAvroInitialized();
+       return avroData.deepCopy(schema, from);
+   }
-       if (this.deepCopyInstance == null) {
-           this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class);
-       }
-
-       this.decoder.setIn(source);
-       this.encoder.setOut(target);
+   @Override
+   public T copy(T from, T reuse) {
+       return copy(from);
+   }
-       T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
-       this.writer.write(tmp, this.encoder);
+   @Override
+   public void copy(DataInputView source, DataOutputView target) throws IOException {
+       T value = deserialize(source);
+       serialize(value, target);
    }
-   private void checkAvroInitialized() {
-       if (this.reader == null) {
-           this.reader = new ReflectDatumReader<T>(type);
-           this.writer = new ReflectDatumWriter<T>(type);
-           this.encoder = new DataOutputEncoder();
-           this.decoder = new DataInputDecoder();
+   // ------------------------------------------------------------------------
+   //  Compatibility and Upgrades
+   // ------------------------------------------------------------------------
+
+   @Override
+   public TypeSerializerConfigSnapshot snapshotConfiguration() {
+       if (configSnapshot == null) {
+           checkAvroInitialized();
+           configSnapshot = new AvroSchemaSerializerConfigSnapshot(schema.toString(false));
        }
+       return configSnapshot;
    }
-   private void checkKryoInitialized() {
-       if (this.kryo == null) {
-           this.kryo = new Kryo();
-
-           Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
-           instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
-           kryo.setInstantiatorStrategy(instantiatorStrategy);
+   @Override
+   @SuppressWarnings("deprecation")
+   public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+       if (configSnapshot instanceof AvroSchemaSerializerConfigSnapshot) {
+           // proper schema snapshot, can do the sophisticated schema-based compatibility check
+           final String schemaString = ((AvroSchemaSerializerConfigSnapshot) configSnapshot).getSchemaString();
+           final Schema lastSchema = new Schema.Parser().parse(schemaString);
-           kryo.setAsmEnabled(true);
+           final SchemaPairCompatibility compatibility =
+                   SchemaCompatibility.checkReaderWriterCompatibility(schema, lastSchema);
-           KryoUtils.applyRegistrations(kryo, kryoRegistrations.values());
+           return compatibility.getType() == SchemaCompatibilityType.COMPATIBLE ?
+                   CompatibilityResult.compatible() : CompatibilityResult.requiresMigration();
+       }
+       else if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
+           // old snapshot case, just compare the type
+           // we don't need to restore any Kryo stuff, since Kryo was never used for persistence,
+           // only for object-to-object copies.
+           final AvroSerializerConfigSnapshot old = (AvroSerializerConfigSnapshot) configSnapshot;
+           return type.equals(old.getTypeClass()) ?
+                   CompatibilityResult.compatible() : CompatibilityResult.requiresMigration();
+       }
+       else {
+           return CompatibilityResult.requiresMigration();
        }
    }
-   // --------------------------------------------------------------------------------------------
+   // ------------------------------------------------------------------------
+   //  Utilities
+   // ------------------------------------------------------------------------
+
+   @Override
+   public TypeSerializer<T> duplicate() {
+       return new AvroSerializer<>(type);
+   }
    @Override
    public int hashCode() {
-       return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode();
+       return 42 + type.hashCode();
    }
    @Override
    public boolean equals(Object obj) {
-       if (obj instanceof AvroSerializer) {
-           @SuppressWarnings("unchecked")
-           AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj;
-
-           return avroSerializer.canEqual(this) &&
-               type == avroSerializer.type &&
-               typeToInstantiate == avroSerializer.typeToInstantiate;
-       } else {
+       if (obj == this) {
+           return true;
+       }
+       else if (obj != null && obj.getClass() == AvroSerializer.class) {
+           final AvroSerializer that = (AvroSerializer) obj;
+           return this.type == that.type;
+       }
+       else {
            return false;
        }
    }
    @Override
    public boolean canEqual(Object obj) {
-       return obj instanceof AvroSerializer;
+       return obj.getClass() == this.getClass();
    }
-   // --------------------------------------------------------------------------------------------
-   //  Serializer configuration snapshotting & compatibility
-   // --------------------------------------------------------------------------------------------
-
    @Override
-   public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
-       return new AvroSerializerConfigSnapshot<>(type, typeToInstantiate, kryoRegistrations);
+   public String toString() {
+       return getClass().getName() + " (" + getType().getName() + ')';
    }
-   @SuppressWarnings("unchecked")
-   @Override
-   public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-       if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
-           final AvroSerializerConfigSnapshot<T> config = (AvroSerializerConfigSnapshot<T>) configSnapshot;
+   // ------------------------------------------------------------------------
+   //  Initialization
+   // ------------------------------------------------------------------------
+
+   private void checkAvroInitialized() {
+       if (writer == null) {
+           initializeAvro();
+       }
+   }
+
+   private void initializeAvro() {
+       final ClassLoader cl = Thread.currentThread().getContextClassLoader();
+
+       if (SpecificRecord.class.isAssignableFrom(type)) {
+           this.avroData = new SpecificData(cl);
+           this.schema = this.avroData.getSchema(type);
+           this.reader = new SpecificDatumReader<>(schema, schema, avroData);
+           this.writer = new SpecificDatumWriter<>(schema, avroData);
+       }
+       else {
+           final ReflectData reflectData = new ReflectData(cl);
+           this.avroData = reflectData;
+           this.schema = this.avroData.getSchema(type);
+           this.reader = new ReflectDatumReader<>(schema, schema, reflectData);
+           this.writer = new ReflectDatumWriter<>(schema, reflectData);
+       }
+
+       this.encoder = new DataOutputEncoder();
+       this.decoder = new DataInputDecoder();
+   }
+
+   // ------------------------------------------------------------------------
+   //  Serializer Snapshots
+   // ------------------------------------------------------------------------
+
+   /**
+    * A config snapshot for the Avro Serializer that stores the Avro Schema to check compatibility.
+    */
+   public static final class AvroSchemaSerializerConfigSnapshot extends TypeSerializerConfigSnapshot {
+
+       private String schemaString;
+
+       /**
+        * Default constructor for instantiation via reflection.
+        */
+       @SuppressWarnings("unused")
+       public AvroSchemaSerializerConfigSnapshot() {}
+
+       public AvroSchemaSerializerConfigSnapshot(String schemaString) {
+           this.schemaString = checkNotNull(schemaString);
+       }
+
+       public String getSchemaString() {
+           return schemaString;
+       }
+
+       // --- Serialization ---
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+           super.read(in);
+           this.schemaString = in.readUTF();
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+           super.write(out);
+           out.writeUTF(schemaString);
+       }
-           if (type.equals(config.getTypeClass()) && typeToInstantiate.equals(config.getTypeToInstantiate())) {
-               // resolve Kryo registrations; currently, since the Kryo registrations in Avro
-               // are fixed, there shouldn't be a problem with the resolution here.
+       // --- Version ---
-               LinkedHashMap<String, KryoRegistration> oldRegistrations = config.getKryoRegistrations();
-               oldRegistrations.putAll(kryoRegistrations);
+       @Override
+       public int getVersion() {
+           return 1;
+       }
-               for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) {
-                   if (reconfiguredRegistrationEntry.getValue().isDummy()) {
-                       return CompatibilityResult.requiresMigration();
-                   }
-               }
+       // --- Utils ---
-               this.kryoRegistrations = oldRegistrations;
-               return CompatibilityResult.compatible();
+       @Override
+       public boolean equals(Object obj) {
+           if (obj == this) {
+               return true;
+           }
+           else if (obj != null && obj.getClass() == AvroSchemaSerializerConfigSnapshot.class) {
+               final AvroSchemaSerializerConfigSnapshot that = (AvroSchemaSerializerConfigSnapshot) obj;
+               return this.schemaString.equals(that.schemaString);
--- End diff ---
Ah, forget it, that's probably not necessary here, since you're checking for
proper schema compatibility in the compatibility-check method.
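For reference, a minimal, self-contained sketch of the Avro schema-resolution check that the new ensureCompatibility() path relies on. It only uses the Avro SchemaCompatibility API already shown in the diff; the two example schema strings are made up for illustration and are not part of the pull request.

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;

public class SchemaCompatibilityCheckSketch {

    public static void main(String[] args) {
        // writer schema: what an earlier serializer snapshot captured (hypothetical example)
        final Schema writerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                + "{\"name\":\"name\",\"type\":\"string\"}]}");

        // reader schema: what the restoring serializer expects; adds a field with a
        // default value, which Avro schema resolution can handle
        final Schema readerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                + "{\"name\":\"name\",\"type\":\"string\"},"
                + "{\"name\":\"age\",\"type\":\"int\",\"default\":0}]}");

        // same call as in ensureCompatibility(): reader schema first, writer schema second
        final SchemaPairCompatibility compatibility =
            SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema);

        // COMPATIBLE means the serializer can be kept; anything else would require state migration
        System.out.println(compatibility.getType() == SchemaCompatibilityType.COMPATIBLE);
    }
}

With that check in place, the config snapshot only needs to store the schema string, which is exactly what AvroSchemaSerializerConfigSnapshot does in the diff above.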
> Improve support for Avro GenericRecord
> --------------------------------------
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
> Issue Type: Improvement
> Components: Type Serialization System
> Reporter: Robert Metzger
> Priority: Major
>
> Currently, Flink serializes the schema for each Avro GenericRecord in the
> stream.
> This leads to a lot of overhead over the wire/disk + high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by
> shipping the schema to each serializer through the AvroTypeInformation.
> This means we can only support GenericRecords with the same type per stream, but
> the performance will be much better.
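To make the proposal concrete, here is a rough, hypothetical sketch (the class name and structure are illustrative only, not the actual Flink API): the schema is provided once, e.g. handed down through the type information, so each element is written as plain Avro binary without repeating the schema.

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

// Hypothetical sketch: the schema comes in through the constructor, so only the
// record payload is serialized per element, never the schema itself.
public class SchemaAwareGenericRecordSerializer {

    private final GenericDatumWriter<GenericRecord> datumWriter;
    private final GenericDatumReader<GenericRecord> datumReader;

    public SchemaAwareGenericRecordSerializer(Schema schema) {
        this.datumWriter = new GenericDatumWriter<>(schema);
        this.datumReader = new GenericDatumReader<>(schema);
    }

    public byte[] serialize(GenericRecord record) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        datumWriter.write(record, encoder);  // writes field values only, no schema
        encoder.flush();
        return out.toByteArray();
    }

    public GenericRecord deserialize(byte[] bytes) throws IOException {
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return datumReader.read(null, decoder);
    }
}

The trade-off is the one stated above: all records on a stream must share the same schema, since it is no longer carried with each record.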
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)