Integration tests for CRUNCH-204
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d7570483 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d7570483 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d7570483 Branch: refs/heads/master Commit: d75704836d466bcaed0355782ad2bc559ffa2519 Parents: 2276ee0 Author: tzolov <[email protected]> Authored: Thu May 9 20:37:28 2013 +0200 Committer: tzolov <[email protected]> Committed: Thu May 9 20:37:28 2013 +0200 ---------------------------------------------------------------------- .../apache/crunch/io/avro/AvroMemPipelineIT.java | 118 +++++++++++++++ 1 files changed, 118 insertions(+), 0 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/d7570483/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java new file mode 100644 index 0000000..b997a52 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.crunch.io.avro;

import static org.junit.Assert.assertEquals;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;

import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.crunch.PCollection;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.io.At;
import org.apache.crunch.io.To;
import org.apache.crunch.test.Person;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.types.avro.Avros;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import com.google.common.collect.Lists;

/**
 * Integration tests that round-trip a single record through an Avro file using
 * {@link MemPipeline}. Each test writes a one-element collection with
 * {@code To.avroFile(...)}, reads it back with {@code At.avroFile(...)} and
 * asserts that the first materialized element equals the record that was
 * written.
 */
public class AvroMemPipelineIT implements Serializable {

  /** Target Avro file; re-resolved under {@link #tmpDir} before every test. */
  private transient File avroFile;

  /** Temporary directory rule that supplies the location of {@link #avroFile}. */
  @Rule
  public transient TemporaryPath tmpDir = TemporaryPaths.create();

  /**
   * Resolves the {@code test.avro} file inside the temporary directory.
   *
   * @throws IOException declared by the original signature
   */
  @Before
  public void setUp() throws IOException {
    avroFile = tmpDir.getFile("test.avro");
  }

  /**
   * Writes a specific-record {@link Person} and reads it back using
   * {@code Avros.records(Person.class)}.
   */
  @Test
  public void testMemPipelienWithSpecificRecord() {
    String avroPath = avroFile.getAbsolutePath();

    List<CharSequence> noSiblings = Lists.newArrayList();
    Person expected = new Person("John", 41, noSiblings);

    PCollection<Person> source = MemPipeline.collectionOf(Collections.singleton(expected));
    source.write(To.avroFile(avroPath));

    PCollection<Person> loaded = MemPipeline.getInstance().read(
        At.avroFile(avroPath, Avros.records(Person.class)));
    Person actual = loaded.materialize().iterator().next();

    assertEquals(expected, actual);
  }

  /**
   * Writes a {@link GenericRecord} and reads it back using
   * {@code Avros.generics(...)} with the written record's own schema.
   */
  @Test
  public void testMemPipelienWithGenericRecord() {
    String avroPath = avroFile.getAbsolutePath();

    GenericRecord expected = createGenericRecord();

    PCollection<GenericRecord> source = MemPipeline.collectionOf(Collections.singleton(expected));
    source.write(To.avroFile(avroPath));

    PCollection<Record> loaded = MemPipeline.getInstance().read(
        At.avroFile(avroPath, Avros.generics(expected.getSchema())));
    Record actual = loaded.materialize().iterator().next();

    assertEquals(expected, actual);
  }

  /**
   * Writes a plain {@link String} and reads it back using
   * {@code Avros.reflects(...)}; the comparison is made on the
   * {@code toString()} of the element that was read.
   */
  @Test
  public void testMemPipelienWithReflectionRecord() {
    String avroPath = avroFile.getAbsolutePath();

    String expected = "John Doe";

    PCollection<String> source = MemPipeline.collectionOf(Collections.singleton(expected));
    source.write(To.avroFile(avroPath));

    PCollection<? extends String> loaded = MemPipeline.getInstance().read(
        At.avroFile(avroPath, Avros.reflects(expected.getClass())));
    Object actual = loaded.materialize().iterator().next();

    assertEquals(expected, actual.toString());
  }

  /**
   * Builds the generic record used by the generic-record test.
   *
   * @return a record on {@code Person.SCHEMA$} with the {@code name},
   *         {@code age} and {@code siblingnames} fields populated
   */
  private GenericRecord createGenericRecord() {
    GenericRecord person = new GenericData.Record(Person.SCHEMA$);
    person.put("name", "John Doe");
    person.put("age", 42);
    person.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
    return person;
  }

}
