[
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16239785#comment-16239785
]
ASF GitHub Bot commented on FLINK-6022:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/4943#discussion_r148977758
--- Diff:
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
---
@@ -153,111 +146,215 @@ public T deserialize(T reuse, DataInputView source)
throws IOException {
return this.reader.read(reuse, this.decoder);
}
+ //
------------------------------------------------------------------------
+ // Copying
+ //
------------------------------------------------------------------------
+
@Override
- public void copy(DataInputView source, DataOutputView target) throws
IOException {
+ public T copy(T from) {
checkAvroInitialized();
+ return avroData.deepCopy(schema, from);
+ }
- if (this.deepCopyInstance == null) {
- this.deepCopyInstance =
InstantiationUtil.instantiate(type, Object.class);
- }
-
- this.decoder.setIn(source);
- this.encoder.setOut(target);
+ @Override
+ public T copy(T from, T reuse) {
+ return copy(from);
+ }
- T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
- this.writer.write(tmp, this.encoder);
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws
IOException {
+ T value = deserialize(source);
+ serialize(value, target);
}
- private void checkAvroInitialized() {
- if (this.reader == null) {
- this.reader = new ReflectDatumReader<T>(type);
- this.writer = new ReflectDatumWriter<T>(type);
- this.encoder = new DataOutputEncoder();
- this.decoder = new DataInputDecoder();
+ //
------------------------------------------------------------------------
+ // Compatibility and Upgrades
+ //
------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerConfigSnapshot snapshotConfiguration() {
+ if (configSnapshot == null) {
+ checkAvroInitialized();
+ configSnapshot = new
AvroSchemaSerializerConfigSnapshot(schema.toString(false));
}
+ return configSnapshot;
}
- private void checkKryoInitialized() {
- if (this.kryo == null) {
- this.kryo = new Kryo();
-
- Kryo.DefaultInstantiatorStrategy instantiatorStrategy =
new Kryo.DefaultInstantiatorStrategy();
-
instantiatorStrategy.setFallbackInstantiatorStrategy(new
StdInstantiatorStrategy());
- kryo.setInstantiatorStrategy(instantiatorStrategy);
+ @Override
+ @SuppressWarnings("deprecation")
+ public CompatibilityResult<T>
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ if (configSnapshot instanceof
AvroSchemaSerializerConfigSnapshot) {
+ // proper schema snapshot, can do the sophisticated
schema-based compatibility check
+ final String schemaString =
((AvroSchemaSerializerConfigSnapshot) configSnapshot).getSchemaString();
+ final Schema lastSchema = new
Schema.Parser().parse(schemaString);
- kryo.setAsmEnabled(true);
+ final SchemaPairCompatibility compatibility =
+
SchemaCompatibility.checkReaderWriterCompatibility(schema, lastSchema);
- KryoUtils.applyRegistrations(kryo,
kryoRegistrations.values());
+ return compatibility.getType() ==
SchemaCompatibilityType.COMPATIBLE ?
+ CompatibilityResult.compatible() :
CompatibilityResult.requiresMigration();
+ }
+ else if (configSnapshot instanceof
AvroSerializerConfigSnapshot) {
+ // old snapshot case, just compare the type
+ // we don't need to restore any Kryo stuff, since Kryo
was never used for persistence,
+ // only for object-to-object copies.
+ final AvroSerializerConfigSnapshot old =
(AvroSerializerConfigSnapshot) configSnapshot;
+ return type.equals(old.getTypeClass()) ?
+ CompatibilityResult.compatible() :
CompatibilityResult.requiresMigration();
+ }
+ else {
+ return CompatibilityResult.requiresMigration();
}
}
- //
--------------------------------------------------------------------------------------------
+ //
------------------------------------------------------------------------
+ // Utilities
+ //
------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializer<T> duplicate() {
+ return new AvroSerializer<>(type);
+ }
@Override
public int hashCode() {
- return 31 * this.type.hashCode() +
this.typeToInstantiate.hashCode();
+ return 42 + type.hashCode();
}
@Override
public boolean equals(Object obj) {
- if (obj instanceof AvroSerializer) {
- @SuppressWarnings("unchecked")
- AvroSerializer<T> avroSerializer = (AvroSerializer<T>)
obj;
-
- return avroSerializer.canEqual(this) &&
- type == avroSerializer.type &&
- typeToInstantiate ==
avroSerializer.typeToInstantiate;
- } else {
+ if (obj == this) {
+ return true;
+ }
+ else if (obj != null && obj.getClass() == AvroSerializer.class)
{
+ final AvroSerializer that = (AvroSerializer) obj;
+ return this.type == that.type;
+ }
+ else {
return false;
}
}
@Override
public boolean canEqual(Object obj) {
- return obj instanceof AvroSerializer;
+ return obj.getClass() == this.getClass();
}
- //
--------------------------------------------------------------------------------------------
- // Serializer configuration snapshotting & compatibility
- //
--------------------------------------------------------------------------------------------
-
@Override
- public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
- return new AvroSerializerConfigSnapshot<>(type,
typeToInstantiate, kryoRegistrations);
+ public String toString() {
+ return getClass().getName() + " (" + getType().getName() + ')';
}
- @SuppressWarnings("unchecked")
- @Override
- public CompatibilityResult<T>
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
- if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
- final AvroSerializerConfigSnapshot<T> config =
(AvroSerializerConfigSnapshot<T>) configSnapshot;
+ //
------------------------------------------------------------------------
+ // Initialization
+ //
------------------------------------------------------------------------
+
+ private void checkAvroInitialized() {
+ if (writer == null) {
+ initializeAvro();
+ }
+ }
+
+ private void initializeAvro() {
+ final ClassLoader cl =
Thread.currentThread().getContextClassLoader();
--- End diff --
This is the pattern the Kryo serializer has used for a while, rather than
`type.getClass().getClassLoader()`, because `type` could in theory be a
collection class (loaded by the application class loader) that contains the
actual type from another class loader.
This might not be able to happen for Avro, though; I'm not totally sure...
> Improve support for Avro GenericRecord
> --------------------------------------
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
> Issue Type: Improvement
> Components: Type Serialization System
> Reporter: Robert Metzger
>
> Currently, Flink is serializing the schema for each Avro GenericRecord in the
> stream.
> This leads to a lot of overhead over the wire/disk + high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by
> shipping the schema to each serializer through the AvroTypeInformation.
> Then, we can only support GenericRecords with the same type per stream, but
> the performance will be much better.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)