http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java new file mode 100644 index 0000000..5292353 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.lib; + +import static junit.framework.Assert.assertEquals; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.At; +import org.apache.crunch.test.Person; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.avro.Avros; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import com.google.common.collect.Lists; + +/** + * Test {@link org.apache.crunch.types.avro.SafeAvroSerialization} with Specific Avro types + */ +public class SpecificAvroGroupByIT implements Serializable { + + private static final long serialVersionUID = 1344118240353796561L; + + private transient File avroFile; + @Rule + public transient TemporaryPath tmpDir = TemporaryPaths.create(); + + + @Before + public void setUp() throws IOException { + avroFile = File.createTempFile("avrotest", ".avro"); + } + + @After + public void tearDown() { + avroFile.delete(); + } + + @Test + public void testGrouByWithSpecificAvroType() throws Exception { + MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByIT.class, tmpDir.getDefaultConfiguration()); + testSpecificAvro(pipeline); + } + + public void testSpecificAvro(MRPipeline pipeline) throws Exception { + + createPersonAvroFile(avroFile); + + PCollection<Person> unsorted = pipeline.read(At.avroFile(avroFile.getAbsolutePath(), Avros.records(Person.class))); + + PTable<String, Person> sorted = unsorted.parallelDo(new MapFn<Person, Pair<String, Person>>() { + + @Override + public Pair<String, Person> map(Person 
input) { + String key = input.name.toString(); + return Pair.of(key, input); + + } + }, Avros.tableOf(Avros.strings(), Avros.records(Person.class))).groupByKey().ungroup(); + + List<Pair<String, Person>> outputPersonList = Lists.newArrayList(sorted.materialize()); + + assertEquals(1, outputPersonList.size()); + assertEquals(String.class, outputPersonList.get(0).first().getClass()); + assertEquals(Person.class, outputPersonList.get(0).second().getClass()); + + pipeline.done(); + } + + private void createPersonAvroFile(File avroFile) throws IOException { + + Person person = new Person(); + person.age = 40; + person.name = "Bob"; + List<CharSequence> siblingNames = Lists.newArrayList(); + siblingNames.add("Bob" + "1"); + siblingNames.add("Bob" + "2"); + person.siblingnames = siblingNames; + + FileOutputStream outputStream = new FileOutputStream(avroFile); + SpecificDatumWriter<Person> writer = new SpecificDatumWriter<Person>(Person.class); + + DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(writer); + dataFileWriter.create(Person.SCHEMA$, outputStream); + dataFileWriter.append(person); + dataFileWriter.close(); + outputStream.close(); + } +}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java new file mode 100644 index 0000000..63d594d --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.lib.join; + +import static org.junit.Assert.assertTrue; + +import org.apache.crunch.Pair; +import org.apache.crunch.types.PTypeFamily; + +public class FullOuterJoinIT extends JoinTester { + @Override + public void assertPassed(Iterable<Pair<String, Long>> lines) { + boolean passed1 = false; + boolean passed2 = false; + boolean passed3 = false; + for (Pair<String, Long> line : lines) { + if ("wretched".equals(line.first()) && 24 == line.second()) { + passed1 = true; + } + if ("againe".equals(line.first()) && 10 == line.second()) { + passed2 = true; + } + if ("Montparnasse.".equals(line.first()) && 2 == line.second()) { + passed3 = true; + } + } + assertTrue(passed1); + assertTrue(passed2); + assertTrue(passed3); + } + + @Override + protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) { + return new FullOuterJoinFn<String, Long, Long>(typeFamily.strings(), typeFamily.longs()); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java new file mode 100644 index 0000000..4759050 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib.join; + +import static org.junit.Assert.assertTrue; + +import org.apache.crunch.Pair; +import org.apache.crunch.types.PTypeFamily; + +public class InnerJoinIT extends JoinTester { + @Override + public void assertPassed(Iterable<Pair<String, Long>> lines) { + boolean passed1 = false; + boolean passed2 = true; + boolean passed3 = true; + for (Pair<String, Long> line : lines) { + if ("wretched".equals(line.first()) && 24 == line.second()) { + passed1 = true; + } + if ("againe".equals(line.first())) { + passed2 = false; + } + if ("Montparnasse.".equals(line.first())) { + passed3 = false; + } + } + assertTrue(passed1); + assertTrue(passed2); + assertTrue(passed3); + } + + @Override + protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) { + return new InnerJoinFn<String, Long, Long>(typeFamily.strings(), typeFamily.longs()); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/join/JoinTester.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/JoinTester.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/JoinTester.java new file mode 100644 index 0000000..3e8ffda --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/JoinTester.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib.join; + +import java.io.IOException; +import java.io.Serializable; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.lib.Aggregate; +import org.apache.crunch.lib.Join; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.avro.AvroTypeFamily; +import org.apache.crunch.types.writable.WritableTypeFamily; +import org.junit.Rule; +import org.junit.Test; + +public abstract class JoinTester implements Serializable { + private static class WordSplit extends DoFn<String, String> { + @Override + public void process(String input, Emitter<String> emitter) { + for (String word : input.split("\\s+")) { + emitter.emit(word); + } + } + } + + protected PTable<String, Long> join(PCollection<String> w1, PCollection<String> w2, PTypeFamily ptf) { + PTableType<String, Long> ntt = ptf.tableOf(ptf.strings(), ptf.longs()); + PTable<String, Long> ws1 = 
Aggregate.count(w1.parallelDo("ws1", new WordSplit(), ptf.strings())); + PTable<String, Long> ws2 = Aggregate.count(w2.parallelDo("ws2", new WordSplit(), ptf.strings())); + + PTable<String, Pair<Long, Long>> join = Join.join(ws1, ws2, getJoinFn(ptf)); + + PTable<String, Long> sums = join.parallelDo("cnt", new DoFn<Pair<String, Pair<Long, Long>>, Pair<String, Long>>() { + @Override + public void process(Pair<String, Pair<Long, Long>> input, Emitter<Pair<String, Long>> emitter) { + Pair<Long, Long> pair = input.second(); + long sum = (pair.first() != null ? pair.first() : 0) + (pair.second() != null ? pair.second() : 0); + emitter.emit(Pair.of(input.first(), sum)); + } + }, ntt); + + return sums; + } + + protected void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException { + String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt"); + String maughamInputPath = tmpDir.copyResourceFileName("maugham.txt"); + + PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath); + PCollection<String> maugham = pipeline.readTextFile(maughamInputPath); + PTable<String, Long> joined = join(shakespeare, maugham, typeFamily); + Iterable<Pair<String, Long>> lines = joined.materialize(); + + assertPassed(lines); + + pipeline.done(); + } + @Rule + public transient TemporaryPath tmpDir = TemporaryPaths.create(); + + @Test + public void testWritableJoin() throws Exception { + run(new MRPipeline(InnerJoinIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance()); + } + + @Test + public void testAvroJoin() throws Exception { + run(new MRPipeline(InnerJoinIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance()); + } + + /** + * Used to check that the result of the join makes sense. + * + * @param lines + * The result of the join. + */ + public abstract void assertPassed(Iterable<Pair<String, Long>> lines); + + /** + * @return The JoinFn to use. 
+ */ + protected abstract JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily); +} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java new file mode 100644 index 0000000..4ad2a81 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.lib.join; + +import static org.junit.Assert.assertTrue; + +import org.apache.crunch.Pair; +import org.apache.crunch.types.PTypeFamily; + +public class LeftOuterJoinIT extends JoinTester { + @Override + public void assertPassed(Iterable<Pair<String, Long>> lines) { + boolean passed1 = false; + boolean passed2 = false; + boolean passed3 = true; + for (Pair<String, Long> line : lines) { + if ("wretched".equals(line.first()) && 24 == line.second()) { + passed1 = true; + } + if ("againe".equals(line.first()) && 10 == line.second()) { + passed2 = true; + } + if ("Montparnasse.".equals(line.first())) { + passed3 = false; + } + } + assertTrue(passed1); + assertTrue(passed2); + assertTrue(passed3); + } + + @Override + protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) { + return new LeftOuterJoinFn<String, Long, Long>(typeFamily.strings(), typeFamily.longs()); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java new file mode 100644 index 0000000..8bb5586 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib.join; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import org.apache.crunch.MapFn; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.PipelineResult; +import org.apache.crunch.fn.FilterFns; +import org.apache.crunch.fn.MapValuesFn; +import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.writable.Writables; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class MapsideJoinIT { + + private static String saveTempDir; + + @BeforeClass + public static void setUpClass(){ + + // Ensure a consistent temporary directory for use of the DistributedCache. + + // The DistributedCache technically isn't supported when running in local mode, and the default + // temporary directory "/tmp" is used as its location. This typically only causes an issue when + // running integration tests on Mac OS X, as OS X doesn't use "/tmp" as its default temporary + // directory. The following call ensures that "/tmp" is used as the temporary directory on all platforms. 
+ saveTempDir = System.setProperty("java.io.tmpdir", "/tmp"); + } + + @AfterClass + public static void tearDownClass(){ + System.setProperty("java.io.tmpdir", saveTempDir); + } + + private static class LineSplitter extends MapFn<String, Pair<Integer, String>> { + @Override + public Pair<Integer, String> map(String input) { + String[] fields = input.split("\\|"); + return Pair.of(Integer.parseInt(fields[0]), fields[1]); + } + } + + private static class CapOrdersFn extends MapValuesFn<Integer, String, String> { + @Override + public String map(String v) { + return v.toUpperCase(); + } + } + + private static class ConcatValuesFn extends MapValuesFn<Integer, Pair<String, String>, String> { + @Override + public String map(Pair<String, String> v) { + return v.toString(); + } + } + + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + @Test + public void testMapSideJoin_MemPipeline() { + runMapsideJoin(MemPipeline.getInstance(), true); + } + + @Test + public void testMapsideJoin_RightSideIsEmpty() throws IOException { + MRPipeline pipeline = new MRPipeline(MapsideJoinIT.class, tmpDir.getDefaultConfiguration()); + PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt"); + PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt"); + + PTable<Integer, String> filteredOrderTable = orderTable + .parallelDo(FilterFns.<Pair<Integer, String>>REJECT_ALL(), orderTable.getPTableType()); + + PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable, filteredOrderTable); + + List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined.materialize()); + + assertTrue(materializedJoin.isEmpty()); + } + + @Test + public void testMapsideJoin() throws IOException { + runMapsideJoin(new MRPipeline(MapsideJoinIT.class, tmpDir.getDefaultConfiguration()), false); + } + + private void runMapsideJoin(Pipeline pipeline, boolean inMemory) { + PTable<Integer, String> customerTable = readTable(pipeline, 
"customers.txt"); + PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt"); + + PTable<Integer, String> custOrders = MapsideJoin.join(customerTable, orderTable) + .parallelDo("concat", new ConcatValuesFn(), Writables.tableOf(Writables.ints(), Writables.strings())); + + PTable<Integer, String> ORDER_TABLE = orderTable.parallelDo(new CapOrdersFn(), orderTable.getPTableType()); + + PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(custOrders, ORDER_TABLE); + + List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList(); + expectedJoinResult.add(Pair.of(111, Pair.of("[John Doe,Corn flakes]", "CORN FLAKES"))); + expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PAPER"))); + expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PLUNGER"))); + expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PAPER"))); + expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PLUNGER"))); + expectedJoinResult.add(Pair.of(333, Pair.of("[Someone Else,Toilet brush]", "TOILET BRUSH"))); + Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize(); + + PipelineResult res = pipeline.run(); + if (!inMemory) { + assertEquals(2, res.getStageResults().size()); + } + + List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter); + Collections.sort(joinedResultList); + + assertEquals(expectedJoinResult, joinedResultList); + } + + private PTable<Integer, String> readTable(Pipeline pipeline, String filename) { + try { + return pipeline.readTextFile(tmpDir.copyResourceFileName(filename)).parallelDo("asTable", new LineSplitter(), + Writables.tableOf(Writables.ints(), Writables.strings())); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java 
---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java new file mode 100644 index 0000000..f1ca770 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.lib.join; + +import static org.apache.crunch.types.avro.Avros.records; +import static org.apache.crunch.types.avro.Avros.strings; +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.util.List; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.specific.SpecificRecord; +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.From; +import org.apache.crunch.test.Employee; +import org.apache.crunch.test.Person; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +public class MultiAvroSchemaJoinIT { + + private File personFile; + private File employeeFile; + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + @Before + public void setUp() throws Exception { + this.personFile = File.createTempFile("person", ".avro"); + this.employeeFile = File.createTempFile("employee", ".avro"); + + DatumWriter<Person> pdw = new SpecificDatumWriter<Person>(); + DataFileWriter<Person> pfw = new DataFileWriter<Person>(pdw); + pfw.create(Person.SCHEMA$, personFile); + Person p1 = new Person(); + p1.name = "Josh"; + p1.age = 19; + p1.siblingnames = ImmutableList.<CharSequence> of("Kate", "Mike"); + pfw.append(p1); + Person p2 = new Person(); + p2.name = "Kate"; + p2.age = 17;; + p2.siblingnames = ImmutableList.<CharSequence> of("Josh", "Mike"); + pfw.append(p2); + Person p3 = new Person(); + p3.name = "Mike"; + p3.age = 12; + p3.siblingnames = ImmutableList.<CharSequence> of("Josh", 
"Kate"); + pfw.append(p3); + pfw.close(); + + DatumWriter<Employee> edw = new SpecificDatumWriter<Employee>(); + DataFileWriter<Employee> efw = new DataFileWriter<Employee>(edw); + efw.create(Employee.SCHEMA$, employeeFile); + Employee e1 = new Employee(); + e1.name = "Kate"; + e1.salary = 100000; + e1.department = "Marketing"; + efw.append(e1); + efw.close(); + } + + @After + public void tearDown() throws Exception { + personFile.delete(); + employeeFile.delete(); + } + + public static class NameFn<K extends SpecificRecord> extends MapFn<K, String> { + @Override + public String map(K input) { + Schema s = input.getSchema(); + Schema.Field f = s.getField("name"); + return input.get(f.pos()).toString(); + } + } + + @Test + public void testJoin() throws Exception { + Pipeline p = new MRPipeline(MultiAvroSchemaJoinIT.class, tmpDir.getDefaultConfiguration()); + PCollection<Person> people = p.read(From.avroFile(personFile.getAbsolutePath(), records(Person.class))); + PCollection<Employee> employees = p.read(From.avroFile(employeeFile.getAbsolutePath(), records(Employee.class))); + + Iterable<Pair<Person, Employee>> result = people.by(new NameFn<Person>(), strings()) + .join(employees.by(new NameFn<Employee>(), strings())).values().materialize(); + List<Pair<Person, Employee>> v = Lists.newArrayList(result); + assertEquals(1, v.size()); + assertEquals("Kate", v.get(0).first().name.toString()); + assertEquals("Kate", v.get(0).second().name.toString()); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java new file mode 100644 index 0000000..d889b61 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java @@ -0,0 +1,51 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib.join; + +import static org.junit.Assert.assertTrue; + +import org.apache.crunch.Pair; +import org.apache.crunch.types.PTypeFamily; + +public class RightOuterJoinIT extends JoinTester { + @Override + public void assertPassed(Iterable<Pair<String, Long>> lines) { + boolean passed1 = false; + boolean passed2 = true; + boolean passed3 = false; + for (Pair<String, Long> line : lines) { + if ("wretched".equals(line.first()) && 24 == line.second()) { + passed1 = true; + } + if ("againe".equals(line.first())) { + passed2 = false; + } + if ("Montparnasse.".equals(line.first()) && 2 == line.second()) { + passed3 = true; + } + } + assertTrue(passed1); + assertTrue(passed2); + assertTrue(passed3); + } + + @Override + protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) { + return new RightOuterJoinFn<String, Long, Long>(typeFamily.strings(), typeFamily.longs()); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/test/TemporaryPaths.java ---------------------------------------------------------------------- diff --git 
a/crunch-core/src/it/java/org/apache/crunch/test/TemporaryPaths.java b/crunch-core/src/it/java/org/apache/crunch/test/TemporaryPaths.java new file mode 100644 index 0000000..97cf0de --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/test/TemporaryPaths.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.test; + +import org.apache.crunch.impl.mr.run.RuntimeParameters; +import org.apache.hadoop.conf.Configuration; + + +/** + * Utilities for working with {@link TemporaryPath}. + */ +public final class TemporaryPaths { + + /** + * Static factory returning a {@link TemporaryPath} with adjusted + * {@link Configuration} properties. 
+ */ + public static TemporaryPath create() { + return new TemporaryPath(RuntimeParameters.TMP_DIR, "hadoop.tmp.dir"); + } + + private TemporaryPaths() { + // nothing + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/test/Tests.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/test/Tests.java b/crunch-core/src/it/java/org/apache/crunch/test/Tests.java new file mode 100644 index 0000000..e381c1a --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/test/Tests.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.test; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.hadoop.io.Writable; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.collect.ImmutableList; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import com.google.common.io.Resources; + + +/** + * Utilities for integration tests. + */ +public final class Tests { + + private Tests() { + // nothing + } + + /** + * Get the path to an integration test resource file, as per naming convention. + * + * @param testCase The executing test case instance + * @param resourceName The file name of the resource + * @return The path to the resource (never null) + * @throws IllegalArgumentException Thrown if the resource doesn't exist + */ + public static String pathTo(Object testCase, String resourceName) { + String qualifiedName = resource(testCase, resourceName); + return Resources.getResource(qualifiedName).getFile(); + } + + /** + * This doesn't check whether the resource exists! + * + * @param testCase + * @param resourceName + * @return The path to the resource (never null) + */ + public static String resource(Object testCase, String resourceName) { + checkNotNull(testCase); + checkNotNull(resourceName); + + // Note: We append "Data" because otherwise Eclipse would complain about + // the case's class name clashing with the resource directory's name. + return testCase.getClass().getName().replaceAll("\\.", "/") + "Data/" + resourceName; + } + + /** + * Return our two types of {@link Pipeline}s for a JUnit Parameterized test. 
+ * + * @param testCase The executing test case's class + * @return The collection to return from a {@link Parameters} provider method + */ + public static Collection<Object[]> pipelinesParams(Class<?> testCase) { + return ImmutableList.copyOf( + new Object[][] { { MemPipeline.getInstance() }, { new MRPipeline(testCase) } + }); + } + + /** + * Serialize the given Writable into a byte array. + * + * @param value The instance to serialize + * @return The serialized data + */ + public static byte[] serialize(Writable value) { + checkNotNull(value); + try { + ByteArrayDataOutput out = ByteStreams.newDataOutput(); + value.write(out); + return out.toByteArray(); + } catch (IOException e) { + throw new IllegalStateException("cannot serialize", e); + } + } + + /** + * Serialize the src Writable into a byte array, then deserialize it into dest. + * @param src The instance to serialize + * @param dest The instance to deserialize into + * @return dest, for convenience + */ + public static <T extends Writable> T roundtrip(Writable src, T dest) { + checkNotNull(src); + checkNotNull(dest); + checkArgument(src != dest, "src and dest may not be the same instance"); + + try { + byte[] data = serialize(src); + dest.readFields(ByteStreams.newDataInput(data)); + } catch (IOException e) { + throw new IllegalStateException("cannot deserialize", e); + } + return dest; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/resources/customers.txt ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/resources/customers.txt b/crunch-core/src/it/resources/customers.txt new file mode 100644 index 0000000..98f3f3d --- /dev/null +++ b/crunch-core/src/it/resources/customers.txt @@ -0,0 +1,4 @@ +111|John Doe +222|Jane Doe +333|Someone Else +444|Has No Orders \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/resources/docs.txt 
---------------------------------------------------------------------- diff --git a/crunch-core/src/it/resources/docs.txt b/crunch-core/src/it/resources/docs.txt new file mode 100644 index 0000000..90a3f65 --- /dev/null +++ b/crunch-core/src/it/resources/docs.txt @@ -0,0 +1,6 @@ +A this doc has this text +A and this text as well +A but also this +B this doc has some text +B but not as much as the last +B doc http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/resources/emptyTextFile.txt ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/resources/emptyTextFile.txt b/crunch-core/src/it/resources/emptyTextFile.txt new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/resources/letters.txt ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/resources/letters.txt b/crunch-core/src/it/resources/letters.txt new file mode 100644 index 0000000..916bfc9 --- /dev/null +++ b/crunch-core/src/it/resources/letters.txt @@ -0,0 +1,2 @@ +a +bb \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/resources/log4j.properties b/crunch-core/src/it/resources/log4j.properties new file mode 100644 index 0000000..5d144a0 --- /dev/null +++ b/crunch-core/src/it/resources/log4j.properties @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ***** Set the org.apache.crunch logger level to INFO and its only appender to A. +log4j.logger.org.apache.crunch=info, A + +# Log warnings on Hadoop for the local runner when testing +log4j.logger.org.apache.hadoop=warn, A +# Except for Configuration, which is chatty. +log4j.logger.org.apache.hadoop.conf.Configuration=error, A + +# ***** A is set to be a ConsoleAppender. +log4j.appender.A=org.apache.log4j.ConsoleAppender +# ***** A uses PatternLayout. +log4j.appender.A.layout=org.apache.log4j.PatternLayout +log4j.appender.A.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
