// http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java new file mode 100644 index 0000000..73067c1 --- /dev/null +++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java @@ -0,0 +1,207 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


package org.apache.flink.api.java.io;

import java.io.IOException;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.avro.FSDataInputStreamWrapper;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.InstantiationUtil;

/**
 * Provides a {@link FileInputFormat} for Avro records.
 *
 * <p>The format is checkpointable: its state is the pair
 * (position of the last sync marker, number of records read since that marker),
 * see {@link #getCurrentState()} and {@link #reopen(FileInputSplit, Tuple2)}.
 *
 * @param <E>
 *            the type of the result Avro record. If you specify
 *            {@link GenericRecord} then the result will be returned as a
 *            {@link GenericRecord}, so you do not have to know the schema ahead
 *            of time.
 */
public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E>,
        CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class);

    /** Class of the records produced by this format; selects the Avro datum reader. */
    private final Class<E> avroValueType;

    /** Whether {@link #nextRecord(Object)} hands the caller's reuse object to Avro. */
    private boolean reuseAvroValue = true;

    private transient DataFileReader<E> dataFileReader;

    /** End offset of the current split ({@code split.getStart() + split.getLength()}). */
    private transient long end;

    /** Number of records returned since {@link #lastSync} last changed. */
    private transient long recordsReadSinceLastSync;

    /** Last value observed from {@code dataFileReader.previousSync()}; -1 before the first open. */
    private long lastSync = -1L;

    /**
     * Creates an input format that reads records of the given type from the given path.
     *
     * @param filePath path of the file(s) to read
     * @param type class of the records to produce
     */
    public AvroInputFormat(Path filePath, Class<E> type) {
        super(filePath);
        this.avroValueType = type;
    }

    /**
     * Sets the flag whether to reuse the Avro value instance for all records.
     * By default, the input format reuses the Avro value.
     *
     * @param reuseAvroValue True, if the input format should reuse the Avro value instance, false otherwise.
     */
    public void setReuseAvroValue(boolean reuseAvroValue) {
        this.reuseAvroValue = reuseAvroValue;
    }

    /**
     * If set, the InputFormat will only read entire files.
     *
     * @param unsplittable True, if files must not be split, false otherwise.
     */
    public void setUnsplittable(boolean unsplittable) {
        this.unsplittable = unsplittable;
    }

    // --------------------------------------------------------------------------------------------
    // Typing
    // --------------------------------------------------------------------------------------------

    @Override
    public TypeInformation<E> getProducedType() {
        return TypeExtractor.getForClass(this.avroValueType);
    }

    // --------------------------------------------------------------------------------------------
    // Input Format Methods
    // --------------------------------------------------------------------------------------------

    /**
     * Opens the split, positions the Avro reader via {@code sync(split.getStart())}
     * and remembers the reader's current sync position.
     */
    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        dataFileReader = initReader(split);
        dataFileReader.sync(split.getStart());
        lastSync = dataFileReader.previousSync();
    }

    /**
     * Creates the Avro reader for the given split and resets the per-split bookkeeping
     * ({@link #end} and {@link #recordsReadSinceLastSync}).
     *
     * <p>The datum reader is chosen from {@link #avroValueType}: generic for
     * {@link GenericRecord}, specific for subclasses of {@code SpecificRecordBase},
     * reflect-based otherwise.
     */
    private DataFileReader<E> initReader(FileInputSplit split) throws IOException {
        final DatumReader<E> datumReader;
        if (GenericRecord.class == avroValueType) {
            datumReader = new GenericDatumReader<E>();
        } else if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
            datumReader = new SpecificDatumReader<E>(avroValueType);
        } else {
            datumReader = new ReflectDatumReader<E>(avroValueType);
        }

        // Parameterized logging needs no explicit level guard.
        LOG.info("Opening split {}", split);

        final long fileLength = split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen();
        final SeekableInput in = new FSDataInputStreamWrapper(stream, fileLength);

        // Named 'reader' so that it does not shadow the dataFileReader field.
        final DataFileReader<E> reader = (DataFileReader<E>) DataFileReader.openReader(in, datumReader);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Loaded SCHEMA: {}", reader.getSchema());
        }

        end = split.getStart() + split.getLength();
        recordsReadSinceLastSync = 0;
        return reader;
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return !dataFileReader.hasNext() || dataFileReader.pastSync(end);
    }

    /**
     * Returns the number of records read since the last observed sync position.
     */
    public long getRecordsReadFromBlock() {
        return this.recordsReadSinceLastSync;
    }

    /**
     * Returns the next record, or {@code null} once {@link #reachedEnd()} is true.
     *
     * @param reuseValue object handed to Avro for reuse when value reuse is enabled
     */
    @Override
    public E nextRecord(E reuseValue) throws IOException {
        if (reachedEnd()) {
            return null;
        }

        // if we start a new block, then register the event, and
        // restart the counter.
        final long currentSync = dataFileReader.previousSync();
        if (currentSync != lastSync) {
            lastSync = currentSync;
            recordsReadSinceLastSync = 0;
        }
        recordsReadSinceLastSync++;

        if (reuseAvroValue) {
            return dataFileReader.next(reuseValue);
        }
        if (GenericRecord.class == avroValueType) {
            return dataFileReader.next();
        }
        return dataFileReader.next(InstantiationUtil.instantiate(avroValueType, Object.class));
    }

    // --------------------------------------------------------------------------------------------
    // Checkpointing
    // --------------------------------------------------------------------------------------------

    /**
     * Returns the checkpoint state: (last sync position, records read since that position).
     */
    @Override
    public Tuple2<Long, Long> getCurrentState() throws IOException {
        return new Tuple2<>(this.lastSync, this.recordsReadSinceLastSync);
    }

    /**
     * Re-opens the split and restores the reading position from the given state.
     *
     * @param split the split to open; must not be null
     * @param state (last sync position, records read since then); must not be null.
     *              A sync position of -1 means "no position recorded" and is ignored.
     */
    @Override
    public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
        Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
        Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");

        try {
            this.open(split);
        } finally {
            // open() overwrites the bookkeeping fields; put the checkpointed values back.
            if (state.f0 != -1) {
                lastSync = state.f0;
                recordsReadSinceLastSync = state.f1;
            }
        }

        if (lastSync != -1) {
            // open and read until the record we were before
            // the checkpoint and discard the values
            dataFileReader.seek(lastSync);
            // long counter: the bound is a long, an int counter could overflow before reaching it
            for (long skipped = 0; skipped < recordsReadSinceLastSync; skipped++) {
                dataFileReader.next(null);
            }
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java new file mode 100644 index 0000000..600d1e5 --- /dev/null +++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java @@ -0,0 +1,189 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.api.java.io;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.core.fs.Path;

import static org.apache.flink.util.Preconditions.checkNotNull;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

/**
 * A {@link FileOutputFormat} that writes records as Avro data files.
 *
 * <p>The optional codec and user-defined schema are {@code transient} and are carried
 * through Java serialization by the custom {@code writeObject}/{@code readObject} pair.
 *
 * @param <E> the type of the records to write
 */
public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {

    /**
     * Wrapper which encapsulates the supported codec and a related serialization byte.
     */
    public enum Codec {

        NULL((byte) 0, CodecFactory.nullCodec()),
        SNAPPY((byte) 1, CodecFactory.snappyCodec()),
        BZIP2((byte) 2, CodecFactory.bzip2Codec()),
        DEFLATE((byte) 3, CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)),
        XZ((byte) 4, CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL));

        private final byte codecByte;

        private final CodecFactory codecFactory;

        Codec(final byte codecByte, final CodecFactory codecFactory) {
            this.codecByte = codecByte;
            this.codecFactory = codecFactory;
        }

        private byte getCodecByte() {
            return codecByte;
        }

        private CodecFactory getCodecFactory() {
            return codecFactory;
        }

        /**
         * Looks up the codec for a serialization byte.
         *
         * @throws IllegalArgumentException if no codec uses the given byte
         */
        private static Codec forCodecByte(byte codecByte) {
            for (final Codec codec : Codec.values()) {
                if (codec.getCodecByte() == codecByte) {
                    return codec;
                }
            }
            throw new IllegalArgumentException("no codec for codecByte: " + codecByte);
        }
    }

    private static final long serialVersionUID = 1L;

    private final Class<E> avroValueType;

    private transient Schema userDefinedSchema = null;

    private transient Codec codec = null;

    private transient DataFileWriter<E> dataFileWriter;

    public AvroOutputFormat(Path filePath, Class<E> type) {
        super(filePath);
        this.avroValueType = type;
    }

    public AvroOutputFormat(Class<E> type) {
        this.avroValueType = type;
    }

    @Override
    protected String getDirectoryFileName(int taskNumber) {
        return super.getDirectoryFileName(taskNumber) + ".avro";
    }

    /**
     * Sets the schema to write with instead of the one derived from the record type.
     *
     * @param schema the schema to use; {@code null} falls back to the derived schema
     */
    public void setSchema(Schema schema) {
        this.userDefinedSchema = schema;
    }

    /**
     * Set avro codec for compression.
     *
     * @param codec avro codec.
     */
    public void setCodec(final Codec codec) {
        this.codec = checkNotNull(codec, "codec can not be null");
    }

    @Override
    public void writeRecord(E record) throws IOException {
        dataFileWriter.append(record);
    }

    /**
     * Opens the output stream and creates the Avro file writer on top of it.
     *
     * <p>Specific records use a {@link SpecificDatumWriter} and the schema of a fresh
     * instance; all other types use reflection. A user-defined schema, if set, wins.
     */
    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        super.open(taskNumber, numTasks);

        DatumWriter<E> datumWriter;
        Schema schema;
        if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
            datumWriter = new SpecificDatumWriter<E>(avroValueType);
            try {
                schema = ((org.apache.avro.specific.SpecificRecordBase) avroValueType.newInstance()).getSchema();
            } catch (InstantiationException | IllegalAccessException e) {
                // keep the original exception as cause so the stack trace is not lost
                throw new RuntimeException(e.getMessage(), e);
            }
        } else {
            datumWriter = new ReflectDatumWriter<E>(avroValueType);
            schema = ReflectData.get().getSchema(avroValueType);
        }

        dataFileWriter = new DataFileWriter<E>(datumWriter);
        if (codec != null) {
            dataFileWriter.setCodec(codec.getCodecFactory());
        }
        dataFileWriter.create(userDefinedSchema == null ? schema : userDefinedSchema, stream);
    }

    /**
     * Serializes the transient state: one codec byte (-1 if no codec is set), followed by
     * the length of the schema JSON and its UTF-8 bytes (length 0 if no schema is set).
     */
    private void writeObject(java.io.ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();

        if (codec != null) {
            out.writeByte(codec.getCodecByte());
        } else {
            out.writeByte(-1);
        }

        if (userDefinedSchema != null) {
            // fixed charset: writer and reader may run on JVMs with different defaults
            byte[] json = userDefinedSchema.toString().getBytes(StandardCharsets.UTF_8);
            out.writeInt(json.length);
            out.write(json);
        } else {
            out.writeInt(0);
        }
    }

    /**
     * Counterpart of {@code writeObject}: restores codec and user-defined schema.
     */
    private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();

        byte codecByte = in.readByte();
        if (codecByte >= 0) {
            setCodec(Codec.forCodecByte(codecByte));
        }

        int length = in.readInt();
        if (length != 0) {
            byte[] json = new byte[length];
            in.readFully(json);

            Schema schema = new Schema.Parser().parse(new String(json, StandardCharsets.UTF_8));
            setSchema(schema);
        }
    }

    /**
     * Flushes and closes the Avro writer, then closes the underlying stream.
     *
     * <p>The writer may be {@code null} if {@code open} failed before creating it;
     * {@code super.close()} runs even if flushing or closing the writer throws.
     */
    @Override
    public void close() throws IOException {
        try {
            if (dataFileWriter != null) {
                dataFileWriter.flush();
                dataFileWriter.close();
            }
        } finally {
            super.close();
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml b/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml new file mode 100644 index 0000000..0f4561a --- /dev/null +++ b/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml @@ -0,0 +1,36 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
// +--> + +<assembly> + <id>test-jar</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${project.build.testOutputDirectory}</directory> + <outputDirectory>/</outputDirectory> + <!--modify/add include to match your package(s) --> + <includes> + <include>org/apache/flink/api/avro/testjar/**</include> + </includes> + </fileSet> + </fileSets> +</assembly> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java new file mode 100644 index 0000000..1030ff8 --- /dev/null +++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java @@ -0,0 +1,80 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.avro;

import java.io.File;

import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;

import org.junit.Assert;
import org.junit.Test;

/**
 * Runs a packaged program (the jar named by {@code JAR_FILE}) against a local mini
 * cluster, passing the location of the Avro test data as the only program argument.
 */
public class AvroExternalJarProgramITCase {

    private static final String JAR_FILE = "maven-test-jar.jar";

    private static final String TEST_DATA_FILE = "/testdata.avro";

    /**
     * Starts a mini cluster with four task slots, submits the packaged program with
     * parallelism four and fails the test on any error thrown along the way.
     */
    @Test
    public void testExternalProgram() {

        LocalFlinkMiniCluster cluster = null;

        try {
            final Configuration flinkConfig = new Configuration();
            flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);

            cluster = new LocalFlinkMiniCluster(flinkConfig, false);
            cluster.start();

            final String dataLocation = getClass().getResource(TEST_DATA_FILE).toString();
            final PackagedProgram packagedJob =
                    new PackagedProgram(new File(JAR_FILE), new String[] { dataLocation });

            // the client reaches the job manager through the same configuration object
            flinkConfig.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
            flinkConfig.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, cluster.getLeaderRPCPort());

            final ClusterClient submitter = new StandaloneClusterClient(flinkConfig);
            submitter.setPrintStatusDuringExecution(false);
            submitter.run(packagedJob, 4);
        }
        catch (Throwable error) {
            System.err.println(error.getMessage());
            error.printStackTrace();
            Assert.fail("Error during the packaged program execution: " + error.getMessage());
        }
        finally {
            stopQuietly(cluster);
        }
    }

    /** Stops the cluster if it was created; shutdown problems are deliberately ignored. */
    private static void stopQuietly(LocalFlinkMiniCluster cluster) {
        if (cluster == null) {
            return;
        }
        try {
            cluster.stop();
        } catch (Throwable ignored) {
            // ignore
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java 
// ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java new file mode 100644 index 0000000..3b01ccb --- /dev/null +++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java @@ -0,0 +1,176 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.avro;

import org.junit.Assert;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.io.avro.example.User;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.io.AvroOutputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.util.JavaProgramTestBase;

/**
 * Writes the same user data once as specific records and once as reflect records
 * with {@link AvroOutputFormat}, then reads the files back and checks their content.
 */
@SuppressWarnings("serial")
public class AvroOutputFormatITCase extends JavaProgramTestBase {

    public static String outputPath1;

    public static String outputPath2;

    public static String inputPath;

    public static String userData = "alice|1|blue\n" +
        "bob|2|red\n" +
        "john|3|yellow\n" +
        "walt|4|black\n";

    @Override
    protected void preSubmit() throws Exception {
        inputPath = createTempFile("user", userData);
        outputPath1 = getTempDirPath("avro_output1");
        outputPath2 = getTempDirPath("avro_output2");
    }


    @Override
    protected void testProgram() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple3<String, Integer, String>> input = env.readCsvFile(inputPath)
            .fieldDelimiter("|")
            .types(String.class, Integer.class, String.class);

        //output the data with AvroOutputFormat for specific user type
        DataSet<User> specificUser = input.map(new ConvertToUser());
        AvroOutputFormat<User> avroOutputFormat = new AvroOutputFormat<User>(User.class);
        avroOutputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY); // FLINK-4771: use a codec
        avroOutputFormat.setSchema(User.SCHEMA$); //FLINK-3304: Ensure the OF is properly serializing the schema
        specificUser.write(avroOutputFormat, outputPath1);

        //output the data with AvroOutputFormat for reflect user type
        DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective());
        reflectiveUser.write(new AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2);

        env.execute();
    }

    /**
     * Reads both outputs back and asserts that every input line is present in each.
     * Every {@link DataFileReader} is closed via try-with-resources so that no file
     * handle stays open after the test.
     */
    @Override
    protected void postSubmit() throws Exception {
        //compare result for specific user type
        File [] output1;
        File file1 = asFile(outputPath1);
        if (file1.isDirectory()) {
            output1 = file1.listFiles();
            // check for avro ext in dir.
            for (File avroOutput : output1) {
                Assert.assertTrue("Expect extension '.avro'", avroOutput.toString().endsWith(".avro"));
            }
        } else {
            output1 = new File[] {file1};
        }
        List<String> result1 = new ArrayList<String>();
        DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class);
        for (File avroOutput : output1) {
            try (DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1)) {
                while (dataFileReader1.hasNext()) {
                    User user = dataFileReader1.next();
                    result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
                }
            }
        }
        for (String expectedResult : userData.split("\n")) {
            Assert.assertTrue("expected user " + expectedResult + " not found.", result1.contains(expectedResult));
        }

        //compare result for reflect user type
        File [] output2;
        File file2 = asFile(outputPath2);
        if (file2.isDirectory()) {
            output2 = file2.listFiles();
        } else {
            output2 = new File[] {file2};
        }
        List<String> result2 = new ArrayList<String>();
        DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class);
        for (File avroOutput : output2) {
            try (DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2)) {
                while (dataFileReader2.hasNext()) {
                    ReflectiveUser user = dataFileReader2.next();
                    result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
                }
            }
        }
        for (String expectedResult : userData.split("\n")) {
            Assert.assertTrue("expected user " + expectedResult + " not found.", result2.contains(expectedResult));
        }


    }


    /** Maps a (name, favorite number, favorite color) tuple to a generated {@link User}. */
    public final static class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> {

        @Override
        public User map(Tuple3<String, Integer, String> value) throws Exception {
            return new User(value.f0, value.f1, value.f2);
        }
    }

    /** Maps a generated {@link User} to the plain {@link ReflectiveUser} POJO. */
    public final static class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> {

        @Override
        public ReflectiveUser map(User value) throws Exception {
            return new ReflectiveUser(value.getName().toString(), value.getFavoriteNumber(), value.getFavoriteColor().toString());
        }
    }


    /** Plain POJO with the same three fields, written through Avro's reflect API. */
    public static class ReflectiveUser {
        private String name;
        private int favoriteNumber;
        private String favoriteColor;

        public ReflectiveUser() {}

        public ReflectiveUser(String name, int favoriteNumber, String favoriteColor) {
            this.name = name;
            this.favoriteNumber = favoriteNumber;
            this.favoriteColor = favoriteColor;
        }

        public String getName() {
            return this.name;
        }
        public String getFavoriteColor() {
            return this.favoriteColor;
        }
        public int getFavoriteNumber() {
            return this.favoriteNumber;
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java new file mode 100644 index 0000000..c39db15 --- /dev/null +++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java @@ -0,0 +1,528 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.avro; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.flink.api.io.avro.generated.Address; +import org.apache.flink.api.io.avro.generated.Colors; +import org.apache.flink.api.io.avro.generated.Fixed16; +import org.apache.flink.api.io.avro.generated.User; +import org.apache.flink.util.StringUtils; +import org.junit.Test; + +import static org.junit.Assert.*; + +/** + * Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes for Avro serialization. 
+ */ +public class EncoderDecoderTest { + @Test + public void testComplexStringsDirecty() { + try { + Random rnd = new Random(349712539451944123L); + + for (int i = 0; i < 10; i++) { + String testString = StringUtils.getRandomString(rnd, 10, 100); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(512); + { + DataOutputStream dataOut = new DataOutputStream(baos); + DataOutputEncoder encoder = new DataOutputEncoder(); + encoder.setOut(dataOut); + + encoder.writeString(testString); + dataOut.flush(); + dataOut.close(); + } + + byte[] data = baos.toByteArray(); + + // deserialize + { + ByteArrayInputStream bais = new ByteArrayInputStream(data); + DataInputStream dataIn = new DataInputStream(bais); + DataInputDecoder decoder = new DataInputDecoder(); + decoder.setIn(dataIn); + + String deserialized = decoder.readString(); + + assertEquals(testString, deserialized); + } + } + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test failed due to an exception: " + e.getMessage()); + } + } + + @Test + public void testPrimitiveTypes() { + + testObjectSerialization(new Boolean(true)); + testObjectSerialization(new Boolean(false)); + + testObjectSerialization(Byte.valueOf((byte) 0)); + testObjectSerialization(Byte.valueOf((byte) 1)); + testObjectSerialization(Byte.valueOf((byte) -1)); + testObjectSerialization(Byte.valueOf(Byte.MIN_VALUE)); + testObjectSerialization(Byte.valueOf(Byte.MAX_VALUE)); + + testObjectSerialization(Short.valueOf((short) 0)); + testObjectSerialization(Short.valueOf((short) 1)); + testObjectSerialization(Short.valueOf((short) -1)); + testObjectSerialization(Short.valueOf(Short.MIN_VALUE)); + testObjectSerialization(Short.valueOf(Short.MAX_VALUE)); + + testObjectSerialization(Integer.valueOf(0)); + testObjectSerialization(Integer.valueOf(1)); + testObjectSerialization(Integer.valueOf(-1)); + testObjectSerialization(Integer.valueOf(Integer.MIN_VALUE)); + 
testObjectSerialization(Integer.valueOf(Integer.MAX_VALUE)); + + testObjectSerialization(Long.valueOf(0)); + testObjectSerialization(Long.valueOf(1)); + testObjectSerialization(Long.valueOf(-1)); + testObjectSerialization(Long.valueOf(Long.MIN_VALUE)); + testObjectSerialization(Long.valueOf(Long.MAX_VALUE)); + + testObjectSerialization(Float.valueOf(0)); + testObjectSerialization(Float.valueOf(1)); + testObjectSerialization(Float.valueOf(-1)); + testObjectSerialization(Float.valueOf((float)Math.E)); + testObjectSerialization(Float.valueOf((float)Math.PI)); + testObjectSerialization(Float.valueOf(Float.MIN_VALUE)); + testObjectSerialization(Float.valueOf(Float.MAX_VALUE)); + testObjectSerialization(Float.valueOf(Float.MIN_NORMAL)); + testObjectSerialization(Float.valueOf(Float.NaN)); + testObjectSerialization(Float.valueOf(Float.NEGATIVE_INFINITY)); + testObjectSerialization(Float.valueOf(Float.POSITIVE_INFINITY)); + + testObjectSerialization(Double.valueOf(0)); + testObjectSerialization(Double.valueOf(1)); + testObjectSerialization(Double.valueOf(-1)); + testObjectSerialization(Double.valueOf(Math.E)); + testObjectSerialization(Double.valueOf(Math.PI)); + testObjectSerialization(Double.valueOf(Double.MIN_VALUE)); + testObjectSerialization(Double.valueOf(Double.MAX_VALUE)); + testObjectSerialization(Double.valueOf(Double.MIN_NORMAL)); + testObjectSerialization(Double.valueOf(Double.NaN)); + testObjectSerialization(Double.valueOf(Double.NEGATIVE_INFINITY)); + testObjectSerialization(Double.valueOf(Double.POSITIVE_INFINITY)); + + testObjectSerialization(""); + testObjectSerialization("abcdefg"); + testObjectSerialization("ab\u1535\u0155xyz\u706F"); + + testObjectSerialization(new SimpleTypes(3637, 54876486548L, (byte) 65, "We're out looking for astronauts", (short) 0x2387, 2.65767523)); + testObjectSerialization(new SimpleTypes(705608724, -1L, (byte) -65, "Serve me the sky with a big slice of lemon", (short) Byte.MIN_VALUE, 0.0000001)); + } + + @Test + public void 
testArrayTypes() { + { + int[] array = new int[] {1, 2, 3, 4, 5}; + testObjectSerialization(array); + } + { + long[] array = new long[] {1, 2, 3, 4, 5}; + testObjectSerialization(array); + } + { + float[] array = new float[] {1, 2, 3, 4, 5}; + testObjectSerialization(array); + } + { + double[] array = new double[] {1, 2, 3, 4, 5}; + testObjectSerialization(array); + } + { + String[] array = new String[] {"Oh", "my", "what", "do", "we", "have", "here", "?"}; + testObjectSerialization(array); + } + } + + @Test + public void testEmptyArray() { + { + int[] array = new int[0]; + testObjectSerialization(array); + } + { + long[] array = new long[0]; + testObjectSerialization(array); + } + { + float[] array = new float[0]; + testObjectSerialization(array); + } + { + double[] array = new double[0]; + testObjectSerialization(array); + } + { + String[] array = new String[0]; + testObjectSerialization(array); + } + } + + @Test + public void testObjects() { + // simple object containing only primitives + { + testObjectSerialization(new Book(976243875L, "The Serialization Odysse", 42)); + } + + // object with collection + { + ArrayList<String> list = new ArrayList<String>(); + list.add("A"); + list.add("B"); + list.add("C"); + list.add("D"); + list.add("E"); + + testObjectSerialization(new BookAuthor(976243875L, list, "Arno Nym")); + } + + // object with empty collection + { + ArrayList<String> list = new ArrayList<String>(); + testObjectSerialization(new BookAuthor(987654321L, list, "The Saurus")); + } + } + + @Test + public void testNestedObjectsWithCollections() { + testObjectSerialization(new ComplexNestedObject2(true)); + } + + @Test + public void testGeneratedObjectWithNullableFields() { + List<CharSequence> strings = Arrays.asList(new CharSequence[] { "These", "strings", "should", "be", "recognizable", "as", "a", "meaningful", "sequence" }); + List<Boolean> bools = Arrays.asList(true, true, false, false, true, false, true, true); + Map<CharSequence, Long> map = new 
HashMap<CharSequence, Long>(); + map.put("1", 1L); + map.put("2", 2L); + map.put("3", 3L); + + byte[] b = new byte[16]; + new Random().nextBytes(b); + Fixed16 f = new Fixed16(b); + Address addr = new Address(new Integer(239), "6th Main", "Bangalore", + "Karnataka", "560075"); + User user = new User("Freudenreich", 1337, "macintosh gray", + 1234567890L, 3.1415926, null, true, strings, bools, null, + Colors.GREEN, map, f, new Boolean(true), addr); + + testObjectSerialization(user); + } + + @Test + public void testVarLenCountEncoding() { + try { + long[] values = new long[] { 0, 1, 2, 3, 4, 0, 574, 45236, 0, 234623462, 23462462346L, 0, 9734028767869761L, 0x7fffffffffffffffL}; + + // write + ByteArrayOutputStream baos = new ByteArrayOutputStream(512); + { + DataOutputStream dataOut = new DataOutputStream(baos); + + for (long val : values) { + DataOutputEncoder.writeVarLongCount(dataOut, val); + } + + dataOut.flush(); + dataOut.close(); + } + + // read + { + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + DataInputStream dataIn = new DataInputStream(bais); + + for (long val : values) { + long read = DataInputDecoder.readVarLongCount(dataIn); + assertEquals("Wrong var-len encoded value read.", val, read); + } + } + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test failed due to an exception: " + e.getMessage()); + } + } + + private static <X> void testObjectSerialization(X obj) { + + try { + + // serialize + ByteArrayOutputStream baos = new ByteArrayOutputStream(512); + { + DataOutputStream dataOut = new DataOutputStream(baos); + DataOutputEncoder encoder = new DataOutputEncoder(); + encoder.setOut(dataOut); + + @SuppressWarnings("unchecked") + Class<X> clazz = (Class<X>) obj.getClass(); + ReflectDatumWriter<X> writer = new ReflectDatumWriter<X>(clazz); + + writer.write(obj, encoder); + dataOut.flush(); + dataOut.close(); + } + + byte[] data = baos.toByteArray(); + X result = null; + + // 
deserialize + { + ByteArrayInputStream bais = new ByteArrayInputStream(data); + DataInputStream dataIn = new DataInputStream(bais); + DataInputDecoder decoder = new DataInputDecoder(); + decoder.setIn(dataIn); + + @SuppressWarnings("unchecked") + Class<X> clazz = (Class<X>) obj.getClass(); + ReflectDatumReader<X> reader = new ReflectDatumReader<X>(clazz); + + // create a reuse object if possible, otherwise we have no reuse object + X reuse = null; + try { + @SuppressWarnings("unchecked") + X test = (X) obj.getClass().newInstance(); + reuse = test; + } catch (Throwable t) {} + + result = reader.read(reuse, decoder); + } + + // check + final String message = "Deserialized object is not the same as the original"; + + if (obj.getClass().isArray()) { + Class<?> clazz = obj.getClass(); + if (clazz == byte[].class) { + assertArrayEquals(message, (byte[]) obj, (byte[]) result); + } + else if (clazz == short[].class) { + assertArrayEquals(message, (short[]) obj, (short[]) result); + } + else if (clazz == int[].class) { + assertArrayEquals(message, (int[]) obj, (int[]) result); + } + else if (clazz == long[].class) { + assertArrayEquals(message, (long[]) obj, (long[]) result); + } + else if (clazz == char[].class) { + assertArrayEquals(message, (char[]) obj, (char[]) result); + } + else if (clazz == float[].class) { + assertArrayEquals(message, (float[]) obj, (float[]) result, 0.0f); + } + else if (clazz == double[].class) { + assertArrayEquals(message, (double[]) obj, (double[]) result, 0.0); + } else { + assertArrayEquals(message, (Object[]) obj, (Object[]) result); + } + } else { + assertEquals(message, obj, result); + } + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test failed due to an exception: " + e.getMessage()); + } + } + + // -------------------------------------------------------------------------------------------- + // Test Objects + // 
-------------------------------------------------------------------------------------------- + + + public static final class SimpleTypes { + + private final int iVal; + private final long lVal; + private final byte bVal; + private final String sVal; + private final short rVal; + private final double dVal; + + + public SimpleTypes() { + this(0, 0, (byte) 0, "", (short) 0, 0); + } + + public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, short rVal, double dVal) { + this.iVal = iVal; + this.lVal = lVal; + this.bVal = bVal; + this.sVal = sVal; + this.rVal = rVal; + this.dVal = dVal; + } + + @Override + public boolean equals(Object obj) { + if (obj.getClass() == SimpleTypes.class) { + SimpleTypes other = (SimpleTypes) obj; + + return other.iVal == this.iVal && + other.lVal == this.lVal && + other.bVal == this.bVal && + other.sVal.equals(this.sVal) && + other.rVal == this.rVal && + other.dVal == this.dVal; + + } else { + return false; + } + } + } + + public static class ComplexNestedObject1 { + + private double doubleValue; + + private List<String> stringList; + + public ComplexNestedObject1() {} + + public ComplexNestedObject1(int offInit) { + this.doubleValue = 6293485.6723 + offInit; + + this.stringList = new ArrayList<String>(); + this.stringList.add("A" + offInit); + this.stringList.add("somewhat" + offInit); + this.stringList.add("random" + offInit); + this.stringList.add("collection" + offInit); + this.stringList.add("of" + offInit); + this.stringList.add("strings" + offInit); + } + + @Override + public boolean equals(Object obj) { + if (obj.getClass() == ComplexNestedObject1.class) { + ComplexNestedObject1 other = (ComplexNestedObject1) obj; + return other.doubleValue == this.doubleValue && this.stringList.equals(other.stringList); + } else { + return false; + } + } + } + + public static class ComplexNestedObject2 { + + private long longValue; + + private Map<String, ComplexNestedObject1> theMap; + + public ComplexNestedObject2() {} + + public 
ComplexNestedObject2(boolean init) { + this.longValue = 46547; + + this.theMap = new HashMap<String, ComplexNestedObject1>(); + this.theMap.put("36354L", new ComplexNestedObject1(43546543)); + this.theMap.put("785611L", new ComplexNestedObject1(45784568)); + this.theMap.put("43L", new ComplexNestedObject1(9876543)); + this.theMap.put("-45687L", new ComplexNestedObject1(7897615)); + this.theMap.put("1919876876896L", new ComplexNestedObject1(27154)); + this.theMap.put("-868468468L", new ComplexNestedObject1(546435)); + } + + @Override + public boolean equals(Object obj) { + if (obj.getClass() == ComplexNestedObject2.class) { + ComplexNestedObject2 other = (ComplexNestedObject2) obj; + return other.longValue == this.longValue && this.theMap.equals(other.theMap); + } else { + return false; + } + } + } + + public static class Book { + + private long bookId; + private String title; + private long authorId; + + public Book() {} + + public Book(long bookId, String title, long authorId) { + this.bookId = bookId; + this.title = title; + this.authorId = authorId; + } + + @Override + public boolean equals(Object obj) { + if (obj.getClass() == Book.class) { + Book other = (Book) obj; + return other.bookId == this.bookId && other.authorId == this.authorId && this.title.equals(other.title); + } else { + return false; + } + } + } + + public static class BookAuthor { + + private long authorId; + private List<String> bookTitles; + private String authorName; + + public BookAuthor() {} + + public BookAuthor(long authorId, List<String> bookTitles, String authorName) { + this.authorId = authorId; + this.bookTitles = bookTitles; + this.authorName = authorName; + } + + @Override + public boolean equals(Object obj) { + if (obj.getClass() == BookAuthor.class) { + BookAuthor other = (BookAuthor) obj; + return other.authorName.equals(this.authorName) && other.authorId == this.authorId && + other.bookTitles.equals(this.bookTitles); + } else { + return false; + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java new file mode 100644 index 0000000..1174786 --- /dev/null +++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.avro.testjar; + +// ================================================================================================ +// This file defines the classes for the AvroExternalJarProgramITCase. +// The program is exported into src/test/resources/AvroTestProgram.jar. +// +// THIS FILE MUST STAY FULLY COMMENTED SUCH THAT THE HERE DEFINED CLASSES ARE NOT COMPILED +// AND ADDED TO THE test-classes DIRECTORY. OTHERWISE, THE EXTERNAL CLASS LOADING WILL +// NOT BE COVERED BY THIS TEST. 
+// ================================================================================================ + + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.AvroInputFormat; +import org.apache.flink.core.fs.Path; + +public class AvroExternalJarProgram { + + public static final class Color { + + private String name; + private double saturation; + + public Color() { + name = ""; + saturation = 1.0; + } + + public Color(String name, double saturation) { + this.name = name; + this.saturation = saturation; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public double getSaturation() { + return saturation; + } + + public void setSaturation(double saturation) { + this.saturation = saturation; + } + + @Override + public String toString() { + return name + '(' + saturation + ')'; + } + } + + public static final class MyUser { + + private String name; + private List<Color> colors; + + public MyUser() { + name = "unknown"; + colors = new ArrayList<Color>(); + } + + public MyUser(String name, List<Color> colors) { + this.name = name; + this.colors = colors; + } + + public String getName() { + return name; + } + + public List<Color> getColors() { + return colors; + } + + public void setName(String name) { + this.name = name; + } + + public void setColors(List<Color> colors) { + 
this.colors = colors; + } + + @Override + public String toString() { + return name + " : " + colors; + } + } + + // -------------------------------------------------------------------------------------------- + + // -------------------------------------------------------------------------------------------- + + public static final class NameExtractor extends RichMapFunction<MyUser, Tuple2<String, MyUser>> { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<String, MyUser> map(MyUser u) { + String namePrefix = u.getName().substring(0, 1); + return new Tuple2<String, MyUser>(namePrefix, u); + } + } + + public static final class NameGrouper extends RichReduceFunction<Tuple2<String, MyUser>> { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<String, MyUser> reduce(Tuple2<String, MyUser> val1, Tuple2<String, MyUser> val2) { + return val1; + } + } + + // -------------------------------------------------------------------------------------------- + // Test Data + // -------------------------------------------------------------------------------------------- + + public static final class Generator { + + private final Random rnd = new Random(2389756789345689276L); + + public MyUser nextUser() { + return randomUser(); + } + + private MyUser randomUser() { + + int numColors = rnd.nextInt(5); + ArrayList<Color> colors = new ArrayList<Color>(numColors); + for (int i = 0; i < numColors; i++) { + colors.add(new Color(randomString(), rnd.nextDouble())); + } + + return new MyUser(randomString(), colors); + } + + private String randomString() { + char[] c = new char[this.rnd.nextInt(20) + 5]; + + for (int i = 0; i < c.length; i++) { + c[i] = (char) (this.rnd.nextInt(150) + 40); + } + + return new String(c); + } + } + + public static void writeTestData(File testFile, int numRecords) throws IOException { + + DatumWriter<MyUser> userDatumWriter = new ReflectDatumWriter<MyUser>(MyUser.class); + DataFileWriter<MyUser> 
dataFileWriter = new DataFileWriter<MyUser>(userDatumWriter); + + dataFileWriter.create(ReflectData.get().getSchema(MyUser.class), testFile); + + + Generator generator = new Generator(); + + for (int i = 0; i < numRecords; i++) { + MyUser user = generator.nextUser(); + dataFileWriter.append(user); + } + + dataFileWriter.close(); + } + +// public static void main(String[] args) throws Exception { +// String testDataFile = new File("src/test/resources/testdata.avro").getAbsolutePath(); +// writeTestData(new File(testDataFile), 50); +// } + + public static void main(String[] args) throws Exception { + String inputPath = args[0]; + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<MyUser> input = env.createInput(new AvroInputFormat<MyUser>(new Path(inputPath), MyUser.class)); + + DataSet<Tuple2<String, MyUser>> result = input.map(new NameExtractor()).groupBy(0).reduce(new NameGrouper()); + + result.output(new DiscardingOutputFormat<Tuple2<String,MyUser>>()); + env.execute(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java new file mode 100644 index 0000000..f33f433 --- /dev/null +++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.flink.api.io.avro;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.io.avro.generated.User;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.io.AvroInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.util.Collector;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
 * Runs small batch jobs over the generated Avro {@link User} type. Each test reads the file
 * written by {@code AvroRecordInputFormatTest.writeTestFile}, writes its result as text and
 * stores the expected text in {@link #expected}; the comparison itself happens in
 * {@link #after()}.
 */
@RunWith(Parameterized.class)
public class AvroPojoTest extends MultipleProgramsTestBase {

	@Rule
	public TemporaryFolder tempFolder = new TemporaryFolder();

	private File inFile;
	private String resultPath;
	private String expected;

	public AvroPojoTest(TestExecutionMode mode) {
		super(mode);
	}

	/** Allocates the result file, then the input file, and fills the input with test records. */
	@Before
	public void before() throws Exception{
		resultPath = tempFolder.newFile().toURI().toString();
		inFile = tempFolder.newFile();
		AvroRecordInputFormatTest.writeTestFile(inFile);
	}

	/** Compares whatever the test wrote to {@code resultPath} against {@code expected}. */
	@After
	public void after() throws Exception{
		compareResultsByLinesInMemory(expected, resultPath);
	}

	/** Reads the users, clears their map field and checks the text form of both records. */
	@Test
	public void testSimpleAvroRead() throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// the map field is nulled because its order changes in different JVMs (hard to test)
		DataSet<User> withoutMap = readUsers(env).map(new MapFunction<User, User>() {
			@Override
			public User map(User user) throws Exception {
				user.setTypeMap(null);
				return user;
			}
		});

		withoutMap.writeAsText(resultPath);
		env.execute("Simple Avro read job");

		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
					"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
	}

	/** Same job as the simple read, with Avro forced as serializer and a one-entry map set. */
	@Test
	public void testSerializeWithAvro() throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.getConfig().enableForceAvro();

		// every record gets the same single-entry map, which then shows up in the output
		DataSet<User> withFixedMap = readUsers(env).map(new MapFunction<User, User>() {
			@Override
			public User map(User user) throws Exception {
				Map<CharSequence, Long> singleEntry = new HashMap<CharSequence, Long>(1);
				singleEntry.put("hehe", 12L);
				user.setTypeMap(singleEntry);
				return user;
			}
		});

		withFixedMap.writeAsText(resultPath);
		env.execute("Simple Avro read job");

		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
					"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
	}

	/** Groups by the {@code name} field expression with object reuse switched on. */
	@Test
	public void testKeySelection() throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.getConfig().enableObjectReuse();

		DataSet<Tuple2<String, Integer>> counts = readUsers(env).groupBy("name").reduceGroup(new NameCounter());

		counts.writeAsText(resultPath);
		env.execute("Avro Key selection");

		expected = "(Alyssa,1)\n(Charlie,1)\n";
	}

	/** Groups through a key selector while Avro is forced as serializer. */
	@Test
	public void testWithAvroGenericSer() throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.getConfig().enableForceAvro();

		DataSet<Tuple2<String, Integer>> counts = readUsers(env).groupBy(new NameKey()).reduceGroup(new NameCounter());

		counts.writeAsText(resultPath);
		env.execute("Avro Key selection");

		expected = "(Charlie,1)\n(Alyssa,1)\n";
	}

	/** Groups through a key selector while Kryo is forced as serializer. */
	@Test
	public void testWithKryoGenericSer() throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.getConfig().enableForceKryo();

		DataSet<Tuple2<String, Integer>> counts = readUsers(env).groupBy(new NameKey()).reduceGroup(new NameCounter());

		counts.writeAsText(resultPath);
		env.execute("Avro Key selection");

		expected = "(Charlie,1)\n(Alyssa,1)\n";
	}

	/**
	 * Tests grouping on a few known fields.
	 */
	@Test
	public void testAllFields() throws Exception {
		for (String candidate : Arrays.asList("name", "type_enum", "type_double_test")) {
			testField(candidate);
		}
	}

	/**
	 * Groups the users by {@code fieldName} and emits that field of every record. Because it
	 * runs several times inside one test method, it calls {@link #before()} and
	 * {@link #after()} itself for each field.
	 */
	private void testField(final String fieldName) throws Exception {
		before();

		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Object> fieldValues = readUsers(env).groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() {
			@Override
			public void reduce(Iterable<User> group, Collector<Object> collector) throws Exception {
				for (User member : group) {
					collector.collect(member.get(fieldName));
				}
			}
		});
		fieldValues.writeAsText(resultPath);
		env.execute("Simple Avro read job");

		// test if automatic registration of the Types worked
		ExecutionConfig config = env.getConfig();
		Assert.assertTrue(config.getRegisteredKryoTypes().contains(org.apache.flink.api.io.avro.generated.Fixed16.class));

		if ("name".equals(fieldName)) {
			expected = "Alyssa\nCharlie";
		} else if ("type_enum".equals(fieldName)) {
			expected = "GREEN\nRED\n";
		} else if ("type_double_test".equals(fieldName)) {
			expected = "123.45\n1.337\n";
		} else {
			Assert.fail("Unknown field");
		}

		after();
	}

	/** Creates the Avro source over the current input file. */
	private DataSet<User> readUsers(ExecutionEnvironment env) {
		Path source = new Path(inFile.getAbsoluteFile().toURI());
		AvroInputFormat<User> format = new AvroInputFormat<User>(source, User.class);
		return env.createInput(format);
	}

	/** Keys a user by the String form of its name. */
	private static final class NameKey implements KeySelector<User, String> {
		@Override
		public String getKey(User user) throws Exception {
			return String.valueOf(user.getName());
		}
	}

	/** Emits {@code (name, 1)} for every user of a group. */
	private static final class NameCounter implements GroupReduceFunction<User, Tuple2<String, Integer>> {
		@Override
		public void reduce(Iterable<User> group, Collector<Tuple2<String, Integer>> collector) throws Exception {
			for (User member : group) {
				collector.collect(new Tuple2<String, Integer>(member.getName().toString(), 1));
			}
		}
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java new file mode 100644 index 0000000..91a9612 --- /dev/null +++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.api.io.avro; + +import static org.junit.Assert.*; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.util.*; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.file.FileReader; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.util.Utf8; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.io.avro.generated.Address; +import org.apache.flink.api.io.avro.generated.Colors; +import org.apache.flink.api.io.avro.generated.User; +import org.apache.flink.api.java.io.AvroInputFormat; +import org.apache.flink.api.java.typeutils.AvroTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Test the avro input format. 
+ * (The testcase is mostly the getting started tutorial of avro) + * http://avro.apache.org/docs/current/gettingstartedjava.html + */ +public class AvroRecordInputFormatTest { + + public File testFile; + + final static String TEST_NAME = "Alyssa"; + + final static String TEST_ARRAY_STRING_1 = "ELEMENT 1"; + final static String TEST_ARRAY_STRING_2 = "ELEMENT 2"; + + final static boolean TEST_ARRAY_BOOLEAN_1 = true; + final static boolean TEST_ARRAY_BOOLEAN_2 = false; + + final static Colors TEST_ENUM_COLOR = Colors.GREEN; + + final static String TEST_MAP_KEY1 = "KEY 1"; + final static long TEST_MAP_VALUE1 = 8546456L; + final static String TEST_MAP_KEY2 = "KEY 2"; + final static long TEST_MAP_VALUE2 = 17554L; + + final static int TEST_NUM = 239; + final static String TEST_STREET = "Baker Street"; + final static String TEST_CITY = "London"; + final static String TEST_STATE = "London"; + final static String TEST_ZIP = "NW1 6XE"; + + + private Schema userSchema = new User().getSchema(); + + + public static void writeTestFile(File testFile) throws IOException { + ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>(); + stringArray.add(TEST_ARRAY_STRING_1); + stringArray.add(TEST_ARRAY_STRING_2); + + ArrayList<Boolean> booleanArray = new ArrayList<Boolean>(); + booleanArray.add(TEST_ARRAY_BOOLEAN_1); + booleanArray.add(TEST_ARRAY_BOOLEAN_2); + + HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>(); + longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1); + longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2); + + Address addr = new Address(); + addr.setNum(TEST_NUM); + addr.setStreet(TEST_STREET); + addr.setCity(TEST_CITY); + addr.setState(TEST_STATE); + addr.setZip(TEST_ZIP); + + + User user1 = new User(); + + user1.setName(TEST_NAME); + user1.setFavoriteNumber(256); + user1.setTypeDoubleTest(123.45d); + user1.setTypeBoolTest(true); + user1.setTypeArrayString(stringArray); + user1.setTypeArrayBoolean(booleanArray); + user1.setTypeEnum(TEST_ENUM_COLOR); + 
user1.setTypeMap(longMap); + user1.setTypeNested(addr); + + // Construct via builder + User user2 = User.newBuilder() + .setName("Charlie") + .setFavoriteColor("blue") + .setFavoriteNumber(null) + .setTypeBoolTest(false) + .setTypeDoubleTest(1.337d) + .setTypeNullTest(null) + .setTypeLongTest(1337L) + .setTypeArrayString(new ArrayList<CharSequence>()) + .setTypeArrayBoolean(new ArrayList<Boolean>()) + .setTypeNullableArray(null) + .setTypeEnum(Colors.RED) + .setTypeMap(new HashMap<CharSequence, Long>()) + .setTypeFixed(null) + .setTypeUnion(null) + .setTypeNested( + Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET) + .setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP) + .build()) + .build(); + DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class); + DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter); + dataFileWriter.create(user1.getSchema(), testFile); + dataFileWriter.append(user1); + dataFileWriter.append(user2); + dataFileWriter.close(); + } + @Before + public void createFiles() throws IOException { + testFile = File.createTempFile("AvroInputFormatTest", null); + writeTestFile(testFile); + } + + + /** + * Test if the AvroInputFormat is able to properly read data from an avro file. 
+ * @throws IOException + */ + @Test + public void testDeserialisation() throws IOException { + Configuration parameters = new Configuration(); + + AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class); + + format.configure(parameters); + FileInputSplit[] splits = format.createInputSplits(1); + assertEquals(splits.length, 1); + format.open(splits[0]); + + User u = format.nextRecord(null); + assertNotNull(u); + + String name = u.getName().toString(); + assertNotNull("empty record", name); + assertEquals("name not equal", TEST_NAME, name); + + // check arrays + List<CharSequence> sl = u.getTypeArrayString(); + assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString()); + assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString()); + + List<Boolean> bl = u.getTypeArrayBoolean(); + assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0)); + assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1)); + + // check enums + Colors enumValue = u.getTypeEnum(); + assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue); + + // check maps + Map<CharSequence, Long> lm = u.getTypeMap(); + assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue()); + assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue()); + + assertFalse("expecting second element", format.reachedEnd()); + assertNotNull("expecting second element", format.nextRecord(u)); + + assertNull(format.nextRecord(u)); + assertTrue(format.reachedEnd()); + + format.close(); + } + + /** + * Test if the AvroInputFormat is able to properly read data from an avro file. 
+ * @throws IOException + */ + @Test + public void testDeserialisationReuseAvroRecordFalse() throws IOException { + Configuration parameters = new Configuration(); + + AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class); + format.setReuseAvroValue(false); + + format.configure(parameters); + FileInputSplit[] splits = format.createInputSplits(1); + assertEquals(splits.length, 1); + format.open(splits[0]); + + User u = format.nextRecord(null); + assertNotNull(u); + + String name = u.getName().toString(); + assertNotNull("empty record", name); + assertEquals("name not equal", TEST_NAME, name); + + // check arrays + List<CharSequence> sl = u.getTypeArrayString(); + assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString()); + assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString()); + + List<Boolean> bl = u.getTypeArrayBoolean(); + assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0)); + assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1)); + + // check enums + Colors enumValue = u.getTypeEnum(); + assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue); + + // check maps + Map<CharSequence, Long> lm = u.getTypeMap(); + assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue()); + assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue()); + + assertFalse("expecting second element", format.reachedEnd()); + assertNotNull("expecting second element", format.nextRecord(u)); + + assertNull(format.nextRecord(u)); + assertTrue(format.reachedEnd()); + + format.close(); + } + + /** + * Test if the Flink serialization is able to properly process GenericData.Record types. + * Usually users of Avro generate classes (POJOs) from Avro schemas. + * However, if generated classes are not available, one can also use GenericData.Record. 
+ * It is an untyped key-value record which is using a schema to validate the correctness of the data. + * + * It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead. + */ + @Test + public void testDeserializeToGenericType() throws IOException { + DatumReader<GenericData.Record> datumReader = new GenericDatumReader<>(userSchema); + + try (FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader)) { + // initialize Record by reading it from disk (thats easier than creating it by hand) + GenericData.Record rec = new GenericData.Record(userSchema); + dataFileReader.next(rec); + + // check if record has been read correctly + assertNotNull(rec); + assertEquals("name not equal", TEST_NAME, rec.get("name").toString()); + assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString()); + assertEquals(null, rec.get("type_long_test")); // it is null for the first record. + + // now serialize it with our framework: + TypeInformation<GenericData.Record> te = TypeExtractor.createTypeInfo(GenericData.Record.class); + + ExecutionConfig ec = new ExecutionConfig(); + Assert.assertEquals(GenericTypeInfo.class, te.getClass()); + + Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<Class<?>>()); + + TypeSerializer<GenericData.Record> tser = te.createSerializer(ec); + Assert.assertEquals(1, ec.getDefaultKryoSerializerClasses().size()); + Assert.assertTrue( + ec.getDefaultKryoSerializerClasses().containsKey(Schema.class) && + ec.getDefaultKryoSerializerClasses().get(Schema.class).equals(Serializers.AvroSchemaSerializer.class)); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out)) { + tser.serialize(rec, outView); + } + + GenericData.Record newRec; + try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper( + new ByteArrayInputStream(out.toByteArray()))) + { + 
newRec = tser.deserialize(inView); + } + + // check if it is still the same + assertNotNull(newRec); + assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString()); + assertEquals("name not equal", TEST_NAME, newRec.get("name").toString()); + assertEquals(null, newRec.get("type_long_test")); + } + } + + /** + * This test validates proper serialization with specific (generated POJO) types. + */ + @Test + public void testDeserializeToSpecificType() throws IOException { + + DatumReader<User> datumReader = new SpecificDatumReader<User>(userSchema); + + try (FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader)) { + User rec = dataFileReader.next(); + + // check if record has been read correctly + assertNotNull(rec); + assertEquals("name not equal", TEST_NAME, rec.get("name").toString()); + assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString()); + + // now serialize it with our framework: + ExecutionConfig ec = new ExecutionConfig(); + TypeInformation<User> te = TypeExtractor.createTypeInfo(User.class); + + Assert.assertEquals(AvroTypeInfo.class, te.getClass()); + TypeSerializer<User> tser = te.createSerializer(ec); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out)) { + tser.serialize(rec, outView); + } + + User newRec; + try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper( + new ByteArrayInputStream(out.toByteArray()))) + { + newRec = tser.deserialize(inView); + } + + // check if it is still the same + assertNotNull(newRec); + assertEquals("name not equal", TEST_NAME, newRec.getName().toString()); + assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString()); + } + } + + /** + * Test if the AvroInputFormat is able to properly read data from an Avro + * file as a GenericRecord. 
+ * + * @throws IOException, + * if there is an exception + */ + @Test + public void testDeserialisationGenericRecord() throws IOException { + Configuration parameters = new Configuration(); + + AvroInputFormat<GenericRecord> format = new AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()), + GenericRecord.class); + + doTestDeserializationGenericRecord(format, parameters); + } + + /** + * Helper method to test GenericRecord serialisation + * + * @param format + * the format to test + * @param parameters + * the configuration to use + * @throws IOException + * thrown id there is a issue + */ + @SuppressWarnings("unchecked") + private void doTestDeserializationGenericRecord(final AvroInputFormat<GenericRecord> format, + final Configuration parameters) throws IOException { + try { + format.configure(parameters); + FileInputSplit[] splits = format.createInputSplits(1); + assertEquals(splits.length, 1); + format.open(splits[0]); + + GenericRecord u = format.nextRecord(null); + assertNotNull(u); + assertEquals("The schemas should be equal", userSchema, u.getSchema()); + + String name = u.get("name").toString(); + assertNotNull("empty record", name); + assertEquals("name not equal", TEST_NAME, name); + + // check arrays + List<CharSequence> sl = (List<CharSequence>) u.get("type_array_string"); + assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString()); + assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString()); + + List<Boolean> bl = (List<Boolean>) u.get("type_array_boolean"); + assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0)); + assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1)); + + // check enums + GenericData.EnumSymbol enumValue = (GenericData.EnumSymbol) u.get("type_enum"); + assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), enumValue.toString()); + + // check maps + Map<CharSequence, Long> lm = (Map<CharSequence, Long>) u.get("type_map"); + assertEquals("map 
value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue()); + assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue()); + + assertFalse("expecting second element", format.reachedEnd()); + assertNotNull("expecting second element", format.nextRecord(u)); + + assertNull(format.nextRecord(u)); + assertTrue(format.reachedEnd()); + } finally { + format.close(); + } + } + + /** + * Test if the AvroInputFormat is able to properly read data from an avro + * file as a GenericRecord + * + * @throws IOException, + * if there is an error + */ + @Test + public void testDeserialisationGenericRecordReuseAvroValueFalse() throws IOException { + Configuration parameters = new Configuration(); + + AvroInputFormat<GenericRecord> format = new AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()), + GenericRecord.class); + format.configure(parameters); + format.setReuseAvroValue(false); + + doTestDeserializationGenericRecord(format, parameters); + } + + @After + public void deleteFiles() { + testFile.delete(); + } + +}
