http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java deleted file mode 100644 index 6f03b12..0000000 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util.serialization; - -import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.types.Row; -import org.apache.flink.util.Preconditions; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.io.Encoder; -import org.apache.avro.io.EncoderFactory; -import org.apache.avro.specific.SpecificData; -import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.avro.specific.SpecificRecord; -import org.apache.avro.util.Utf8; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.List; - -/** - * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into a Avro bytes. - */ -public class AvroRowSerializationSchema implements SerializationSchema<Row> { - - /** - * Avro record class. - */ - private Class<? extends SpecificRecord> recordClazz; - - /** - * Avro serialization schema. - */ - private transient Schema schema; - - /** - * Writer to serialize Avro record into a byte array. - */ - private transient DatumWriter<GenericRecord> datumWriter; - - /** - * Output stream to serialize records into byte array. - */ - private transient ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); - - /** - * Low-level class for serialization of Avro values. - */ - private transient Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null); - - /** - * Creates a Avro serialization schema for the given schema. - * - * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row - */ - public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) { - Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); - this.recordClazz = recordClazz; - this.schema = SpecificData.get().getSchema(recordClazz); - this.datumWriter = new SpecificDatumWriter<>(schema); - } - - @Override - @SuppressWarnings("unchecked") - public byte[] serialize(Row row) { - // convert to record - final Object record = convertToRecord(schema, row); - - // write - try { - arrayOutputStream.reset(); - datumWriter.write((GenericRecord) record, encoder); - encoder.flush(); - return arrayOutputStream.toByteArray(); - } catch (IOException e) { - throw new RuntimeException("Failed to serialize Row.", e); - } - } - - private void writeObject(ObjectOutputStream oos) throws IOException { - oos.writeObject(recordClazz); - } - - @SuppressWarnings("unchecked") - private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { - this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject(); - this.schema = SpecificData.get().getSchema(recordClazz); - this.datumWriter = new SpecificDatumWriter<>(schema); - this.arrayOutputStream = new ByteArrayOutputStream(); - this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null); - } - - /** - * Converts a (nested) Flink Row into Avro's {@link GenericRecord}. - * Strings are converted into Avro's {@link Utf8} fields. - */ - private static Object convertToRecord(Schema schema, Object rowObj) { - if (rowObj instanceof Row) { - // records can be wrapped in a union - if (schema.getType() == Schema.Type.UNION) { - final List<Schema> types = schema.getTypes(); - if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) { - schema = types.get(1); - } - else if (types.size() == 2 && types.get(0).getType() == Schema.Type.RECORD && types.get(1).getType() == Schema.Type.NULL) { - schema = types.get(0); - } - else { - throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD] or UNION[RECORD, NULL] Given: " + schema); - } - } else if (schema.getType() != Schema.Type.RECORD) { - throw new RuntimeException("Record type for row type expected. But is: " + schema); - } - final List<Schema.Field> fields = schema.getFields(); - final GenericRecord record = new GenericData.Record(schema); - final Row row = (Row) rowObj; - for (int i = 0; i < fields.size(); i++) { - final Schema.Field field = fields.get(i); - record.put(field.pos(), convertToRecord(field.schema(), row.getField(i))); - } - return record; - } else if (rowObj instanceof String) { - return new Utf8((String) rowObj); - } else { - return rowObj; - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java deleted file mode 100644 index 28f2ed3..0000000 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils; -import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema; -import org.apache.flink.streaming.util.serialization.AvroRowSerializationSchema; -import org.apache.flink.types.Row; -import org.apache.flink.util.InstantiationUtil; - -import org.apache.avro.specific.SpecificRecord; -import org.junit.Test; - -import java.io.IOException; - -import static org.junit.Assert.assertEquals; - -/** - * Test for the Avro serialization and deserialization schema. - */ -public class AvroRowDeSerializationSchemaTest { - - @Test - public void testSerializeDeserializeSimpleRow() throws IOException { - final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData(); - - final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0); - final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0); - - final byte[] bytes = serializationSchema.serialize(testData.f2); - final Row actual = deserializationSchema.deserialize(bytes); - - assertEquals(testData.f2, actual); - } - - @Test - public void testSerializeSimpleRowSeveralTimes() throws IOException { - final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData(); - - final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0); - final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0); - - serializationSchema.serialize(testData.f2); - serializationSchema.serialize(testData.f2); - final byte[] bytes = serializationSchema.serialize(testData.f2); - final Row actual = deserializationSchema.deserialize(bytes); - - assertEquals(testData.f2, actual); - } - - @Test - public void testDeserializeRowSeveralTimes() throws IOException { - final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData(); - - final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0); - final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0); - - final byte[] bytes = serializationSchema.serialize(testData.f2); - deserializationSchema.deserialize(bytes); - deserializationSchema.deserialize(bytes); - final Row actual = deserializationSchema.deserialize(bytes); - - assertEquals(testData.f2, actual); - } - - @Test - public void testSerializeDeserializeComplexRow() throws IOException { - final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData(); - - final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0); - final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0); - - final byte[] bytes = serializationSchema.serialize(testData.f2); - final Row actual = deserializationSchema.deserialize(bytes); - - assertEquals(testData.f2, actual); - } - - @Test - public void testSerializeComplexRowSeveralTimes() throws IOException { - final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData(); - - final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0); - final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0); - - serializationSchema.serialize(testData.f2); - serializationSchema.serialize(testData.f2); - final byte[] bytes = serializationSchema.serialize(testData.f2); - final Row actual = deserializationSchema.deserialize(bytes); - - assertEquals(testData.f2, actual); - } - - @Test - public void testDeserializeComplexRowSeveralTimes() throws IOException { - final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData(); - - final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0); - final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0); - - final byte[] bytes = serializationSchema.serialize(testData.f2); - deserializationSchema.deserialize(bytes); - deserializationSchema.deserialize(bytes); - final Row actual = deserializationSchema.deserialize(bytes); - - assertEquals(testData.f2, actual); - } - - @Test - public void testSerializability() throws IOException, ClassNotFoundException { - final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData(); - - final AvroRowSerializationSchema serOrig = new AvroRowSerializationSchema(testData.f0); - final AvroRowDeserializationSchema deserOrig = new AvroRowDeserializationSchema(testData.f0); - - byte[] serBytes = InstantiationUtil.serializeObject(serOrig); - byte[] deserBytes = InstantiationUtil.serializeObject(deserOrig); - - AvroRowSerializationSchema serCopy = - InstantiationUtil.deserializeObject(serBytes, Thread.currentThread().getContextClassLoader()); - AvroRowDeserializationSchema deserCopy = - InstantiationUtil.deserializeObject(deserBytes, Thread.currentThread().getContextClassLoader()); - - final byte[] bytes = serCopy.serialize(testData.f2); - deserCopy.deserialize(bytes); - deserCopy.deserialize(bytes); - final Row actual = deserCopy.deserialize(bytes); - - assertEquals(testData.f2, actual); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java index def16b2..871a6f6 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java @@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils; +import org.apache.flink.formats.avro.utils.AvroTestUtils; import org.apache.flink.table.api.Types; import org.apache.avro.Schema; http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java deleted file mode 100644 index a41125a..0000000 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.testutils; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.io.avro.generated.Address; -import org.apache.flink.api.io.avro.generated.Colors; -import org.apache.flink.api.io.avro.generated.User; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.types.Row; - -import org.apache.avro.Schema; -import org.apache.avro.SchemaBuilder; -import org.apache.avro.reflect.ReflectData; -import org.apache.avro.specific.SpecificRecord; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; - -/** - * Utilities for creating Avro Schemas. - */ -public final class AvroTestUtils { - - private static final String NAMESPACE = "org.apache.flink.streaming.connectors.kafka"; - - /** - * Creates a flat Avro Schema for testing. - */ - public static Schema createFlatAvroSchema(String[] fieldNames, TypeInformation[] fieldTypes) { - final SchemaBuilder.FieldAssembler<Schema> fieldAssembler = SchemaBuilder - .record("BasicAvroRecord") - .namespace(NAMESPACE) - .fields(); - - final Schema nullSchema = Schema.create(Schema.Type.NULL); - - for (int i = 0; i < fieldNames.length; i++) { - Schema schema = ReflectData.get().getSchema(fieldTypes[i].getTypeClass()); - Schema unionSchema = Schema.createUnion(Arrays.asList(nullSchema, schema)); - fieldAssembler.name(fieldNames[i]).type(unionSchema).noDefault(); - } - - return fieldAssembler.endRecord(); - } - - /** - * Tests a simple Avro data types without nesting. - */ - public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> getSimpleTestData() { - final Address addr = Address.newBuilder() - .setNum(42) - .setStreet("Main Street 42") - .setCity("Test City") - .setState("Test State") - .setZip("12345") - .build(); - - final Row rowAddr = new Row(5); - rowAddr.setField(0, 42); - rowAddr.setField(1, "Main Street 42"); - rowAddr.setField(2, "Test City"); - rowAddr.setField(3, "Test State"); - rowAddr.setField(4, "12345"); - - final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> t = new Tuple3<>(); - t.f0 = Address.class; - t.f1 = addr; - t.f2 = rowAddr; - - return t; - } - - /** - * Tests all Avro data types as well as nested types. - */ - public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> getComplexTestData() { - final Address addr = Address.newBuilder() - .setNum(42) - .setStreet("Main Street 42") - .setCity("Test City") - .setState("Test State") - .setZip("12345") - .build(); - - final Row rowAddr = new Row(5); - rowAddr.setField(0, 42); - rowAddr.setField(1, "Main Street 42"); - rowAddr.setField(2, "Test City"); - rowAddr.setField(3, "Test State"); - rowAddr.setField(4, "12345"); - - final User user = User.newBuilder() - .setName("Charlie") - .setFavoriteNumber(null) - .setFavoriteColor("blue") - .setTypeLongTest(1337L) - .setTypeDoubleTest(1.337d) - .setTypeNullTest(null) - .setTypeBoolTest(false) - .setTypeArrayString(new ArrayList<CharSequence>()) - .setTypeArrayBoolean(new ArrayList<Boolean>()) - .setTypeNullableArray(null) - .setTypeEnum(Colors.RED) - .setTypeMap(new HashMap<CharSequence, Long>()) - .setTypeFixed(null) - .setTypeUnion(null) - .setTypeNested(addr) - .build(); - - final Row rowUser = new Row(15); - rowUser.setField(0, "Charlie"); - rowUser.setField(1, null); - rowUser.setField(2, "blue"); - rowUser.setField(3, 1337L); - rowUser.setField(4, 1.337d); - rowUser.setField(5, null); - rowUser.setField(6, false); - rowUser.setField(7, new ArrayList<CharSequence>()); - rowUser.setField(8, new ArrayList<Boolean>()); - rowUser.setField(9, null); - rowUser.setField(10, Colors.RED); - rowUser.setField(11, new HashMap<CharSequence, Long>()); - rowUser.setField(12, null); - rowUser.setField(13, null); - rowUser.setField(14, rowAddr); - - final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> t = new Tuple3<>(); - t.f0 = User.class; - t.f1 = user; - t.f2 = rowUser; - - return t; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml index 97c9f20..7468b67 100644 --- a/flink-connectors/pom.xml +++ b/flink-connectors/pom.xml @@ -36,7 +36,6 @@ under the License. <packaging>pom</packaging> <modules> - <module>flink-avro</module> <module>flink-jdbc</module> <module>flink-hadoop-compatibility</module> <module>flink-hbase</module> http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/pom.xml ---------------------------------------------------------------------- diff --git a/flink-core/pom.xml b/flink-core/pom.xml index ae3f56e..0ca742c 100644 --- a/flink-core/pom.xml +++ b/flink-core/pom.xml @@ -80,12 +80,6 @@ under the License. <!-- managed version --> </dependency> - <!-- Avro is needed for the interoperability with Avro types for serialization --> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - </dependency> - <!-- We explicitly depend on snappy since connectors that require it load it through the system class loader --> <dependency> <groupId>org.xerial.snappy</groupId> @@ -128,7 +122,7 @@ under the License. <scope>test</scope> </dependency> - </dependencies> + </dependencies> <profiles> <profile> @@ -209,6 +203,7 @@ under the License. <exclude>org.apache.flink.core.fs.FileSystem#isFlinkSupportedScheme(java.lang.String)</exclude> <exclude>org.apache.flink.core.fs.FileSystem#setDefaultScheme(org.apache.flink.configuration.Configuration)</exclude> <exclude>org.apache.flink.api.java.typeutils.WritableTypeInfo</exclude> + <exclude>org.apache.flink.api.java.typeutils.AvroTypeInfo</exclude> <!-- Breaking changes between 1.1 and 1.2. We ignore these changes because these are low-level, internal runtime configuration parameters --> <exclude>org.apache.flink.configuration.ConfigConstants#DEFAULT_JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE</exclude> http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index fc66ccd..88d524e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -571,16 +571,24 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut } /** - * Force Flink to use the AvroSerializer for POJOs. + * Forces Flink to use the Apache Avro serializer for POJOs. + * + * <b>Important:</b> Make sure to include the <i>flink-avro</i> module. */ public void enableForceAvro() { forceAvro = true; } + /** + * Disables the Apache Avro serializer as the forced serializer for POJOs. + */ public void disableForceAvro() { forceAvro = false; } + /** + * Returns whether the Apache Avro is the default serializer for POJOs. + */ public boolean isForceAvroEnabled() { return forceAvro; } http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java deleted file mode 100644 index 1356e53..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.typeutils; - -import org.apache.avro.specific.SpecificRecordBase; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.annotation.Public; -import org.apache.flink.api.common.typeinfo.TypeInformation; - -import java.lang.reflect.Type; -import java.util.ArrayList; -import java.util.List; - -/** - * Special type information to generate a special AvroTypeInfo for Avro POJOs (implementing SpecificRecordBase, the typed Avro POJOs) - * - * Proceeding: It uses a regular pojo type analysis and replaces all {@code GenericType<CharSequence>} - * with a {@code GenericType<avro.Utf8>}. - * All other types used by Avro are standard Java types. - * Only strings are represented as CharSequence fields and represented as Utf8 classes at runtime. - * CharSequence is not comparable. To make them nicely usable with field expressions, we replace them here - * by generic type infos containing Utf8 classes (which are comparable), - * - * This class is checked by the AvroPojoTest. - * @param <T> - */ -@Public -public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> { - @PublicEvolving - public AvroTypeInfo(Class<T> typeClass) { - super(typeClass, generateFieldsFromAvroSchema(typeClass)); - } - - private static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) { - PojoTypeExtractor pte = new PojoTypeExtractor(); - ArrayList<Type> typeHierarchy = new ArrayList<>(); - typeHierarchy.add(typeClass); - TypeInformation ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null); - - if(!(ti instanceof PojoTypeInfo)) { - throw new IllegalStateException("Expecting type to be a PojoTypeInfo"); - } - PojoTypeInfo pti = (PojoTypeInfo) ti; - List<PojoField> newFields = new ArrayList<>(pti.getTotalFields()); - - for(int i = 0; i < pti.getArity(); i++) { - PojoField f = pti.getPojoFieldAt(i); - TypeInformation newType = f.getTypeInformation(); - // check if type is a CharSequence - if(newType instanceof GenericTypeInfo) { - if((newType).getTypeClass().equals(CharSequence.class)) { - // replace the type by a org.apache.avro.util.Utf8 - newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class); - } - } - PojoField newField = new PojoField(f.getField(), newType); - newFields.add(newField); - } - return newFields; - } - - private static class PojoTypeExtractor extends TypeExtractor { - private PojoTypeExtractor() { - super(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java index 8a4fbbe..b24f425 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java @@ -27,12 +27,13 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.AvroSerializer; import org.apache.flink.api.java.typeutils.runtime.PojoComparator; import org.apache.flink.api.java.typeutils.runtime.PojoSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import java.lang.reflect.Constructor; import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.Arrays; @@ -300,15 +301,32 @@ public class PojoTypeInfo<T> extends CompositeType<T> { @Override @PublicEvolving + @SuppressWarnings("unchecked") public TypeSerializer<T> createSerializer(ExecutionConfig config) { if(config.isForceKryoEnabled()) { - return new KryoSerializer<T>(getTypeClass(), config); + return new KryoSerializer<>(getTypeClass(), config); } + if(config.isForceAvroEnabled()) { - return new AvroSerializer<T>(getTypeClass()); + Class<?> clazz; + try { + clazz = Class.forName("org.apache.flink.formats.avro.typeutils.AvroSerializer"); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Could not load the AvroSerializer class. " + + "You may be missing the 'flink-avro' dependency."); + } + + try { + Constructor<?> constructor = clazz.getConstructor(Class.class); + return (TypeSerializer<T>) constructor.newInstance(getTypeClass()); + } catch (NoSuchMethodException | IllegalAccessException | InstantiationException e) { + throw new RuntimeException("Incompatible versions of the Avro classes found."); + } catch (InvocationTargetException e) { + throw new RuntimeException("Cannot create AvroSerializer.", e.getTargetException()); + } } - TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length ]; + TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length]; Field[] reflectiveFields = new Field[fields.length]; for (int i = 0; i < fields.length; i++) { http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java index 41d260d..c5c2565 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java @@ -287,4 +287,39 @@ public class TypeExtractionUtils { ((TypeVariable<?>) t1).getName().equals(((TypeVariable<?>) t2).getName()) && ((TypeVariable<?>) t1).getGenericDeclaration().equals(((TypeVariable<?>) t2).getGenericDeclaration()); } + + /** + * Traverses the type hierarchy of a type up until a certain stop class is found. + * + * @param t type for which a hierarchy need to be created + * @return type of the immediate child of the stop class + */ + public static Type getTypeHierarchy(List<Type> typeHierarchy, Type t, Class<?> stopAtClass) { + while (!(isClassType(t) && typeToClass(t).equals(stopAtClass))) { + typeHierarchy.add(t); + t = typeToClass(t).getGenericSuperclass(); + + if (t == null) { + break; + } + } + return t; + } + + /** + * Returns true if the given class has a superclass of given name. + * + * @param clazz class to be analyzed + * @param superClassName class name of the super class + */ + public static boolean hasSuperclass(Class<?> clazz, String superClassName) { + List<Type> hierarchy = new ArrayList<>(); + getTypeHierarchy(hierarchy, clazz, Object.class); + for (Type t : hierarchy) { + if (isClassType(t) && typeToClass(t).getName().equals(superClassName)) { + return true; + } + } + return false; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index c50dfc9..1a9cecb 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -18,7 +18,6 @@ package org.apache.flink.api.java.typeutils; -import org.apache.avro.specific.SpecificRecordBase; import org.apache.commons.lang3.ClassUtils; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; @@ -73,6 +72,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getTypeHierarchy; +import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSuperclass; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda; import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods; @@ -114,6 +115,10 @@ public class TypeExtractor { private static final String HADOOP_WRITABLE_TYPEINFO_CLASS = "org.apache.flink.api.java.typeutils.WritableTypeInfo"; + private static final String AVRO_SPECIFIC_RECORD_BASE_CLASS = "org.apache.avro.specific.SpecificRecordBase"; + + private static final String AVRO_TYPEINFO_CLASS = "org.apache.flink.formats.avro.typeutils.AvroTypeInfo"; + private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class); public static final int[] NO_INDEX = new int[] {}; @@ -1583,24 +1588,6 @@ public class TypeExtractor { } /** - * Traverses the type hierarchy of a type up until a certain stop class is found. - * - * @param t type for which a hierarchy need to be created - * @return type of the immediate child of the stop class - */ - private static Type getTypeHierarchy(ArrayList<Type> typeHierarchy, Type t, Class<?> stopAtClass) { - while (!(isClassType(t) && typeToClass(t).equals(stopAtClass))) { - typeHierarchy.add(t); - t = typeToClass(t).getGenericSuperclass(); - - if (t == null) { - break; - } - } - return t; - } - - /** * Traverses the type hierarchy up until a type information factory can be found. * * @param typeHierarchy hierarchy to be filled while traversing up @@ -1806,8 +1793,8 @@ public class TypeExtractor { } // special case for POJOs generated by Avro. - if(SpecificRecordBase.class.isAssignableFrom(clazz)) { - return new AvroTypeInfo(clazz); + if (hasSuperclass(clazz, AVRO_SPECIFIC_RECORD_BASE_CLASS)) { + return createAvroTypeInfo(clazz); } if (Modifier.isInterface(clazz.getModifiers())) { @@ -2119,7 +2106,7 @@ public class TypeExtractor { private static boolean hasHadoopWritableInterface(Class<?> clazz, HashSet<Class<?>> alreadySeen) { Class<?>[] interfaces = clazz.getInterfaces(); for (Class<?> c : interfaces) { - if (c.getName().equals("org.apache.hadoop.io.Writable")) { + if (c.getName().equals(HADOOP_WRITABLE_CLASS)) { return true; } else if (alreadySeen.add(c) && hasHadoopWritableInterface(c, alreadySeen)) { @@ -2155,7 +2142,7 @@ public class TypeExtractor { throw new RuntimeException("Incompatible versions of the Hadoop Compatibility classes found."); } catch (InvocationTargetException e) { - throw new RuntimeException("Cannot create Hadoop Writable Type info", e.getTargetException()); + throw new RuntimeException("Cannot create Hadoop WritableTypeInfo.", e.getTargetException()); } } @@ -2171,7 +2158,7 @@ public class TypeExtractor { // this is actually a writable type info // check if the type is a writable if (!(type instanceof Class && isHadoopWritable((Class<?>) type))) { - throw new InvalidTypesException(HADOOP_WRITABLE_CLASS + " type expected"); + throw new InvalidTypesException(HADOOP_WRITABLE_CLASS + " type expected."); } // check writable type contents @@ -2188,4 +2175,33 @@ public class TypeExtractor { // ignore } } + + // ------------------------------------------------------------------------ + // Utilities to handle Avro's 'SpecificRecord' type via reflection + // ------------------------------------------------------------------------ + + private static <T> TypeInformation<T> createAvroTypeInfo(Class<T> clazz) { + Class<?> typeInfoClass; + try { + typeInfoClass = Class.forName(AVRO_TYPEINFO_CLASS, false, TypeExtractor.class.getClassLoader()); + } + catch (ClassNotFoundException e) { + throw new RuntimeException("Could not load the TypeInformation for the class '" + + AVRO_TYPEINFO_CLASS + "'. You may be missing the 'flink-avro' dependency."); + } + + try { + Constructor<?> constr = typeInfoClass.getConstructor(Class.class); + + @SuppressWarnings("unchecked") + TypeInformation<T> typeInfo = (TypeInformation<T>) constr.newInstance(clazz); + return typeInfo; + } + catch (NoSuchMethodException | IllegalAccessException | InstantiationException e) { + throw new RuntimeException("Incompatible versions of the Avro classes found."); + } + catch (InvocationTargetException e) { + throw new RuntimeException("Cannot create AvroTypeInfo.", e.getTargetException()); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java deleted file mode 100644 index 565bd4d..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java +++ /dev/null @@ -1,332 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.util.LinkedHashMap; -import java.util.Map; - -import org.apache.avro.generic.GenericData; -import org.apache.avro.reflect.ReflectDatumReader; -import org.apache.avro.reflect.ReflectDatumWriter; -import org.apache.avro.util.Utf8; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.util.InstantiationUtil; - -import com.esotericsoftware.kryo.Kryo; -import org.apache.flink.util.Preconditions; -import org.objenesis.strategy.StdInstantiatorStrategy; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * General purpose serialization. Currently using Apache Avro's Reflect-serializers for serialization and - * Kryo for deep object copies. We want to change this to Kryo-only. - * - * @param <T> The type serialized. - */ -@Internal -public final class AvroSerializer<T> extends TypeSerializer<T> { - - private static final long serialVersionUID = 1L; - - private final Class<T> type; - - private final Class<? extends T> typeToInstantiate; - - /** - * Map of class tag (using classname as tag) to their Kryo registration. - * - * <p>This map serves as a preview of the final registration result of - * the Kryo instance, taking into account registration overwrites. - */ - private LinkedHashMap<String, KryoRegistration> kryoRegistrations; - - private transient ReflectDatumWriter<T> writer; - private transient ReflectDatumReader<T> reader; - - private transient DataOutputEncoder encoder; - private transient DataInputDecoder decoder; - - private transient Kryo kryo; - - private transient T deepCopyInstance; - - // -------------------------------------------------------------------------------------------- - - public AvroSerializer(Class<T> type) { - this(type, type); - } - - public AvroSerializer(Class<T> type, Class<? extends T> typeToInstantiate) { - this.type = checkNotNull(type); - this.typeToInstantiate = checkNotNull(typeToInstantiate); - - InstantiationUtil.checkForInstantiation(typeToInstantiate); - - this.kryoRegistrations = buildKryoRegistrations(type); - } - - // -------------------------------------------------------------------------------------------- - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public AvroSerializer<T> duplicate() { - return new AvroSerializer<T>(type, typeToInstantiate); - } - - @Override - public T createInstance() { - return InstantiationUtil.instantiate(this.typeToInstantiate); - } - - @Override - public T copy(T from) { - checkKryoInitialized(); - - return KryoUtils.copy(from, kryo, this); - } - - @Override - public T copy(T from, T reuse) { - checkKryoInitialized(); - - return KryoUtils.copy(from, reuse, kryo, this); - } - - @Override - public int getLength() { - return -1; - } - - @Override - public void serialize(T value, DataOutputView target) throws IOException { - checkAvroInitialized(); - this.encoder.setOut(target); - this.writer.write(value, this.encoder); - } - - @Override - public T deserialize(DataInputView source) throws IOException { - checkAvroInitialized(); - this.decoder.setIn(source); - return this.reader.read(null, this.decoder); - } - - @Override - public T deserialize(T reuse, DataInputView source) throws IOException { - checkAvroInitialized(); - this.decoder.setIn(source); - return this.reader.read(reuse, this.decoder); - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - checkAvroInitialized(); - - if (this.deepCopyInstance == null) { - this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class); - } - - this.decoder.setIn(source); - this.encoder.setOut(target); - - T tmp = this.reader.read(this.deepCopyInstance, this.decoder); - this.writer.write(tmp, this.encoder); - } - - - private void checkAvroInitialized() { - if (this.reader == null) { - this.reader = new ReflectDatumReader<T>(type); - this.writer = new ReflectDatumWriter<T>(type); - this.encoder = new DataOutputEncoder(); - this.decoder = new DataInputDecoder(); - } - } - - private void checkKryoInitialized() { - if (this.kryo == null) { - this.kryo = new Kryo(); - - Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy(); - instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); - kryo.setInstantiatorStrategy(instantiatorStrategy); - - kryo.setAsmEnabled(true); - - KryoUtils.applyRegistrations(kryo, kryoRegistrations.values()); - } - } - - // -------------------------------------------------------------------------------------------- - - @Override - public int hashCode() { - return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof AvroSerializer) { - @SuppressWarnings("unchecked") - AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj; - - return avroSerializer.canEqual(this) && - type == avroSerializer.type && - typeToInstantiate == avroSerializer.typeToInstantiate; - } else { - return false; - } - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof AvroSerializer; - } - - // -------------------------------------------------------------------------------------------- - // Serializer configuration snapshotting & compatibility - // -------------------------------------------------------------------------------------------- - - @Override - public AvroSerializerConfigSnapshot<T> snapshotConfiguration() { - return new AvroSerializerConfigSnapshot<>(type, typeToInstantiate, kryoRegistrations); - } - - @SuppressWarnings("unchecked") - @Override - public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof AvroSerializerConfigSnapshot) { - final AvroSerializerConfigSnapshot<T> config = (AvroSerializerConfigSnapshot<T>) configSnapshot; - - if (type.equals(config.getTypeClass()) && typeToInstantiate.equals(config.getTypeToInstantiate())) { - // resolve Kryo registrations; currently, since the Kryo registrations in Avro - // are fixed, there shouldn't be a problem with the resolution here. - - LinkedHashMap<String, KryoRegistration> oldRegistrations = config.getKryoRegistrations(); - oldRegistrations.putAll(kryoRegistrations); - - for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) { - if (reconfiguredRegistrationEntry.getValue().isDummy()) { - return CompatibilityResult.requiresMigration(); - } - } - - this.kryoRegistrations = oldRegistrations; - return CompatibilityResult.compatible(); - } - } - - // ends up here if the preceding serializer is not - // the ValueSerializer, or serialized data type has changed - return CompatibilityResult.requiresMigration(); - } - - public static class AvroSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> { - - private static final int VERSION = 1; - - private Class<? extends T> typeToInstantiate; - - public AvroSerializerConfigSnapshot() {} - - public AvroSerializerConfigSnapshot( - Class<T> baseType, - Class<? extends T> typeToInstantiate, - LinkedHashMap<String, KryoRegistration> kryoRegistrations) { - - super(baseType, kryoRegistrations); - this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate); - } - - @Override - public void write(DataOutputView out) throws IOException { - super.write(out); - - out.writeUTF(typeToInstantiate.getName()); - } - - @SuppressWarnings("unchecked") - @Override - public void read(DataInputView in) throws IOException { - super.read(in); - - String classname = in.readUTF(); - try { - typeToInstantiate = (Class<? extends T>) Class.forName(classname, true, getUserCodeClassLoader()); - } catch (ClassNotFoundException e) { - throw new IOException("Cannot find requested class " + classname + " in classpath.", e); - } - } - - @Override - public int getVersion() { - return VERSION; - } - - public Class<? extends T> getTypeToInstantiate() { - return typeToInstantiate; - } - } - - // -------------------------------------------------------------------------------------------- - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - - // kryoRegistrations may be null if this Avro serializer is deserialized from an old version - if (kryoRegistrations == null) { - this.kryoRegistrations = buildKryoRegistrations(type); - } - } - - private static <T> LinkedHashMap<String, KryoRegistration> buildKryoRegistrations(Class<T> serializedDataType) { - final LinkedHashMap<String, KryoRegistration> registrations = new LinkedHashMap<>(); - - // register Avro types. - registrations.put( - GenericData.Array.class.getName(), - new KryoRegistration( - GenericData.Array.class, - new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList()))); - registrations.put(Utf8.class.getName(), new KryoRegistration(Utf8.class)); - registrations.put(GenericData.EnumSymbol.class.getName(), new KryoRegistration(GenericData.EnumSymbol.class)); - registrations.put(GenericData.Fixed.class.getName(), new KryoRegistration(GenericData.Fixed.class)); - registrations.put(GenericData.StringType.class.getName(), new KryoRegistration(GenericData.StringType.class)); - - // register the serialized data type - registrations.put(serializedDataType.getName(), new KryoRegistration(serializedDataType)); - - return registrations; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java deleted file mode 100644 index c0454c6..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import java.io.DataInput; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.avro.io.Decoder; -import org.apache.avro.util.Utf8; -import org.apache.flink.annotation.Internal; - -@Internal -public class DataInputDecoder extends Decoder implements java.io.Serializable { - - private static final long serialVersionUID = 1L; - - private transient Utf8 stringDecoder = new Utf8(); - - - private transient DataInput in; - - - public void setIn(DataInput in) { - this.in = in; - } - - // -------------------------------------------------------------------------------------------- - // primitives - // -------------------------------------------------------------------------------------------- - - @Override - public void readNull() {} - - - @Override - public boolean readBoolean() throws IOException { - return in.readBoolean(); - } - - @Override - public int readInt() throws IOException { - return in.readInt(); - } - - @Override - public long readLong() throws IOException { - return in.readLong(); - } - - @Override - public float readFloat() throws IOException { - return in.readFloat(); - } - - @Override - public double readDouble() throws IOException { - return in.readDouble(); - } - - @Override - public int readEnum() throws IOException { - return readInt(); - } - - // -------------------------------------------------------------------------------------------- - // bytes - // -------------------------------------------------------------------------------------------- - - @Override - public void readFixed(byte[] bytes, int start, int length) throws IOException { - in.readFully(bytes, start, length); - } - - @Override - public ByteBuffer readBytes(ByteBuffer old) throws IOException { - int length = readInt(); - ByteBuffer result; - if (old != null && length <= old.capacity() && old.hasArray()) { - result = old; - result.clear(); - } else { - result = ByteBuffer.allocate(length); - } - in.readFully(result.array(), result.arrayOffset() + result.position(), length); - result.limit(length); - return result; - } - - - @Override - public void skipFixed(int length) throws IOException { - skipBytes(length); - } - - @Override - public void skipBytes() throws IOException { - int num = readInt(); - skipBytes(num); - } - - // -------------------------------------------------------------------------------------------- - // strings - // -------------------------------------------------------------------------------------------- - - - @Override - public Utf8 readString(Utf8 old) throws IOException { - int length = readInt(); - Utf8 result = (old != null ? old : new Utf8()); - result.setByteLength(length); - - if (length > 0) { - in.readFully(result.getBytes(), 0, length); - } - - return result; - } - - @Override - public String readString() throws IOException { - return readString(stringDecoder).toString(); - } - - @Override - public void skipString() throws IOException { - int len = readInt(); - skipBytes(len); - } - - // -------------------------------------------------------------------------------------------- - // collection types - // -------------------------------------------------------------------------------------------- - - @Override - public long readArrayStart() throws IOException { - return readVarLongCount(in); - } - - @Override - public long arrayNext() throws IOException { - return readVarLongCount(in); - } - - @Override - public long skipArray() throws IOException { - return readVarLongCount(in); - } - - @Override - public long readMapStart() throws IOException { - return readVarLongCount(in); - } - - @Override - public long mapNext() throws IOException { - return readVarLongCount(in); - } - - @Override - public long skipMap() throws IOException { - return readVarLongCount(in); - } - - // -------------------------------------------------------------------------------------------- - // union - // -------------------------------------------------------------------------------------------- - - @Override - public int readIndex() throws IOException { - return readInt(); - } - - // -------------------------------------------------------------------------------------------- - // utils - // -------------------------------------------------------------------------------------------- - - private void skipBytes(int num) throws IOException { - while (num > 0) { - num -= in.skipBytes(num); - } - } - - public static long readVarLongCount(DataInput in) throws IOException { - long value = in.readUnsignedByte(); - - if ((value & 0x80) == 0) { - return value; - } - else { - long curr; - int shift = 7; - value = value & 0x7f; - while (((curr = in.readUnsignedByte()) & 0x80) != 0){ - value |= (curr & 0x7f) << shift; - shift += 7; - } - value |= curr << shift; - return value; - } - } - - // -------------------------------------------------------------------------------------------- - // serialization - // -------------------------------------------------------------------------------------------- - - private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { - // Read in size, and any hidden stuff - s.defaultReadObject(); - - this.stringDecoder = new Utf8(); - this.in = null; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java deleted file mode 100644 index c41b648..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import java.io.DataOutput; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.avro.io.Encoder; -import org.apache.avro.util.Utf8; -import org.apache.flink.annotation.Internal; - -@Internal -public final class DataOutputEncoder extends Encoder implements java.io.Serializable { - - private static final long serialVersionUID = 1L; - - private transient DataOutput out; - - - public void setOut(DataOutput out) { - this.out = out; - } - - - @Override - public void flush() throws IOException {} - - // -------------------------------------------------------------------------------------------- - // primitives - // -------------------------------------------------------------------------------------------- - - @Override - public void writeNull() {} - - - @Override - public void writeBoolean(boolean b) throws IOException { - out.writeBoolean(b); - } - - @Override - public void writeInt(int n) throws IOException { - out.writeInt(n); - } - - @Override - public void writeLong(long n) throws IOException { - out.writeLong(n); - } - - @Override - public void writeFloat(float f) throws IOException { - out.writeFloat(f); - } - - @Override - public void writeDouble(double d) throws IOException { - out.writeDouble(d); - } - - @Override - public void writeEnum(int e) throws IOException { - out.writeInt(e); - } - - - // -------------------------------------------------------------------------------------------- - // bytes - // -------------------------------------------------------------------------------------------- - - @Override - public void writeFixed(byte[] bytes, int start, int len) throws IOException { - out.write(bytes, start, len); - } - - @Override - public void writeBytes(byte[] bytes, int start, int len) throws IOException { - out.writeInt(len); - if (len > 0) { - out.write(bytes, start, len); - } - } - - @Override - public void writeBytes(ByteBuffer bytes) throws IOException { - int num = bytes.remaining(); - out.writeInt(num); - - if (num > 0) { - writeFixed(bytes); - } - } - - // -------------------------------------------------------------------------------------------- - // strings - // -------------------------------------------------------------------------------------------- - - @Override - public void writeString(String str) throws IOException { - byte[] bytes = Utf8.getBytesFor(str); - writeBytes(bytes, 0, bytes.length); - } - - @Override - public void writeString(Utf8 utf8) throws IOException { - writeBytes(utf8.getBytes(), 0, utf8.getByteLength()); - - } - - // -------------------------------------------------------------------------------------------- - // collection types - // -------------------------------------------------------------------------------------------- - - @Override - public void writeArrayStart() {} - - @Override - public void setItemCount(long itemCount) throws IOException { - if (itemCount > 0) { - writeVarLongCount(out, itemCount); - } - } - - @Override - public void startItem() {} - - @Override - public void writeArrayEnd() throws IOException { - // write a single byte 0, shortcut for a var-length long of 0 - out.write(0); - } - - @Override - public void writeMapStart() {} - - @Override - public void writeMapEnd() throws IOException { - // write a single byte 0, shortcut for a var-length long of 0 - out.write(0); - } - - // -------------------------------------------------------------------------------------------- - // union - // -------------------------------------------------------------------------------------------- - - @Override - public void writeIndex(int unionIndex) throws IOException { - out.writeInt(unionIndex); - } - - // -------------------------------------------------------------------------------------------- - // utils - // -------------------------------------------------------------------------------------------- - - - public static final void writeVarLongCount(DataOutput out, long val) throws IOException { - if (val < 0) { - throw new IOException("Illegal count (must be non-negative): " + val); - } - - while ((val & ~0x7FL) != 0) { - out.write(((int) val) | 0x80); - val >>>= 7; - } - out.write((int) val); - } - - private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { - // Read in size, and any hidden stuff - s.defaultReadObject(); - - this.out = null; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java index 6730136..269cf35 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java @@ -24,8 +24,6 @@ import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; -import org.apache.avro.generic.GenericData; - import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.CompatibilityResult; @@ -406,7 +404,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> { } // there's actually no way to tell if new Kryo serializers are compatible with - // the previous ones they overwrite; we can only signal compatibly and hope for the best + // the previous ones they overwrite; we can only signal compatibility and hope for the best this.kryoRegistrations = reconfiguredRegistrations; return CompatibilityResult.compatible(); } @@ -478,11 +476,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> { registeredTypeWithSerializerEntry.getValue())); } - kryoRegistrations.put( - GenericData.Array.class.getName(), - new KryoRegistration( - GenericData.Array.class, - new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList()))); + // add Avro support if flink-avro is available; a dummy otherwise + Serializers.addAvroGenericDataArrayRegistration(kryoRegistrations); return kryoRegistrations; } http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java index 4976d6a..de7b2fc 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java @@ -18,16 +18,6 @@ package org.apache.flink.api.java.typeutils.runtime.kryo; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.esotericsoftware.kryo.serializers.CollectionSerializer; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.specific.SpecificRecordBase; - import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -35,18 +25,29 @@ import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractionUtils; +import org.apache.flink.api.java.typeutils.runtime.KryoRegistration; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.serializers.CollectionSerializer; import java.io.Serializable; import java.lang.reflect.Field; import java.lang.reflect.GenericArrayType; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Modifier; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.util.ArrayList; import java.util.Collection; +import java.util.LinkedHashMap; import java.util.List; import java.util.Set; +import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSuperclass; + /** * Class containing utilities for the serializers of the Flink Runtime. @@ -60,6 +61,14 @@ import java.util.Set; @Internal public class Serializers { + private static final String AVRO_SPECIFIC_RECORD_BASE = "org.apache.avro.specific.SpecificRecordBase"; + + private static final String AVRO_GENERIC_RECORD = "org.apache.avro.generic.GenericData$Record"; + + private static final String AVRO_KRYO_UTILS = "org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils"; + + private static final String AVRO_GENERIC_DATA_ARRAY = "org.apache.avro.generic.GenericData$Array"; + public static void recursivelyRegisterType(TypeInformation<?> typeInfo, ExecutionConfig config, Set<Class<?>> alreadySeen) { if (typeInfo instanceof GenericTypeInfo) { GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) typeInfo; @@ -94,8 +103,11 @@ public class Serializers { } else { config.registerKryoType(type); - checkAndAddSerializerForTypeAvro(config, type); - + // add serializers for Avro type if necessary + if (hasSuperclass(type, AVRO_SPECIFIC_RECORD_BASE) || hasSuperclass(type, AVRO_GENERIC_RECORD)) { + addAvroSerializers(config, type); + } + Field[] fields = type.getDeclaredFields(); for (Field field : fields) { if (Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())) { @@ -147,20 +159,54 @@ public class Serializers { } } } - - // ------------------------------------------------------------------------ - - private static void checkAndAddSerializerForTypeAvro(ExecutionConfig reg, Class<?> type) { - if (GenericData.Record.class.isAssignableFrom(type) || SpecificRecordBase.class.isAssignableFrom(type)) { - // Avro POJOs contain java.util.List which have GenericData.Array as their runtime type - // because Kryo is not able to serialize them properly, we use this serializer for them - reg.registerTypeWithKryoSerializer(GenericData.Array.class, SpecificInstanceCollectionSerializerForArrayList.class); - - // We register this serializer for users who want to use untyped Avro records (GenericData.Record). - // Kryo is able to serialize everything in there, except for the Schema. - // This serializer is very slow, but using the GenericData.Records of Kryo is in general a bad idea. - // we add the serializer as a default serializer because Avro is using a private sub-type at runtime. - reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class); + + /** + * Loads the utility class from <code>flink-avro</code> and adds Avro-specific serializers. + */ + private static void addAvroSerializers(ExecutionConfig reg, Class<?> type) { + Class<?> clazz; + try { + clazz = Class.forName(AVRO_KRYO_UTILS, false, Serializers.class.getClassLoader()); + } + catch (ClassNotFoundException e) { + throw new RuntimeException("Could not load class '" + AVRO_KRYO_UTILS + "'. " + + "You may be missing the 'flink-avro' dependency."); + } + try { + clazz.getDeclaredMethod("addAvroSerializers", ExecutionConfig.class, Class.class).invoke(null, reg, type); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException("Could not access method in 'flink-avro' dependency.", e); + } + } + + @SuppressWarnings("unchecked") + public static void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> kryoRegistrations) { + try { + Class<?> clazz = Class.forName(AVRO_GENERIC_DATA_ARRAY, false, Serializers.class.getClassLoader()); + + kryoRegistrations.put( + AVRO_GENERIC_DATA_ARRAY, + new KryoRegistration( + clazz, + new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList()))); + } + catch (ClassNotFoundException e) { + kryoRegistrations.put(AVRO_GENERIC_DATA_ARRAY, + new KryoRegistration(DummyAvroRegisteredClass.class, (Class) DummyAvroKryoSerializerClass.class)); + } + } + + public static class DummyAvroRegisteredClass {} + + public static class DummyAvroKryoSerializerClass<T> extends Serializer<T> { + @Override + public void write(Kryo kryo, Output output, Object o) { + throw new UnsupportedOperationException("Could not find required Avro dependency."); + } + + @Override + public T read(Kryo kryo, Input input, Class<T> aClass) { + throw new UnsupportedOperationException("Could not find required Avro dependency."); } } @@ -168,6 +214,9 @@ public class Serializers { // Custom Serializers // -------------------------------------------------------------------------------------------- + /** + * Special serializer for Java's {@link ArrayList} used for Avro's GenericData.Array. + */ @SuppressWarnings("rawtypes") public static class SpecificInstanceCollectionSerializerForArrayList extends SpecificInstanceCollectionSerializer<ArrayList> { private static final long serialVersionUID = 1L; @@ -176,19 +225,19 @@ public class Serializers { super(ArrayList.class); } } + /** * Special serializer for Java collections enforcing certain instance types. * Avro is serializing collections with an "GenericData.Array" type. Kryo is not able to handle * this type, so we use ArrayLists. */ @SuppressWarnings("rawtypes") - public static class SpecificInstanceCollectionSerializer<T extends Collection> - extends CollectionSerializer implements Serializable - { + public static class SpecificInstanceCollectionSerializer<T extends Collection> + extends CollectionSerializer implements Serializable { private static final long serialVersionUID = 1L; - + private Class<T> type; - + public SpecificInstanceCollectionSerializer(Class<T> type) { this.type = type; } @@ -203,27 +252,4 @@ public class Serializers { return kryo.newInstance(this.type); } } - - /** - * Slow serialization approach for Avro schemas. - * This is only used with {{@link org.apache.avro.generic.GenericData.Record}} types. - * Having this serializer, we are able to handle avro Records. - */ - public static class AvroSchemaSerializer extends Serializer<Schema> implements Serializable { - private static final long serialVersionUID = 1L; - - @Override - public void write(Kryo kryo, Output output, Schema object) { - String schemaAsString = object.toString(false); - output.writeString(schemaAsString); - } - - @Override - public Schema read(Kryo kryo, Input input, Class<Schema> type) { - String schemaAsString = input.readString(); - // the parser seems to be stateful, to we need a new one for every type. - Schema.Parser sParser = new Schema.Parser(); - return sParser.parse(schemaAsString); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java deleted file mode 100644 index 5b08e52..0000000 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.TypeSerializer; - -public class AvroGenericArraySerializerTest extends AbstractGenericArraySerializerTest { - @Override - protected <T> TypeSerializer<T> createComponentSerializer(Class<T> type) { - return new AvroSerializer<T>(type); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java deleted file mode 100644 index 19fac43..0000000 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.TypeSerializer; - -public class AvroGenericTypeComparatorTest extends AbstractGenericTypeComparatorTest { - @Override - protected <T> TypeSerializer<T> createSerializer(Class<T> type) { - return new AvroSerializer<T>(type); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java deleted file mode 100644 index df1ff60..0000000 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import org.apache.flink.api.common.typeutils.TypeSerializer; - -public class AvroGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest { - - @Override - protected <T> TypeSerializer<T> createSerializer(Class<T> type) { - return new AvroSerializer<T>(type); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java deleted file mode 100644 index 8a89410..0000000 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils.runtime; - -import static org.junit.Assert.*; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.avro.reflect.Nullable; -import org.apache.flink.api.common.typeutils.SerializerTestInstance; -import org.junit.Test; - -public class AvroSerializerEmptyArrayTest { - - @Test - public void testBookSerialization() { - try { - Book b = new Book(123, "This is a test book", 26382648); - AvroSerializer<Book> serializer = new AvroSerializer<Book>(Book.class); - SerializerTestInstance<Book> test = new SerializerTestInstance<Book>(serializer, Book.class, -1, b); - test.testAll(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testSerialization() { - try { - List<String> titles = new ArrayList<String>(); - - List<Book> books = new ArrayList<Book>(); - books.add(new Book(123, "This is a test book", 1)); - books.add(new Book(24234234, "This is a test book", 1)); - books.add(new Book(1234324, "This is a test book", 3)); - - BookAuthor a = new BookAuthor(1, titles, "Test Author"); - a.books = books; - a.bookType = BookAuthor.BookType.journal; - - AvroSerializer<BookAuthor> serializer = new AvroSerializer<BookAuthor>(BookAuthor.class); - - SerializerTestInstance<BookAuthor> test = new SerializerTestInstance<BookAuthor>(serializer, BookAuthor.class, -1, a); - test.testAll(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - public static class Book { - - long bookId; - @Nullable - String title; - long authorId; - - public Book() {} - - public Book(long bookId, String title, long authorId) { - this.bookId = bookId; - this.title = title; - this.authorId = authorId; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + (int) (authorId ^ (authorId >>> 32)); - result = prime * result + (int) (bookId ^ (bookId >>> 32)); - result = prime * result + ((title == null) ? 0 : title.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - Book other = (Book) obj; - if (authorId != other.authorId) - return false; - if (bookId != other.bookId) - return false; - if (title == null) { - if (other.title != null) - return false; - } else if (!title.equals(other.title)) - return false; - return true; - } - } - - public static class BookAuthor { - - enum BookType { - book, - article, - journal - } - - long authorId; - - @Nullable - List<String> bookTitles; - - @Nullable - List<Book> books; - - String authorName; - - BookType bookType; - - public BookAuthor() {} - - public BookAuthor(long authorId, List<String> bookTitles, String authorName) { - this.authorId = authorId; - this.bookTitles = bookTitles; - this.authorName = authorName; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + (int) (authorId ^ (authorId >>> 32)); - result = prime * result + ((authorName == null) ? 0 : authorName.hashCode()); - result = prime * result + ((bookTitles == null) ? 0 : bookTitles.hashCode()); - result = prime * result + ((bookType == null) ? 0 : bookType.hashCode()); - result = prime * result + ((books == null) ? 0 : books.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - BookAuthor other = (BookAuthor) obj; - if (authorId != other.authorId) - return false; - if (authorName == null) { - if (other.authorName != null) - return false; - } else if (!authorName.equals(other.authorName)) - return false; - if (bookTitles == null) { - if (other.bookTitles != null) - return false; - } else if (!bookTitles.equals(other.bookTitles)) - return false; - if (bookType != other.bookType) - return false; - if (books == null) { - if (other.books != null) - return false; - } else if (!books.equals(other.books)) - return false; - return true; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java index 5a404bd..1cacc9e 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java @@ -18,20 +18,22 @@ package org.apache.flink.api.java.typeutils.runtime.kryo; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.InputStream; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -42,6 +44,20 @@ import static org.junit.Assert.assertTrue; */ public class KryoSerializerCompatibilityTest { + @Test + public void testMigrationStrategyForRemovedAvroDependency() throws Exception { + KryoSerializer<TestClass> kryoSerializerForA = new KryoSerializer<>(TestClass.class, new ExecutionConfig()); + + // read configuration again from bytes + TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot; + try (InputStream in = getClass().getResourceAsStream("/kryo-serializer-flink1.3-snapshot")) { + kryoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot( + new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); + } + CompatibilityResult<TestClass> compatResult = kryoSerializerForA.ensureCompatibility(kryoSerializerConfigSnapshot); + assertFalse(compatResult.isRequiresMigration()); + } + /** * Verifies that reconfiguration result is INCOMPATIBLE if data type has changed. */ @@ -60,7 +76,7 @@ public class KryoSerializerCompatibilityTest { KryoSerializer<TestClassB> kryoSerializerForB = new KryoSerializer<>(TestClassB.class, new ExecutionConfig()); // read configuration again from bytes - try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { + try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { kryoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot( new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); } @@ -103,7 +119,7 @@ public class KryoSerializerCompatibilityTest { kryoSerializer = new KryoSerializer<>(TestClass.class, executionConfig); // read configuration from bytes - try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { + try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { kryoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot( new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); } http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/resources/kryo-serializer-flink1.3-snapshot ---------------------------------------------------------------------- diff --git a/flink-core/src/test/resources/kryo-serializer-flink1.3-snapshot b/flink-core/src/test/resources/kryo-serializer-flink1.3-snapshot new file mode 100644 index 0000000..0123a9c Binary files /dev/null and b/flink-core/src/test/resources/kryo-serializer-flink1.3-snapshot differ
