http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala index d916116..048e7ac 100644 --- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala +++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala @@ -23,12 +23,11 @@ import java.util.StringTokenizer import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.api.scala._ +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.connectors.twitter.TwitterSource import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData import org.apache.flink.util.Collector -import org.codehaus.jackson.JsonNode -import org.codehaus.jackson.map.ObjectMapper import scala.collection.mutable.ListBuffer
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/pom.xml ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/pom.xml b/flink-formats/flink-avro/pom.xml new file mode 100644 index 0000000..19d9129 --- /dev/null +++ b/flink-formats/flink-avro/pom.xml @@ -0,0 +1,280 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-formats</artifactId> + <version>1.4-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-avro_${scala.binary.version}</artifactId> + <name>flink-avro</name> + + <packaging>jar</packaging> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>1.8.2</version> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils-junit</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + 
<groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <id>create-test-dependency</id> + <phase>process-test-classes</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <archive> + <manifest> + <mainClass>org.apache.flink.formats.avro.testjar.AvroExternalJarProgram</mainClass> + </manifest> + </archive> + <finalName>maven</finalName> + <attach>false</attach> + <descriptors> + <descriptor>src/test/assembly/test-assembly.xml</descriptor> + </descriptors> + </configuration> + </execution> + </executions> + </plugin> + <!--Remove the AvroExternalJarProgram code from the test-classes directory since it mustn't be in the + classpath when running the tests to actually test whether the user code class loader + is properly used.--> + <plugin> + <artifactId>maven-clean-plugin</artifactId> + <version>2.5</version><!--$NO-MVN-MAN-VER$--> + <executions> + <execution> + <id>remove-avroexternalprogram</id> + <phase>process-test-classes</phase> + <goals> + <goal>clean</goal> + </goals> + <configuration> + <excludeDefaultDirectories>true</excludeDefaultDirectories> + <filesets> + <fileset> + <directory>${project.build.testOutputDirectory}</directory> + <includes> + <include>**/testjar/*.class</include> + </includes> + </fileset> + </filesets> + </configuration> + </execution> + </executions> + <configuration> + <filesets> + <fileset> + <directory>${project.basedir}/src/test/java/org/apache/flink/formats/avro/generated</directory> + </fileset> + </filesets> + </configuration> + </plugin> + <!-- Generate Test class from avro schema --> + <plugin> + <groupId>org.apache.avro</groupId> + <artifactId>avro-maven-plugin</artifactId> + <version>1.8.2</version> + <executions> + <execution> + <phase>generate-sources</phase> + 
<goals> + <goal>schema</goal> + </goals> + <configuration> + <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory> + <testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory> + </configuration> + </execution> + </executions> + </plugin> + + <!-- Add Avro test classes to test jar in order to test AvroTypeInfo. --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration combine.self="override"> + <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation> + <promoteTransitiveDependencies>true</promoteTransitiveDependencies> + <artifactSet> + <includes> + <include>org.codehaus.jackson:*</include> + </includes> + </artifactSet> + <relocations> + <relocation> + <pattern>org.codehaus.jackson</pattern> + <shadedPattern>org.apache.flink.avro.shaded.org.codehouse.jackson</shadedPattern> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + + <pluginManagement> + <plugins> + <!--This plugin's configuration is used to store Eclipse m2e settings only. 
It has no influence on the Maven build itself.--> + <plugin> + <groupId>org.eclipse.m2e</groupId> + <artifactId>lifecycle-mapping</artifactId> + <version>1.0.0</version> + <configuration> + <lifecycleMappingMetadata> + <pluginExecutions> + <pluginExecution> + <pluginExecutionFilter> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <versionRange>[2.4,)</versionRange> + <goals> + <goal>single</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore/> + </action> + </pluginExecution> + <pluginExecution> + <pluginExecutionFilter> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-clean-plugin</artifactId> + <versionRange>[1,)</versionRange> + <goals> + <goal>clean</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore/> + </action> + </pluginExecution> + <pluginExecution> + <pluginExecutionFilter> + <groupId>org.apache.avro</groupId> + <artifactId>avro-maven-plugin</artifactId> + <versionRange>[1.7.7,)</versionRange> + <goals> + <goal>schema</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore/> + </action> + </pluginExecution> + </pluginExecutions> + </lifecycleMappingMetadata> + </configuration> + </plugin> + </plugins> + </pluginManagement> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java new file mode 100644 index 0000000..58085f6 --- /dev/null +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.Public; + +import org.apache.avro.specific.SpecificRecordBase; + +import static org.apache.flink.formats.avro.typeutils.AvroTypeInfo.generateFieldsFromAvroSchema; + +/** + * @deprecated Please use <code>org.apache.flink.formats.avro.typeutils.AvroTypeInfo</code> + * in the <code>flink-avro</code> module. This class will be removed in the near future. 
+ */ +@Deprecated +@Public +public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> { + /** Creates type information for {@code typeClass}; the field list handed to the {@code PojoTypeInfo} constructor is computed by {@code generateFieldsFromAvroSchema(typeClass)}. */ + public AvroTypeInfo(Class<T> typeClass) { + super(typeClass, generateFieldsFromAvroSchema(typeClass)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java new file mode 100644 index 0000000..9b73ceb --- /dev/null +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.io.CheckpointableInputFormat; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.avro.utils.FSDataInputStreamWrapper; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.SeekableInput; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificDatumReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Provides a {@link FileInputFormat} for Avro records. + * + * @param <E> + * the type of the result Avro record. If you specify + * {@link GenericRecord} then the result will be returned as a + * {@link GenericRecord}, so you do not have to know the schema ahead + * of time. 
+ */ +public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E>, + CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class); + + private final Class<E> avroValueType; + + private boolean reuseAvroValue = true; + + private transient DataFileReader<E> dataFileReader; + + private transient long end; + + private transient long recordsReadSinceLastSync; + + private long lastSync = -1L; + + public AvroInputFormat(Path filePath, Class<E> type) { + super(filePath); + this.avroValueType = type; + } + + /** + * Sets the flag whether to reuse the Avro value instance for all records. + * By default, the input format reuses the Avro value. + * + * @param reuseAvroValue True, if the input format should reuse the Avro value instance, false otherwise. + */ + public void setReuseAvroValue(boolean reuseAvroValue) { + this.reuseAvroValue = reuseAvroValue; + } + + /** + * If set, the InputFormat will only read entire files. 
+ */ + public void setUnsplittable(boolean unsplittable) { + this.unsplittable = unsplittable; + } + + // -------------------------------------------------------------------------------------------- + // Typing + // -------------------------------------------------------------------------------------------- + + @Override + public TypeInformation<E> getProducedType() { + return TypeExtractor.getForClass(this.avroValueType); + } + + // -------------------------------------------------------------------------------------------- + // Input Format Methods + // -------------------------------------------------------------------------------------------- + + @Override + public void open(FileInputSplit split) throws IOException { + super.open(split); + dataFileReader = initReader(split); + dataFileReader.sync(split.getStart()); + lastSync = dataFileReader.previousSync(); + } + + private DataFileReader<E> initReader(FileInputSplit split) throws IOException { + DatumReader<E> datumReader; + + if (org.apache.avro.generic.GenericRecord.class == avroValueType) { + datumReader = new GenericDatumReader<E>(); + } else { + datumReader = org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType) + ? 
new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType); + } + if (LOG.isInfoEnabled()) { + LOG.info("Opening split {}", split); + } + + SeekableInput in = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen()); + DataFileReader<E> dataFileReader = (DataFileReader) DataFileReader.openReader(in, datumReader); + + if (LOG.isDebugEnabled()) { + LOG.debug("Loaded SCHEMA: {}", dataFileReader.getSchema()); + } + + end = split.getStart() + split.getLength(); + recordsReadSinceLastSync = 0; + return dataFileReader; + } + + @Override + public boolean reachedEnd() throws IOException { + return !dataFileReader.hasNext() || dataFileReader.pastSync(end); + } + + public long getRecordsReadFromBlock() { + return this.recordsReadSinceLastSync; + } + + @Override + public E nextRecord(E reuseValue) throws IOException { + if (reachedEnd()) { + return null; + } + + // if we start a new block, then register the event, and + // restart the counter. 
+ if (dataFileReader.previousSync() != lastSync) { + lastSync = dataFileReader.previousSync(); + recordsReadSinceLastSync = 0; + } + recordsReadSinceLastSync++; + + if (reuseAvroValue) { + return dataFileReader.next(reuseValue); + } else { + if (GenericRecord.class == avroValueType) { + return dataFileReader.next(); + } else { + return dataFileReader.next(InstantiationUtil.instantiate(avroValueType, Object.class)); + } + } + } + + // -------------------------------------------------------------------------------------------- + // Checkpointing + // -------------------------------------------------------------------------------------------- + + @Override + public Tuple2<Long, Long> getCurrentState() throws IOException { + return new Tuple2<>(this.lastSync, this.recordsReadSinceLastSync); + } + + @Override + public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException { + Preconditions.checkNotNull(split, "reopen() cannot be called on a null split."); + Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state."); + + try { + this.open(split); + } finally { + if (state.f0 != -1) { + lastSync = state.f0; + recordsReadSinceLastSync = state.f1; + } + } + + if (lastSync != -1) { + // open and read until the record we were before + // the checkpoint and discard the values + dataFileReader.seek(lastSync); + for (int i = 0; i < recordsReadSinceLastSync; i++) { + dataFileReader.next(null); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroOutputFormat.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroOutputFormat.java new file mode 100644 index 0000000..c0b8073 --- /dev/null +++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroOutputFormat.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.io.FileOutputFormat; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.core.fs.Path; + +import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.avro.specific.SpecificDatumWriter; + +import java.io.IOException; +import java.io.Serializable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link FileOutputFormat} for Avro records. + * @param <E> + */ +public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable { + + /** + * Wrapper which encapsulates the supported codec and a related serialization byte. 
+ */ + public enum Codec { + + NULL((byte) 0, CodecFactory.nullCodec()), + SNAPPY((byte) 1, CodecFactory.snappyCodec()), + BZIP2((byte) 2, CodecFactory.bzip2Codec()), + DEFLATE((byte) 3, CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)), + XZ((byte) 4, CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL)); + + private byte codecByte; + + private CodecFactory codecFactory; + + Codec(final byte codecByte, final CodecFactory codecFactory) { + this.codecByte = codecByte; + this.codecFactory = codecFactory; + } + + private byte getCodecByte() { + return codecByte; + } + + private CodecFactory getCodecFactory() { + return codecFactory; + } + + private static Codec forCodecByte(byte codecByte) { + for (final Codec codec : Codec.values()) { + if (codec.getCodecByte() == codecByte) { + return codec; + } + } + throw new IllegalArgumentException("no codec for codecByte: " + codecByte); + } + } + + private static final long serialVersionUID = 1L; + + private final Class<E> avroValueType; + + private transient Schema userDefinedSchema = null; + + private transient Codec codec = null; + + private transient DataFileWriter<E> dataFileWriter; + + public AvroOutputFormat(Path filePath, Class<E> type) { + super(filePath); + this.avroValueType = type; + } + + public AvroOutputFormat(Class<E> type) { + this.avroValueType = type; + } + + @Override + protected String getDirectoryFileName(int taskNumber) { + return super.getDirectoryFileName(taskNumber) + ".avro"; + } + + public void setSchema(Schema schema) { + this.userDefinedSchema = schema; + } + + /** + * Set avro codec for compression. + * + * @param codec avro codec. 
+ */ + public void setCodec(final Codec codec) { + this.codec = checkNotNull(codec, "codec can not be null"); + } + + @Override + public void writeRecord(E record) throws IOException { + dataFileWriter.append(record); + } + + @Override + public void open(int taskNumber, int numTasks) throws IOException { + super.open(taskNumber, numTasks); + + DatumWriter<E> datumWriter; + Schema schema; + if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) { + datumWriter = new SpecificDatumWriter<E>(avroValueType); + try { + schema = ((org.apache.avro.specific.SpecificRecordBase) avroValueType.newInstance()).getSchema(); + } catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException(e.getMessage()); + } + } else if (org.apache.avro.generic.GenericRecord.class.isAssignableFrom(avroValueType)) { + if (userDefinedSchema == null) { + throw new IllegalStateException("Schema must be set when using Generic Record"); + } + datumWriter = new GenericDatumWriter<E>(userDefinedSchema); + schema = userDefinedSchema; + } else { + datumWriter = new ReflectDatumWriter<E>(avroValueType); + schema = ReflectData.get().getSchema(avroValueType); + } + dataFileWriter = new DataFileWriter<E>(datumWriter); + if (codec != null) { + dataFileWriter.setCodec(codec.getCodecFactory()); + } + if (userDefinedSchema == null) { + dataFileWriter.create(schema, stream); + } else { + dataFileWriter.create(userDefinedSchema, stream); + } + } + + private void writeObject(java.io.ObjectOutputStream out) throws IOException { + out.defaultWriteObject(); + + if (codec != null) { + out.writeByte(codec.getCodecByte()); + } else { + out.writeByte(-1); + } + + if (userDefinedSchema != null) { + byte[] json = userDefinedSchema.toString().getBytes(ConfigConstants.DEFAULT_CHARSET); + out.writeInt(json.length); + out.write(json); + } else { + out.writeInt(0); + } + } + + private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException 
{ + in.defaultReadObject(); + + byte codecByte = in.readByte(); + if (codecByte >= 0) { + setCodec(Codec.forCodecByte(codecByte)); + } + + int length = in.readInt(); + if (length != 0) { + byte[] json = new byte[length]; + in.readFully(json); + + Schema schema = new Schema.Parser().parse(new String(json, ConfigConstants.DEFAULT_CHARSET)); + setSchema(schema); + } + } + + @Override + public void close() throws IOException { + dataFileWriter.flush(); + dataFileWriter.close(); + super.close(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java new file mode 100644 index 0000000..4a3c02e --- /dev/null +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.AbstractDeserializationSchema; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.util.Utf8; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.List; + +/** + * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}. + * + * <p>Deserializes the <code>byte[]</code> messages into (nested) Flink Rows. + * + * {@link Utf8} is converted to regular Java Strings. + */ +public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> { + + /** + * Avro record class. + */ + private Class<? extends SpecificRecord> recordClazz; + + /** + * Schema for deterministic field order. + */ + private transient Schema schema; + + /** + * Reader that deserializes byte array into a record. + */ + private transient DatumReader<SpecificRecord> datumReader; + + /** + * Input stream to read message from. + */ + private transient MutableByteArrayInputStream inputStream; + + /** + * Avro decoder that decodes binary data. + */ + private transient Decoder decoder; + + /** + * Record to deserialize byte array to. + */ + private SpecificRecord record; + + /** + * Creates a Avro deserialization schema for the given record. + * + * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row + */ + public AvroRowDeserializationSchema(Class<? 
extends SpecificRecord> recordClazz) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; + this.schema = SpecificData.get().getSchema(recordClazz); + this.datumReader = new SpecificDatumReader<>(schema); + this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema); + this.inputStream = new MutableByteArrayInputStream(); + this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + } + + @Override + public Row deserialize(byte[] message) throws IOException { + // read record + try { + inputStream.setBuffer(message); + this.record = datumReader.read(record, decoder); + } catch (IOException e) { + throw new RuntimeException("Failed to deserialize Row.", e); + } + + // convert to row + final Object row = convertToRow(schema, record); + return (Row) row; + } + + private void writeObject(ObjectOutputStream oos) throws IOException { + oos.writeObject(recordClazz); + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { + this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject(); + this.schema = SpecificData.get().getSchema(recordClazz); + this.datumReader = new SpecificDatumReader<>(schema); + this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema); + this.inputStream = new MutableByteArrayInputStream(); + this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + } + + /** + * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type. + * Avro's {@link Utf8} fields are converted into regular Java strings. 
+ */ + private static Object convertToRow(Schema schema, Object recordObj) { + if (recordObj instanceof GenericRecord) { + // records can be wrapped in a union + if (schema.getType() == Schema.Type.UNION) { + final List<Schema> types = schema.getTypes(); + if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) { + schema = types.get(1); + } + else { + throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD]. Given: " + schema); + } + } else if (schema.getType() != Schema.Type.RECORD) { + throw new RuntimeException("Record type for row type expected. But is: " + schema); + } + final List<Schema.Field> fields = schema.getFields(); + final Row row = new Row(fields.size()); + final GenericRecord record = (GenericRecord) recordObj; + for (int i = 0; i < fields.size(); i++) { + final Schema.Field field = fields.get(i); + row.setField(i, convertToRow(field.schema(), record.get(field.pos()))); + } + return row; + } else if (recordObj instanceof Utf8) { + return recordObj.toString(); + } else { + return recordObj; + } + } + + /** + * An extension of the ByteArrayInputStream that allows to change a buffer that should be + * read without creating a new ByteArrayInputStream instance. This allows to re-use the same + * InputStream instance, copying message to process, and creation of Decoder on every new message. + */ + private static final class MutableByteArrayInputStream extends ByteArrayInputStream { + + public MutableByteArrayInputStream() { + super(new byte[0]); + } + + /** + * Set buffer that can be read via the InputStream interface and reset the input stream. + * This has the same effect as creating a new ByteArrayInputStream with a new buffer. + * + * @param buf the new buffer to read. 
+ */ + public void setBuffer(byte[] buf) { + this.buf = buf; + this.pos = 0; + this.count = buf.length; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java new file mode 100644 index 0000000..41000a6 --- /dev/null +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.util.Utf8; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.List; + +/** + * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into a Avro bytes. + */ +public class AvroRowSerializationSchema implements SerializationSchema<Row> { + + /** + * Avro record class. + */ + private Class<? extends SpecificRecord> recordClazz; + + /** + * Avro serialization schema. + */ + private transient Schema schema; + + /** + * Writer to serialize Avro record into a byte array. + */ + private transient DatumWriter<GenericRecord> datumWriter; + + /** + * Output stream to serialize records into byte array. + */ + private transient ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); + + /** + * Low-level class for serialization of Avro values. + */ + private transient Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null); + + /** + * Creates a Avro serialization schema for the given schema. + * + * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row + */ + public AvroRowSerializationSchema(Class<? 
extends SpecificRecord> recordClazz) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; + this.schema = SpecificData.get().getSchema(recordClazz); + this.datumWriter = new SpecificDatumWriter<>(schema); + } + + @Override + @SuppressWarnings("unchecked") + public byte[] serialize(Row row) { + // convert to record + final Object record = convertToRecord(schema, row); + + // write + try { + arrayOutputStream.reset(); + datumWriter.write((GenericRecord) record, encoder); + encoder.flush(); + return arrayOutputStream.toByteArray(); + } catch (IOException e) { + throw new RuntimeException("Failed to serialize Row.", e); + } + } + + private void writeObject(ObjectOutputStream oos) throws IOException { + oos.writeObject(recordClazz); + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { + this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject(); + this.schema = SpecificData.get().getSchema(recordClazz); + this.datumWriter = new SpecificDatumWriter<>(schema); + this.arrayOutputStream = new ByteArrayOutputStream(); + this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null); + } + + /** + * Converts a (nested) Flink Row into Avro's {@link GenericRecord}. + * Strings are converted into Avro's {@link Utf8} fields. 
+ */ + private static Object convertToRecord(Schema schema, Object rowObj) { + if (rowObj instanceof Row) { + // records can be wrapped in a union + if (schema.getType() == Schema.Type.UNION) { + final List<Schema> types = schema.getTypes(); + if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) { + schema = types.get(1); + } + else if (types.size() == 2 && types.get(0).getType() == Schema.Type.RECORD && types.get(1).getType() == Schema.Type.NULL) { + schema = types.get(0); + } + else { + throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD] or UNION[RECORD, NULL] Given: " + schema); + } + } else if (schema.getType() != Schema.Type.RECORD) { + throw new RuntimeException("Record type for row type expected. But is: " + schema); + } + final List<Schema.Field> fields = schema.getFields(); + final GenericRecord record = new GenericData.Record(schema); + final Row row = (Row) rowObj; + for (int i = 0; i < fields.size(); i++) { + final Schema.Field field = fields.get(i); + record.put(field.pos(), convertToRecord(field.schema(), row.getField(i))); + } + return record; + } else if (rowObj instanceof String) { + return new Utf8((String) rowObj); + } else { + return rowObj; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java new file mode 100644 index 0000000..02f74f5 --- /dev/null +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + 
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.typeutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.java.typeutils.runtime.KryoRegistration; +import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot; +import org.apache.flink.api.java.typeutils.runtime.KryoUtils; +import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.formats.avro.utils.DataInputDecoder; +import org.apache.flink.formats.avro.utils.DataOutputEncoder; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import com.esotericsoftware.kryo.Kryo; +import org.apache.avro.generic.GenericData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.avro.util.Utf8; +import 
org.objenesis.strategy.StdInstantiatorStrategy; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * General purpose serialization. Currently using Apache Avro's Reflect-serializers for serialization and + * Kryo for deep object copies. We want to change this to Kryo-only. + * + * @param <T> The type serialized. + */ +@Internal +public final class AvroSerializer<T> extends TypeSerializer<T> { + + private static final long serialVersionUID = 1L; + + private final Class<T> type; + + private final Class<? extends T> typeToInstantiate; + + /** + * Map of class tag (using classname as tag) to their Kryo registration. + * + * <p>This map serves as a preview of the final registration result of + * the Kryo instance, taking into account registration overwrites. + */ + private LinkedHashMap<String, KryoRegistration> kryoRegistrations; + + private transient ReflectDatumWriter<T> writer; + private transient ReflectDatumReader<T> reader; + + private transient DataOutputEncoder encoder; + private transient DataInputDecoder decoder; + + private transient Kryo kryo; + + private transient T deepCopyInstance; + + // -------------------------------------------------------------------------------------------- + + public AvroSerializer(Class<T> type) { + this(type, type); + } + + public AvroSerializer(Class<T> type, Class<? 
extends T> typeToInstantiate) { + this.type = checkNotNull(type); + this.typeToInstantiate = checkNotNull(typeToInstantiate); + + InstantiationUtil.checkForInstantiation(typeToInstantiate); + + this.kryoRegistrations = buildKryoRegistrations(type); + } + + // -------------------------------------------------------------------------------------------- + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public AvroSerializer<T> duplicate() { + return new AvroSerializer<T>(type, typeToInstantiate); + } + + @Override + public T createInstance() { + return InstantiationUtil.instantiate(this.typeToInstantiate); + } + + @Override + public T copy(T from) { + checkKryoInitialized(); + + return KryoUtils.copy(from, kryo, this); + } + + @Override + public T copy(T from, T reuse) { + checkKryoInitialized(); + + return KryoUtils.copy(from, reuse, kryo, this); + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(T value, DataOutputView target) throws IOException { + checkAvroInitialized(); + this.encoder.setOut(target); + this.writer.write(value, this.encoder); + } + + @Override + public T deserialize(DataInputView source) throws IOException { + checkAvroInitialized(); + this.decoder.setIn(source); + return this.reader.read(null, this.decoder); + } + + @Override + public T deserialize(T reuse, DataInputView source) throws IOException { + checkAvroInitialized(); + this.decoder.setIn(source); + return this.reader.read(reuse, this.decoder); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + checkAvroInitialized(); + + if (this.deepCopyInstance == null) { + this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class); + } + + this.decoder.setIn(source); + this.encoder.setOut(target); + + T tmp = this.reader.read(this.deepCopyInstance, this.decoder); + this.writer.write(tmp, this.encoder); + } + + private void checkAvroInitialized() { + 
if (this.reader == null) { + this.reader = new ReflectDatumReader<T>(type); + this.writer = new ReflectDatumWriter<T>(type); + this.encoder = new DataOutputEncoder(); + this.decoder = new DataInputDecoder(); + } + } + + private void checkKryoInitialized() { + if (this.kryo == null) { + this.kryo = new Kryo(); + + Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy(); + instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); + kryo.setInstantiatorStrategy(instantiatorStrategy); + + kryo.setAsmEnabled(true); + + KryoUtils.applyRegistrations(kryo, kryoRegistrations.values()); + } + } + + // -------------------------------------------------------------------------------------------- + + @Override + public int hashCode() { + return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof AvroSerializer) { + @SuppressWarnings("unchecked") + AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj; + + return avroSerializer.canEqual(this) && + type == avroSerializer.type && + typeToInstantiate == avroSerializer.typeToInstantiate; + } else { + return false; + } + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof AvroSerializer; + } + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & compatibility + // -------------------------------------------------------------------------------------------- + + @Override + public AvroSerializerConfigSnapshot<T> snapshotConfiguration() { + return new AvroSerializerConfigSnapshot<>(type, typeToInstantiate, kryoRegistrations); + } + + @SuppressWarnings("unchecked") + @Override + public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof AvroSerializerConfigSnapshot) { + final 
AvroSerializerConfigSnapshot<T> config = (AvroSerializerConfigSnapshot<T>) configSnapshot; + + if (type.equals(config.getTypeClass()) && typeToInstantiate.equals(config.getTypeToInstantiate())) { + // resolve Kryo registrations; currently, since the Kryo registrations in Avro + // are fixed, there shouldn't be a problem with the resolution here. + + LinkedHashMap<String, KryoRegistration> oldRegistrations = config.getKryoRegistrations(); + oldRegistrations.putAll(kryoRegistrations); + + for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) { + if (reconfiguredRegistrationEntry.getValue().isDummy()) { + return CompatibilityResult.requiresMigration(); + } + } + + this.kryoRegistrations = oldRegistrations; + return CompatibilityResult.compatible(); + } + } + + // ends up here if the preceding serializer is not + // the ValueSerializer, or serialized data type has changed + return CompatibilityResult.requiresMigration(); + } + + /** + * {@link TypeSerializerConfigSnapshot} for Avro. + */ + public static class AvroSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> { + + private static final int VERSION = 1; + + private Class<? extends T> typeToInstantiate; + + public AvroSerializerConfigSnapshot() {} + + public AvroSerializerConfigSnapshot( + Class<T> baseType, + Class<? extends T> typeToInstantiate, + LinkedHashMap<String, KryoRegistration> kryoRegistrations) { + + super(baseType, kryoRegistrations); + this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate); + } + + @Override + public void write(DataOutputView out) throws IOException { + super.write(out); + + out.writeUTF(typeToInstantiate.getName()); + } + + @SuppressWarnings("unchecked") + @Override + public void read(DataInputView in) throws IOException { + super.read(in); + + String classname = in.readUTF(); + try { + typeToInstantiate = (Class<? 
extends T>) Class.forName(classname, true, getUserCodeClassLoader()); + } catch (ClassNotFoundException e) { + throw new IOException("Cannot find requested class " + classname + " in classpath.", e); + } + } + + @Override + public int getVersion() { + return VERSION; + } + + public Class<? extends T> getTypeToInstantiate() { + return typeToInstantiate; + } + } + + // -------------------------------------------------------------------------------------------- + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + + // kryoRegistrations may be null if this Avro serializer is deserialized from an old version + if (kryoRegistrations == null) { + this.kryoRegistrations = buildKryoRegistrations(type); + } + } + + private static <T> LinkedHashMap<String, KryoRegistration> buildKryoRegistrations(Class<T> serializedDataType) { + final LinkedHashMap<String, KryoRegistration> registrations = new LinkedHashMap<>(); + + // register Avro types. 
+ registrations.put( + GenericData.Array.class.getName(), + new KryoRegistration( + GenericData.Array.class, + new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList()))); + registrations.put(Utf8.class.getName(), new KryoRegistration(Utf8.class)); + registrations.put(GenericData.EnumSymbol.class.getName(), new KryoRegistration(GenericData.EnumSymbol.class)); + registrations.put(GenericData.Fixed.class.getName(), new KryoRegistration(GenericData.Fixed.class)); + registrations.put(GenericData.StringType.class.getName(), new KryoRegistration(GenericData.StringType.class)); + + // register the serialized data type + registrations.put(serializedDataType.getName(), new KryoRegistration(serializedDataType)); + + return registrations; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java new file mode 100644 index 0000000..ddc89a8 --- /dev/null +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.typeutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +import org.apache.avro.specific.SpecificRecordBase; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; + +/** + * Special type information to generate a special AvroTypeInfo for Avro POJOs (implementing SpecificRecordBase, the typed Avro POJOs) + * + * <p>Proceeding: It uses a regular pojo type analysis and replaces all {@code GenericType<CharSequence>} with a {@code GenericType<avro.Utf8>}. + * All other types used by Avro are standard Java types. + * Only strings are represented as CharSequence fields and represented as Utf8 classes at runtime. + * CharSequence is not comparable. To make them nicely usable with field expressions, we replace them here + * by generic type infos containing Utf8 classes (which are comparable), + * + * <p>This class is checked by the AvroPojoTest. 
+ */ +public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> { + + public AvroTypeInfo(Class<T> typeClass) { + super(typeClass, generateFieldsFromAvroSchema(typeClass)); + } + + @Override + public TypeSerializer<T> createSerializer(ExecutionConfig config) { + return super.createSerializer(config); + } + + @SuppressWarnings("unchecked") + @Internal + public static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) { + PojoTypeExtractor pte = new PojoTypeExtractor(); + ArrayList<Type> typeHierarchy = new ArrayList<>(); + typeHierarchy.add(typeClass); + TypeInformation ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null); + + if (!(ti instanceof PojoTypeInfo)) { + throw new IllegalStateException("Expecting type to be a PojoTypeInfo"); + } + PojoTypeInfo pti = (PojoTypeInfo) ti; + List<PojoField> newFields = new ArrayList<>(pti.getTotalFields()); + + for (int i = 0; i < pti.getArity(); i++) { + PojoField f = pti.getPojoFieldAt(i); + TypeInformation newType = f.getTypeInformation(); + // check if type is a CharSequence + if (newType instanceof GenericTypeInfo) { + if ((newType).getTypeClass().equals(CharSequence.class)) { + // replace the type by a org.apache.avro.util.Utf8 + newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class); + } + } + PojoField newField = new PojoField(f.getField(), newType); + newFields.add(newField); + } + return newFields; + } + + private static class PojoTypeExtractor extends TypeExtractor { + private PojoTypeExtractor() { + super(); + } + + @Override + public <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> clazz, ArrayList<Type> typeHierarchy, + ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { + return super.analyzePojo(clazz, typeHierarchy, parameterizedType, in1Type, in2Type); + } + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java new file mode 100644 index 0000000..7305f23 --- /dev/null +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.utils; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; + +import java.io.Serializable; + +/** + * Utilities for integrating Avro serializers in Kryo. 
+ */ +public class AvroKryoSerializerUtils { + + public static void addAvroSerializers(ExecutionConfig reg, Class<?> type) { + // Avro POJOs contain java.util.List which have GenericData.Array as their runtime type + // because Kryo is not able to serialize them properly, we use this serializer for them + reg.registerTypeWithKryoSerializer(GenericData.Array.class, Serializers.SpecificInstanceCollectionSerializerForArrayList.class); + + // We register this serializer for users who want to use untyped Avro records (GenericData.Record). + // Kryo is able to serialize everything in there, except for the Schema. + // This serializer is very slow, but using the GenericData.Records of Kryo is in general a bad idea. + // we add the serializer as a default serializer because Avro is using a private sub-type at runtime. + reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class); + } + + /** + * Slow serialization approach for Avro schemas. + * This is only used with {{@link org.apache.avro.generic.GenericData.Record}} types. + * Having this serializer, we are able to handle avro Records. + */ + public static class AvroSchemaSerializer extends Serializer<Schema> implements Serializable { + private static final long serialVersionUID = 1L; + + @Override + public void write(Kryo kryo, Output output, Schema object) { + String schemaAsString = object.toString(false); + output.writeString(schemaAsString); + } + + @Override + public Schema read(Kryo kryo, Input input, Class<Schema> type) { + String schemaAsString = input.readString(); + // the parser seems to be stateful, to we need a new one for every type. 
+ Schema.Parser sParser = new Schema.Parser(); + return sParser.parse(schemaAsString); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataInputDecoder.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataInputDecoder.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataInputDecoder.java new file mode 100644 index 0000000..32032cc --- /dev/null +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataInputDecoder.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.utils; + +import org.apache.avro.io.Decoder; +import org.apache.avro.util.Utf8; + +import java.io.DataInput; +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * A {@link Decoder} that reads from a {@link DataInput}. 
+ */ +public class DataInputDecoder extends Decoder { + + private final Utf8 stringDecoder = new Utf8(); + + private DataInput in; + + public void setIn(DataInput in) { + this.in = in; + } + + // -------------------------------------------------------------------------------------------- + // primitives + // -------------------------------------------------------------------------------------------- + + @Override + public void readNull() {} + + @Override + public boolean readBoolean() throws IOException { + return in.readBoolean(); + } + + @Override + public int readInt() throws IOException { + return in.readInt(); + } + + @Override + public long readLong() throws IOException { + return in.readLong(); + } + + @Override + public float readFloat() throws IOException { + return in.readFloat(); + } + + @Override + public double readDouble() throws IOException { + return in.readDouble(); + } + + @Override + public int readEnum() throws IOException { + return readInt(); + } + + // -------------------------------------------------------------------------------------------- + // bytes + // -------------------------------------------------------------------------------------------- + + @Override + public void readFixed(byte[] bytes, int start, int length) throws IOException { + in.readFully(bytes, start, length); + } + + @Override + public ByteBuffer readBytes(ByteBuffer old) throws IOException { + int length = readInt(); + ByteBuffer result; + if (old != null && length <= old.capacity() && old.hasArray()) { + result = old; + result.clear(); + } else { + result = ByteBuffer.allocate(length); + } + in.readFully(result.array(), result.arrayOffset() + result.position(), length); + result.limit(length); + return result; + } + + @Override + public void skipFixed(int length) throws IOException { + skipBytes(length); + } + + @Override + public void skipBytes() throws IOException { + int num = readInt(); + skipBytes(num); + } + + // 
-------------------------------------------------------------------------------------------- + // strings + // -------------------------------------------------------------------------------------------- + + @Override + public Utf8 readString(Utf8 old) throws IOException { + int length = readInt(); + Utf8 result = (old != null ? old : new Utf8()); + result.setByteLength(length); + + if (length > 0) { + in.readFully(result.getBytes(), 0, length); + } + + return result; + } + + @Override + public String readString() throws IOException { + return readString(stringDecoder).toString(); + } + + @Override + public void skipString() throws IOException { + int len = readInt(); + skipBytes(len); + } + + // -------------------------------------------------------------------------------------------- + // collection types + // -------------------------------------------------------------------------------------------- + + @Override + public long readArrayStart() throws IOException { + return readVarLongCount(in); + } + + @Override + public long arrayNext() throws IOException { + return readVarLongCount(in); + } + + @Override + public long skipArray() throws IOException { + return readVarLongCount(in); + } + + @Override + public long readMapStart() throws IOException { + return readVarLongCount(in); + } + + @Override + public long mapNext() throws IOException { + return readVarLongCount(in); + } + + @Override + public long skipMap() throws IOException { + return readVarLongCount(in); + } + + // -------------------------------------------------------------------------------------------- + // union + // -------------------------------------------------------------------------------------------- + + @Override + public int readIndex() throws IOException { + return readInt(); + } + + // -------------------------------------------------------------------------------------------- + // utils + // 
-------------------------------------------------------------------------------------------- + + private void skipBytes(int num) throws IOException { + while (num > 0) { + num -= in.skipBytes(num); + } + } + + public static long readVarLongCount(DataInput in) throws IOException { + long value = in.readUnsignedByte(); + + if ((value & 0x80) == 0) { + return value; + } + else { + long curr; + int shift = 7; + value = value & 0x7f; + while (((curr = in.readUnsignedByte()) & 0x80) != 0){ + value |= (curr & 0x7f) << shift; + shift += 7; + } + value |= curr << shift; + return value; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataOutputEncoder.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataOutputEncoder.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataOutputEncoder.java new file mode 100644 index 0000000..c2d490b --- /dev/null +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataOutputEncoder.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.utils; + +import org.apache.avro.io.Encoder; +import org.apache.avro.util.Utf8; + +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * An {@link Encoder} that writes data to a {@link DataOutput}. + */ +public final class DataOutputEncoder extends Encoder implements java.io.Serializable { + + private static final long serialVersionUID = 1L; + + private DataOutput out; + + public void setOut(DataOutput out) { + this.out = out; + } + + @Override + public void flush() throws IOException {} + + // -------------------------------------------------------------------------------------------- + // primitives + // -------------------------------------------------------------------------------------------- + + @Override + public void writeNull() {} + + @Override + public void writeBoolean(boolean b) throws IOException { + out.writeBoolean(b); + } + + @Override + public void writeInt(int n) throws IOException { + out.writeInt(n); + } + + @Override + public void writeLong(long n) throws IOException { + out.writeLong(n); + } + + @Override + public void writeFloat(float f) throws IOException { + out.writeFloat(f); + } + + @Override + public void writeDouble(double d) throws IOException { + out.writeDouble(d); + } + + @Override + public void writeEnum(int e) throws IOException { + out.writeInt(e); + } + + // -------------------------------------------------------------------------------------------- + // bytes + // -------------------------------------------------------------------------------------------- + + @Override + public void writeFixed(byte[] bytes, int start, int len) throws IOException { + out.write(bytes, start, len); + } + + @Override + public void writeBytes(byte[] bytes, int start, int len) throws IOException { + out.writeInt(len); + if (len > 0) { + out.write(bytes, 
start, len); + } + } + + @Override + public void writeBytes(ByteBuffer bytes) throws IOException { + int num = bytes.remaining(); + out.writeInt(num); + + if (num > 0) { + writeFixed(bytes); + } + } + + // -------------------------------------------------------------------------------------------- + // strings + // -------------------------------------------------------------------------------------------- + + @Override + public void writeString(String str) throws IOException { + byte[] bytes = Utf8.getBytesFor(str); + writeBytes(bytes, 0, bytes.length); + } + + @Override + public void writeString(Utf8 utf8) throws IOException { + writeBytes(utf8.getBytes(), 0, utf8.getByteLength()); + + } + + // -------------------------------------------------------------------------------------------- + // collection types + // -------------------------------------------------------------------------------------------- + + @Override + public void writeArrayStart() {} + + @Override + public void setItemCount(long itemCount) throws IOException { + if (itemCount > 0) { + writeVarLongCount(out, itemCount); + } + } + + @Override + public void startItem() {} + + @Override + public void writeArrayEnd() throws IOException { + // write a single byte 0, shortcut for a var-length long of 0 + out.write(0); + } + + @Override + public void writeMapStart() {} + + @Override + public void writeMapEnd() throws IOException { + // write a single byte 0, shortcut for a var-length long of 0 + out.write(0); + } + + // -------------------------------------------------------------------------------------------- + // union + // -------------------------------------------------------------------------------------------- + + @Override + public void writeIndex(int unionIndex) throws IOException { + out.writeInt(unionIndex); + } + + // -------------------------------------------------------------------------------------------- + // utils + // 
-------------------------------------------------------------------------------------------- + + public static void writeVarLongCount(DataOutput out, long val) throws IOException { + if (val < 0) { + throw new IOException("Illegal count (must be non-negative): " + val); + } + + while ((val & ~0x7FL) != 0) { + out.write(((int) val) | 0x80); + val >>>= 7; + } + out.write((int) val); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/FSDataInputStreamWrapper.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/FSDataInputStreamWrapper.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/FSDataInputStreamWrapper.java new file mode 100644 index 0000000..c00fecb --- /dev/null +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/FSDataInputStreamWrapper.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro.utils; + +import org.apache.flink.core.fs.FSDataInputStream; + +import org.apache.avro.file.SeekableInput; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Code copy pasted from org.apache.avro.mapred.FSInput (which is Apache licensed as well). + * + * <p>The wrapper keeps track of the position in the data stream. + */ +public class FSDataInputStreamWrapper implements Closeable, SeekableInput { + private final FSDataInputStream stream; + private long pos; + private long len; + + public FSDataInputStreamWrapper(FSDataInputStream stream, long len) { + this.stream = stream; + this.pos = 0; + this.len = len; + } + + public long length() throws IOException { + return this.len; + } + + public int read(byte[] b, int off, int len) throws IOException { + int read; + read = stream.read(b, off, len); + pos += read; + return read; + } + + public void seek(long p) throws IOException { + stream.seek(p); + pos = p; + } + + public long tell() throws IOException { + return pos; + } + + public void close() throws IOException { + stream.close(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/assembly/test-assembly.xml ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/assembly/test-assembly.xml b/flink-formats/flink-avro/src/test/assembly/test-assembly.xml new file mode 100644 index 0000000..8361693 --- /dev/null +++ b/flink-formats/flink-avro/src/test/assembly/test-assembly.xml @@ -0,0 +1,36 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
<!--
Assembly descriptor with the id "test-jar": packages the classes matching
org/apache/flink/formats/avro/testjar/** from the test output directory into a jar.
-->
<assembly>
	<id>test-jar</id>
	<formats>
		<format>jar</format>
	</formats>
	<!-- do not wrap the content in a base directory inside the archive -->
	<includeBaseDirectory>false</includeBaseDirectory>
	<fileSets>
		<fileSet>
			<directory>${project.build.testOutputDirectory}</directory>
			<outputDirectory></outputDirectory>
			<!-- Modify or add include patterns to match your package(s). -->
			<includes>
				<include>org/apache/flink/formats/avro/testjar/**</include>
			</includes>
		</fileSet>
	</fileSets>
</assembly>
/**
 * IT case for the {@link AvroExternalJarProgram}.
 *
 * <p>The test starts a local mini cluster, wraps the jar file in a
 * {@link PackagedProgram} with the test data resource as its only argument, and runs
 * the program through the test environment. Any throwable fails the test.
 */
public class AvroExternalJarProgramITCase extends TestLogger {

	// Relative path of the jar holding the program; as a relative File it is resolved
	// against the working directory of the test JVM.
	// NOTE(review): presumably produced by the "test-jar" assembly during the build - confirm in the pom.
	private static final String JAR_FILE = "maven-test-jar.jar";

	// Classpath resource that is passed to the program as its only argument.
	private static final String TEST_DATA_FILE = "/testdata.avro";

	/**
	 * Runs the packaged program against a freshly started mini cluster and fails if
	 * anything throws.
	 */
	@Test
	public void testExternalProgram() {

		// declared outside the try block so that the finally block can stop it
		LocalFlinkMiniCluster testMiniCluster = null;

		try {
			// the same value is used for the task slots and for the test environment
			int parallelism = 4;
			Configuration config = new Configuration();
			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
			testMiniCluster = new LocalFlinkMiniCluster(config, false);
			testMiniCluster.start();

			String jarFile = JAR_FILE;
			// the resource URL in string form is what the program receives as its input path
			String testData = getClass().getResource(TEST_DATA_FILE).toString();

			PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });

			// register the mini cluster as the context environment and pass the jar along with it
			TestEnvironment.setAsContext(
				testMiniCluster,
				parallelism,
				Collections.singleton(new Path(jarFile)),
				Collections.<URL>emptyList());

			// NOTE(review): these entries are set after the cluster was created from 'config';
			// it is not visible here who reads them afterwards - confirm they are still needed.
			config.setString(JobManagerOptions.ADDRESS, "localhost");
			config.setInteger(JobManagerOptions.PORT, testMiniCluster.getLeaderRPCPort());

			program.invokeInteractiveModeForExecution();
		}
		catch (Throwable t) {
			// print the failure to stderr, then fail the test with its message
			System.err.println(t.getMessage());
			t.printStackTrace();
			Assert.fail("Error during the packaged program execution: " + t.getMessage());
		}
		finally {
			// always remove the context environment again, even if the program failed
			TestEnvironment.unsetAsContext();

			if (testMiniCluster != null) {
				try {
					testMiniCluster.stop();
				} catch (Throwable t) {
					// best-effort shutdown: a failure to stop the cluster is deliberately ignored
				}
			}
		}
	}
}
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroInputFormatTypeExtractionTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.fs.Path; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests for the type extraction of the {@link AvroInputFormat}. 
+ */ +public class AvroInputFormatTypeExtractionTest { + + @Test + public void testTypeExtraction() { + try { + InputFormat<MyAvroType, ?> format = new AvroInputFormat<MyAvroType>(new Path("file:///ignore/this/file"), MyAvroType.class); + + TypeInformation<?> typeInfoDirect = TypeExtractor.getInputFormatTypes(format); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<MyAvroType> input = env.createInput(format); + TypeInformation<?> typeInfoDataSet = input.getType(); + + Assert.assertTrue(typeInfoDirect instanceof PojoTypeInfo); + Assert.assertTrue(typeInfoDataSet instanceof PojoTypeInfo); + + Assert.assertEquals(MyAvroType.class, typeInfoDirect.getTypeClass()); + Assert.assertEquals(MyAvroType.class, typeInfoDataSet.getTypeClass()); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + /** + * Test type. + */ + public static final class MyAvroType { + + public String theString; + + public MyAvroType recursive; + + private double aDouble; + + public double getaDouble() { + return aDouble; + } + + public void setaDouble(double aDouble) { + this.aDouble = aDouble; + } + + public void setTheString(String theString) { + this.theString = theString; + } + + public String getTheString() { + return theString; + } + } +}
