Repository: crunch Updated Branches: refs/heads/master 82cecc0ce -> 253326148
CRUNCH-479: Properly handle materialization of APPEND-style PCollections Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/25332614 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/25332614 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/25332614 Branch: refs/heads/master Commit: 25332614830ca61df05d2f68074f4462b8bdf1d4 Parents: 82cecc0 Author: Josh Wills <[email protected]> Authored: Mon Oct 27 13:43:28 2014 -0700 Committer: Josh Wills <[email protected]> Committed: Tue Oct 28 04:39:44 2014 -0700 ---------------------------------------------------------------------- .../crunch/io/avro/AvroFileSourceTargetIT.java | 70 +++++++++++++++++--- .../crunch/impl/dist/DistributedPipeline.java | 10 ++- .../org/apache/crunch/impl/mr/MRPipeline.java | 2 +- .../apache/crunch/impl/mr/exec/MRExecutor.java | 5 +- .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 7 +- 5 files changed, 79 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/25332614/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java index 9f51f23..511e827 100644 --- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java @@ -34,9 +34,11 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.ReflectData; import org.apache.crunch.PCollection; import org.apache.crunch.Pipeline; +import org.apache.crunch.Target; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.At; import org.apache.crunch.io.From; +import 
org.apache.crunch.io.To; import org.apache.crunch.test.Person; import org.apache.crunch.test.StringWrapper; import org.apache.crunch.test.TemporaryPath; @@ -58,11 +60,15 @@ public class AvroFileSourceTargetIT implements Serializable { @Before public void setUp() throws IOException { - avroFile = tmpDir.getFile("test.avro"); + avroFile = getTmpFile("test.avro"); } - private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException { - FileOutputStream outputStream = new FileOutputStream(this.avroFile); + private File getTmpFile(String file){ + return tmpDir.getFile(file); + } + + private void populateGenericFile(File outFile, List<GenericRecord> genericRecords, Schema schema) throws IOException { + FileOutputStream outputStream = new FileOutputStream(outFile); GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(schema); DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter); @@ -83,7 +89,7 @@ public class AvroFileSourceTargetIT implements Serializable { savedRecord.put("name", "John Doe"); savedRecord.put("age", 42); savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); - populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); + populateGenericFile(avroFile, Lists.newArrayList(savedRecord), Person.SCHEMA$); Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration()); PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(), @@ -92,15 +98,59 @@ public class AvroFileSourceTargetIT implements Serializable { List<Person> personList = Lists.newArrayList(genericCollection.materialize()); Person expectedPerson = new Person(); - expectedPerson.name = "John Doe"; - expectedPerson.age = 42; + expectedPerson.setName("John Doe"); + expectedPerson.setAge(42); List<CharSequence> siblingNames = Lists.newArrayList(); siblingNames.add("Jimmy"); 
siblingNames.add("Jane"); - expectedPerson.siblingnames = siblingNames; + expectedPerson.setSiblingnames(siblingNames); + + assertEquals(Lists.newArrayList(expectedPerson), Lists.newArrayList(personList)); + } + + @Test + public void testMaterializeAppendMode() throws IOException { + File parentPath = getTmpFile("existing"); + parentPath.mkdir(); + File existingRecordsFile = new File(parentPath, "test.avro"); + GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); + savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); + populateGenericFile(existingRecordsFile, Lists.newArrayList(savedRecord), Person.SCHEMA$); + + GenericRecord secondRecord = new GenericData.Record(Person.SCHEMA$); + secondRecord.put("name", "Admiral Ackbar"); + secondRecord.put("age", 37); + secondRecord.put("siblingnames", Lists.newArrayList("Itsa", "Trap")); + + File newRecordsParent = getTmpFile("new"); + newRecordsParent.mkdir(); + File newRecordsFile = new File(newRecordsParent, "test.avro"); + populateGenericFile(newRecordsFile, Lists.newArrayList(secondRecord), Person.SCHEMA$); + + Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration()); + PCollection<Person> people = pipeline.read(At.avroFile(newRecordsParent.getAbsolutePath(), + Avros.records(Person.class))); + + pipeline.write(people, To.avroFile(parentPath.getAbsolutePath()), Target.WriteMode.APPEND); + pipeline.run(); + + List<Person> personList = Lists.newArrayList(people.materialize()); + + Person expectedPerson = new Person(); + expectedPerson.setName("Admiral Ackbar"); + expectedPerson.setAge(37); + + List<CharSequence> siblingNames = Lists.newArrayList(); + siblingNames.add("Itsa"); + siblingNames.add("Trap"); + expectedPerson.setSiblingnames(siblingNames); assertEquals(Lists.newArrayList(expectedPerson), Lists.newArrayList(personList)); + + pipeline.done(); } @Test @@ -109,7 +159,7 @@ 
public class AvroFileSourceTargetIT implements Serializable { savedRecord.put("name", "John Doe"); savedRecord.put("age", 42); savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); - populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); + populateGenericFile(avroFile, Lists.newArrayList(savedRecord), Person.SCHEMA$); Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration()); PCollection<GenericData.Record> genericCollection = pipeline.read(From.avroFile( @@ -129,7 +179,7 @@ public class AvroFileSourceTargetIT implements Serializable { savedRecord.put("name", "John Doe"); savedRecord.put("age", 42); savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); - populateGenericFile(Lists.newArrayList(savedRecord), genericPersonSchema); + populateGenericFile(avroFile, Lists.newArrayList(savedRecord), genericPersonSchema); Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration()); PCollection<Record> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(), @@ -145,7 +195,7 @@ public class AvroFileSourceTargetIT implements Serializable { Schema pojoPersonSchema = ReflectData.get().getSchema(StringWrapper.class); GenericRecord savedRecord = new GenericData.Record(pojoPersonSchema); savedRecord.put("value", "stringvalue"); - populateGenericFile(Lists.newArrayList(savedRecord), pojoPersonSchema); + populateGenericFile(avroFile, Lists.newArrayList(savedRecord), pojoPersonSchema); Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration()); PCollection<StringWrapper> stringValueCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(), http://git-wip-us.apache.org/repos/asf/crunch/blob/25332614/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java ---------------------------------------------------------------------- diff --git 
a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java index 2d236ae..64510f4 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java @@ -70,6 +70,7 @@ public abstract class DistributedPipeline implements Pipeline { protected final Map<PCollectionImpl<?>, Set<Target>> outputTargets; protected final Map<PCollectionImpl<?>, MaterializableIterable<?>> outputTargetsToMaterialize; protected final Map<PipelineCallable<?>, Set<Target>> allPipelineCallables; + protected final Set<Target> appendedTargets; private Path tempDirectory; private int tempFileIndex; private int nextAnonymousStageId; @@ -89,6 +90,7 @@ public abstract class DistributedPipeline implements Pipeline { this.outputTargets = Maps.newHashMap(); this.outputTargetsToMaterialize = Maps.newHashMap(); this.allPipelineCallables = Maps.newHashMap(); + this.appendedTargets = Sets.newHashSet(); this.conf = conf; this.tempDirectory = createTempDirectory(conf); this.tempFileIndex = 0; @@ -191,6 +193,12 @@ public abstract class DistributedPipeline implements Pipeline { throw new CrunchRuntimeException("Target " + target + " is already written in current run." 
+ " Use WriteMode.APPEND in order to write additional data to it."); } + + // Need special handling for append targets in the case of materialization + if (writeMode == Target.WriteMode.APPEND) { + appendedTargets.add(target); + } + addOutput((PCollectionImpl<?>) pcollection, target); } @@ -264,7 +272,7 @@ public abstract class DistributedPipeline implements Pipeline { ReadableSourceTarget<T> srcTarget = null; if (outputTargets.containsKey(pcollection)) { for (Target target : outputTargets.get(impl)) { - if (target instanceof ReadableSourceTarget) { + if (target instanceof ReadableSourceTarget && !appendedTargets.contains(target)) { return (ReadableSourceTarget<T>) target; } } http://git-wip-us.apache.org/repos/asf/crunch/blob/25332614/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java index 1c48c62..d23988b 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java @@ -106,7 +106,7 @@ public class MRPipeline extends DistributedPipeline { outputTargetsToMaterialize.remove(c); } } - MSCRPlanner planner = new MSCRPlanner(this, outputTargets, toMaterialize, allPipelineCallables); + MSCRPlanner planner = new MSCRPlanner(this, outputTargets, toMaterialize, appendedTargets, allPipelineCallables); try { return planner.plan(jarClass, getConfiguration()); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/crunch/blob/25332614/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java index 
2cefc04..024fcce 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java @@ -59,6 +59,7 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe private final CrunchJobControl control; private final Map<PCollectionImpl<?>, Set<Target>> outputTargets; private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize; + private final Set<Target> appendedTargets; private final CountDownLatch doneSignal = new CountDownLatch(1); private final CountDownLatch killSignal = new CountDownLatch(1); private final CappedExponentialCounter pollInterval; @@ -74,10 +75,12 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe Class<?> jarClass, Map<PCollectionImpl<?>, Set<Target>> outputTargets, Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize, + Set<Target> appendedTargets, Map<PipelineCallable<?>, Set<Target>> pipelineCallables) { this.control = new CrunchJobControl(conf, jarClass.toString(), pipelineCallables); this.outputTargets = outputTargets; this.toMaterialize = toMaterialize; + this.appendedTargets = appendedTargets; this.monitorThread = new Thread(new Runnable() { @Override public void run() { @@ -147,7 +150,7 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe } else { boolean materialized = false; for (Target t : outputTargets.get(c)) { - if (!materialized) { + if (!materialized && !appendedTargets.contains(t)) { if (t instanceof SourceTarget) { c.materializeAt((SourceTarget) t); materialized = true; http://git-wip-us.apache.org/repos/asf/crunch/blob/25332614/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java 
b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java index 70acb59..91e3036 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java @@ -53,16 +53,19 @@ public class MSCRPlanner { private final MRPipeline pipeline; private final Map<PCollectionImpl<?>, Set<Target>> outputs; private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize; + private final Set<Target> appendedTargets; private final Map<PipelineCallable<?>, Set<Target>> pipelineCallables; private int lastJobID = 0; public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs, Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize, + Set<Target> appendedTargets, Map<PipelineCallable<?>, Set<Target>> pipelineCallables) { this.pipeline = pipeline; this.outputs = new TreeMap<PCollectionImpl<?>, Set<Target>>(DEPTH_COMPARATOR); this.outputs.putAll(outputs); this.toMaterialize = toMaterialize; + this.appendedTargets = appendedTargets; this.pipelineCallables = pipelineCallables; } @@ -117,7 +120,7 @@ public class MSCRPlanner { } if (!hasInputs) { LOG.warn("No input sources for pipeline, nothing to do..."); - return new MRExecutor(conf, jarClass, outputs, toMaterialize, pipelineCallables); + return new MRExecutor(conf, jarClass, outputs, toMaterialize, appendedTargets, pipelineCallables); } // Create a new graph that splits up up dependent GBK nodes. @@ -191,7 +194,7 @@ public class MSCRPlanner { // Finally, construct the jobs from the prototypes and return. 
DotfileWriter dotfileWriter = new DotfileWriter(); - MRExecutor exec = new MRExecutor(conf, jarClass, outputs, toMaterialize, pipelineCallables); + MRExecutor exec = new MRExecutor(conf, jarClass, outputs, toMaterialize, appendedTargets, pipelineCallables); for (JobPrototype proto : Sets.newHashSet(assignments.values())) { dotfileWriter.addJobPrototype(proto); exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline, lastJobID));
