Repository: crunch
Updated Branches:
  refs/heads/master e90cf2782 -> 85b985a72
CRUNCH-521: Remove hadoop1-related code/dependencies Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/85b985a7 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/85b985a7 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/85b985a7 Branch: refs/heads/master Commit: 85b985a725e181444867efb3b9d5cf7ac8f0ef52 Parents: e90cf27 Author: Josh Wills <[email protected]> Authored: Wed May 20 10:55:55 2015 -0700 Committer: Josh Wills <[email protected]> Committed: Thu May 21 14:18:03 2015 -0700 ---------------------------------------------------------------------- crunch-core/pom.xml | 9 - .../avro/trevni/TrevniFileSourceTargetIT.java | 133 ------------- .../io/avro/trevni/TrevniKeyPipelineIT.java | 195 ------------------- .../mapreduce/TaskAttemptContextFactory.java | 70 ------- .../crunch/impl/mr/run/CrunchRecordReader.java | 15 +- .../org/apache/crunch/io/CrunchOutputs.java | 33 +--- .../io/avro/trevni/TrevniFileReaderFactory.java | 106 ---------- .../crunch/io/avro/trevni/TrevniKeySource.java | 68 ------- .../io/avro/trevni/TrevniKeySourceTarget.java | 40 ---- .../crunch/io/avro/trevni/TrevniKeyTarget.java | 149 -------------- .../io/avro/trevni/TrevniOutputFormat.java | 35 ---- .../io/avro/trevni/TrevniReadableData.java | 39 ---- .../io/avro/trevni/TrevniRecordWriter.java | 140 ------------- .../io/impl/DefaultFileReaderFactory.java | 5 +- .../crunch/impl/spark/SparkRuntimeContext.java | 4 +- pom.xml | 118 ++++------- 16 files changed, 52 insertions(+), 1107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/pom.xml ---------------------------------------------------------------------- diff --git a/crunch-core/pom.xml b/crunch-core/pom.xml index 3d671a7..59794f0 100644 --- a/crunch-core/pom.xml +++ b/crunch-core/pom.xml @@ -46,15 +46,6 @@ under the License. </dependency> <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>trevni-avro</artifactId> - </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>trevni-core</artifactId> - </dependency> - - <dependency> <groupId>com.twitter</groupId> <artifactId>parquet-avro</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniFileSourceTargetIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniFileSourceTargetIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniFileSourceTargetIT.java deleted file mode 100644 index d591c65..0000000 --- a/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniFileSourceTargetIT.java +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.io.avro.trevni; - -import com.google.common.collect.Lists; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData.Record; -import org.apache.avro.generic.GenericRecord; -import org.apache.crunch.PCollection; -import org.apache.crunch.Pipeline; -import org.apache.crunch.impl.mr.MRPipeline; -import org.apache.crunch.test.Person; -import org.apache.crunch.test.StringWrapper; -import org.apache.crunch.test.TemporaryPath; -import org.apache.crunch.test.TemporaryPaths; -import org.apache.crunch.types.avro.AvroType; -import org.apache.crunch.types.avro.Avros; -import org.apache.hadoop.fs.Path; -import org.apache.trevni.ColumnFileMetaData; -import org.apache.trevni.avro.AvroColumnWriter; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.io.Serializable; -import java.util.List; - -import static org.junit.Assert.assertEquals; - -@SuppressWarnings("serial") -public class TrevniFileSourceTargetIT implements Serializable { - - private transient File avroFile; - @Rule - public transient TemporaryPath tmpDir = TemporaryPaths.create(); - - @Before - public void setUp() throws IOException { - avroFile = tmpDir.getFile("test.avro.trevni"); - } - - private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException { - ColumnFileMetaData cfmd = new ColumnFileMetaData(); - AvroColumnWriter writer = new AvroColumnWriter(schema, cfmd); - - for (GenericRecord record : genericRecords) { - writer.write(record); - } - - writer.writeTo(avroFile); - } - - @Test - public void testSpecific() throws IOException { - GenericRecord savedRecord = new Record(Person.SCHEMA$); - savedRecord.put("name", "John Doe"); - savedRecord.put("age", 42); - savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); - populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); - - Pipeline pipeline = new MRPipeline(TrevniFileSourceTargetIT.class, tmpDir.getDefaultConfiguration()); - PCollection<Person> genericCollection = pipeline.read(new TrevniKeySource(new Path(avroFile.getAbsolutePath()), - Avros.records(Person.class))); - - List<Person> personList = Lists.newArrayList(genericCollection.materialize()); - - Person expectedPerson = new Person(); - expectedPerson.name = "John Doe"; - expectedPerson.age = 42; - - List<CharSequence> siblingNames = Lists.newArrayList(); - siblingNames.add("Jimmy"); - siblingNames.add("Jane"); - expectedPerson.siblingnames = siblingNames; - - assertEquals(Lists.newArrayList(expectedPerson), Lists.newArrayList(personList)); - } - - @Test - public void testGeneric() throws IOException { - String genericSchemaJson = Person.SCHEMA$.toString().replace("Person", "GenericPerson"); - Schema genericPersonSchema = new Schema.Parser().parse(genericSchemaJson); - GenericRecord savedRecord = new Record(genericPersonSchema); - savedRecord.put("name", "John Doe"); - savedRecord.put("age", 42); - savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); - 
populateGenericFile(Lists.newArrayList(savedRecord), genericPersonSchema); - - Pipeline pipeline = new MRPipeline(TrevniFileSourceTargetIT.class, tmpDir.getDefaultConfiguration()); - PCollection<Record> genericCollection = pipeline.read(new TrevniKeySource(new Path(avroFile.getAbsolutePath()), - Avros.generics(genericPersonSchema))); - - List<Record> recordList = Lists.newArrayList(genericCollection.materialize()); - - assertEquals(Lists.newArrayList(savedRecord), Lists.newArrayList(recordList)); - } - - @Test - public void testReflect() throws IOException { - AvroType<StringWrapper> strType = Avros.reflects (StringWrapper.class); - Schema schema = strType.getSchema(); - GenericRecord savedRecord = new Record(schema); - savedRecord.put("value", "stringvalue"); - populateGenericFile(Lists.newArrayList(savedRecord), schema); - - Pipeline pipeline = new MRPipeline(TrevniFileSourceTargetIT.class, tmpDir.getDefaultConfiguration()); - PCollection<StringWrapper> stringValueCollection = pipeline.read(new TrevniKeySource(new Path(avroFile.getAbsolutePath()), - strType)); - - List<StringWrapper> recordList = Lists.newArrayList(stringValueCollection.materialize()); - - assertEquals(1, recordList.size()); - StringWrapper stringWrapper = recordList.get(0); - assertEquals("stringvalue", stringWrapper.getValue()); - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniKeyPipelineIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniKeyPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniKeyPipelineIT.java deleted file mode 100644 index c6ea4fa..0000000 --- a/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniKeyPipelineIT.java +++ /dev/null @@ -1,195 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.crunch.io.avro.trevni; - -import com.google.common.collect.Lists; -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.specific.SpecificData; -import org.apache.crunch.PCollection; -import org.apache.crunch.Pipeline; -import org.apache.crunch.Target; -import org.apache.crunch.impl.mr.MRPipeline; -import org.apache.crunch.io.At; -import org.apache.crunch.test.Person; -import org.apache.crunch.test.TemporaryPath; -import org.apache.crunch.test.TemporaryPaths; -import org.apache.crunch.types.avro.Avros; -import org.apache.hadoop.fs.Path; -import org.apache.trevni.avro.AvroColumnReader; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.Serializable; -import java.util.List; - -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; - -public class TrevniKeyPipelineIT implements Serializable { - - private transient File avroFile; - @Rule - public transient TemporaryPath tmpDir = TemporaryPaths.create(); - - @Before - public void setUp() throws IOException { - avroFile = tmpDir.getFile("test.avro.trevni"); - } - - private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException { - FileOutputStream outputStream = new FileOutputStream(this.avroFile); - GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(schema); - - DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter); - dataFileWriter.create(schema, outputStream); - - for (GenericRecord record : genericRecords) { - dataFileWriter.append(record); - } - - dataFileWriter.close(); - outputStream.close(); - } - - @Test - public void toAvroTrevniKeyTarget() throws Exception { - GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$); - savedRecord.put("name", "John Doe"); - savedRecord.put("age", 42); - savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); - populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); - - Pipeline pipeline = new MRPipeline(TrevniKeyPipelineIT.class, tmpDir.getDefaultConfiguration()); - PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(), - Avros.records(Person.class))); - File outputFile = tmpDir.getFile("output"); - Target trevniFile = new TrevniKeyTarget(outputFile.getAbsolutePath()); - pipeline.write(genericCollection, trevniFile); - pipeline.run(); - - Person person = genericCollection.materialize().iterator().next(); - - File trvFile = new File(outputFile, "part-m-00000.trv-part-0.trv"); - - AvroColumnReader.Params params = new AvroColumnReader.Params(trvFile); - params.setSchema(Person.SCHEMA$); - params.setModel(SpecificData.get()); - AvroColumnReader<Person> reader = new AvroColumnReader<Person>(params); - - try{ - Person readPerson = reader.next(); - assertThat(readPerson, is(person)); - }finally{ - reader.close(); - } - } - - @Test - public void toAvroTrevniKeyMultipleTarget() throws Exception { - GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$); - savedRecord.put("name", "John Doe"); - savedRecord.put("age", 42); - savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); - 
populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); - - Pipeline pipeline = new MRPipeline(TrevniKeyPipelineIT.class, tmpDir.getDefaultConfiguration()); - PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(), - Avros.records(Person.class))); - File output1File = tmpDir.getFile("output1"); - File output2File = tmpDir.getFile("output2"); - pipeline.write(genericCollection, new TrevniKeyTarget(output1File.getAbsolutePath())); - pipeline.write(genericCollection, new TrevniKeyTarget(output2File.getAbsolutePath())); - pipeline.run(); - - Person person = genericCollection.materialize().iterator().next(); - - File trv1File = new File(output1File, "part-m-00000.trv-part-0.trv"); - File trv2File = new File(output2File, "part-m-00000.trv-part-0.trv"); - - AvroColumnReader.Params params = new AvroColumnReader.Params(trv1File); - params.setSchema(Person.SCHEMA$); - params.setModel(SpecificData.get()); - AvroColumnReader<Person> reader = new AvroColumnReader<Person>(params); - - try{ - Person readPerson = reader.next(); - assertThat(readPerson, is(person)); - }finally{ - reader.close(); - } - - params = new AvroColumnReader.Params(trv2File); - params.setSchema(Person.SCHEMA$); - params.setModel(SpecificData.get()); - reader = new AvroColumnReader<Person>(params); - - try{ - Person readPerson = reader.next(); - assertThat(readPerson, is(person)); - }finally{ - reader.close(); - } - } - - @Test - public void toAvroTrevniKeyTargetReadSource() throws Exception { - GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$); - savedRecord.put("name", "John Doe"); - savedRecord.put("age", 42); - savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); - populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); - - Pipeline pipeline = new MRPipeline(TrevniKeyPipelineIT.class, tmpDir.getDefaultConfiguration()); - PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(), - Avros.records(Person.class))); - File outputFile = tmpDir.getFile("output"); - Target trevniFile = new TrevniKeyTarget(outputFile.getAbsolutePath()); - pipeline.write(genericCollection, trevniFile); - pipeline.run(); - - Person person = genericCollection.materialize().iterator().next(); - - PCollection<Person> retrievedPeople = pipeline.read(new TrevniKeySource<Person>( - new Path(outputFile.toURI()), Avros.records(Person.class))); - - Person retrievedPerson = retrievedPeople.materialize().iterator().next(); - - assertThat(retrievedPerson, is(person)); - - File trvFile = new File(outputFile, "part-m-00000.trv-part-0.trv"); - - AvroColumnReader.Params params = new AvroColumnReader.Params(trvFile); - params.setSchema(Person.SCHEMA$); - params.setModel(SpecificData.get()); - AvroColumnReader<Person> reader = new AvroColumnReader<Person>(params); - - try{ - Person readPerson = reader.next(); - assertThat(readPerson, is(person)); - }finally{ - reader.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java deleted file mode 100644 index 256fa42..0000000 --- 
a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.hadoop.mapreduce; - -import java.lang.reflect.Constructor; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A factory class that allows us to hide the fact that {@code TaskAttemptContext} is a class in - * Hadoop 1.x.x and an interface in Hadoop 2.x.x. - */ -@SuppressWarnings("unchecked") -public class TaskAttemptContextFactory { - - private static final Logger LOG = LoggerFactory.getLogger(TaskAttemptContextFactory.class); - - private static final TaskAttemptContextFactory INSTANCE = new TaskAttemptContextFactory(); - - public static TaskAttemptContext create(Configuration conf, TaskAttemptID taskAttemptId) { - return INSTANCE.createInternal(conf, taskAttemptId); - } - - private Constructor<TaskAttemptContext> taskAttemptConstructor; - - private TaskAttemptContextFactory() { - Class<TaskAttemptContext> implClass = TaskAttemptContext.class; - if (implClass.isInterface()) { - try { - implClass = (Class<TaskAttemptContext>) Class.forName( - "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl"); - } catch (ClassNotFoundException e) { - LOG.error("Could not find TaskAttemptContextImpl class, exiting", e); - } - } - try { - this.taskAttemptConstructor = implClass.getConstructor(Configuration.class, TaskAttemptID.class); - } catch (Exception e) { - LOG.error("Could not access TaskAttemptContext constructor, exiting", e); - } - } - - private TaskAttemptContext createInternal(Configuration conf, TaskAttemptID taskAttemptId) { - try { - return (TaskAttemptContext) taskAttemptConstructor.newInstance(conf, taskAttemptId); - } catch (Exception e) { - LOG.error("Could not construct a TaskAttemptContext instance", e); - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java index e475f10..2842658 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java @@ -19,7 +19,6 @@ package org.apache.crunch.impl.mr.run; import java.io.IOException; -import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory; 
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; @@ -27,6 +26,7 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.util.ReflectionUtils; class CrunchRecordReader<K, V> extends RecordReader<K, V> { @@ -44,12 +44,12 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> { if (crunchSplit.get() instanceof CombineFileSplit) { combineFileSplit = (CombineFileSplit) crunchSplit.get(); } - this.context = context; Configuration conf = crunchSplit.getConf(); if (conf == null) { conf = context.getConfiguration(); crunchSplit.setConf(conf); } + this.context = new TaskAttemptContextImpl(conf, context.getTaskAttemptID()); initNextRecordReader(); } @@ -75,8 +75,7 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> { InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils.newInstance( crunchSplit.getInputFormatClass(), conf); - this.curReader = inputFormat.createRecordReader(getDelegateSplit(), - TaskAttemptContextFactory.create(conf, context.getTaskAttemptID())); + this.curReader = inputFormat.createRecordReader(getDelegateSplit(), context); return true; } @@ -137,18 +136,17 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> { @Override public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException { this.crunchSplit = (CrunchInputSplit) inputSplit; - this.context = context; Configuration conf = crunchSplit.getConf(); if (conf == null) { conf = context.getConfiguration(); crunchSplit.setConf(conf); } + this.context = new TaskAttemptContextImpl(conf, context.getTaskAttemptID()); if (crunchSplit.get() instanceof CombineFileSplit) { combineFileSplit = (CombineFileSplit) crunchSplit.get(); } if (curReader != null) { - curReader.initialize(getDelegateSplit(), - TaskAttemptContextFactory.create(conf, context.getTaskAttemptID())); + curReader.initialize(getDelegateSplit(), this.context); } } @@ -159,8 +157,7 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> { return false; } if (curReader != null) { - curReader.initialize(getDelegateSplit(), - TaskAttemptContextFactory.create(crunchSplit.getConf(), context.getTaskAttemptID())); + curReader.initialize(getDelegateSplit(), context); } } return true; http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java index 247ac08..653a401 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java @@ -18,9 +18,7 @@ package org.apache.crunch.io; import com.google.common.collect.Sets; -import java.lang.reflect.Method; import org.apache.crunch.CrunchRuntimeException; -import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; @@ -33,6 +31,7 @@ import org.apache.hadoop.mapreduce.RecordWriter; import 
org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskInputOutputContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.util.ReflectionUtils; import com.google.common.base.Joiner; @@ -211,9 +210,7 @@ public class CrunchOutputs<K, V> { } private static TaskAttemptContext getTaskContext(TaskAttemptContext baseContext, Job job) { - org.apache.hadoop.mapreduce.TaskAttemptID baseTaskId = baseContext.getTaskAttemptID(); - // Create a task ID context with our specialized job ID. org.apache.hadoop.mapreduce.TaskAttemptID taskId; taskId = new org.apache.hadoop.mapreduce.TaskAttemptID(job.getJobID().getJtIdentifier(), @@ -221,36 +218,14 @@ public class CrunchOutputs<K, V> { baseTaskId.isMap(), baseTaskId.getTaskID().getId(), baseTaskId.getId()); - - return TaskAttemptContextFactory.create( - job.getConfiguration(), taskId); + return new TaskAttemptContextImpl(job.getConfiguration(), taskId); } private static void setJobID(Job job, JobID jobID, String namedOutput) { - Method setJobIDMethod; - JobID newJobID = jobID; - try { - // Hadoop 2 - setJobIDMethod = Job.class.getMethod("setJobID", JobID.class); - // Add the named output to the job ID, since that is used by some output formats - // to create temporary outputs. - newJobID = jobID == null || jobID.getJtIdentifier().contains(namedOutput) ? + JobID newJobID = jobID == null || jobID.getJtIdentifier().contains(namedOutput) ? jobID : new JobID(jobID.getJtIdentifier() + "_" + namedOutput, jobID.getId()); - } catch (NoSuchMethodException e) { - // Hadoop 1's setJobID method is package private and declared by JobContext - try { - setJobIDMethod = JobContext.class.getDeclaredMethod("setJobID", JobID.class); - } catch (NoSuchMethodException e1) { - throw new CrunchRuntimeException(e); - } - setJobIDMethod.setAccessible(true); - } - try { - setJobIDMethod.invoke(job, newJobID); - } catch (Exception e) { - throw new CrunchRuntimeException("Could not set job ID to " + jobID, e); - } + job.setJobID(newJobID); } private static void configureJob( http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniFileReaderFactory.java deleted file mode 100644 index 3caa586..0000000 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniFileReaderFactory.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.crunch.io.avro.trevni; - -import com.google.common.collect.Iterators; -import com.google.common.collect.UnmodifiableIterator; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.reflect.ReflectData; -import org.apache.avro.specific.SpecificData; -import org.apache.crunch.CrunchRuntimeException; -import org.apache.crunch.MapFn; -import org.apache.crunch.fn.IdentityFn; -import org.apache.crunch.io.FileReaderFactory; -import org.apache.crunch.io.impl.AutoClosingIterator; -import org.apache.crunch.types.avro.AvroType; -import org.apache.crunch.types.avro.Avros; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.trevni.Input; -import org.apache.trevni.avro.AvroColumnReader; -import org.apache.trevni.avro.HadoopInput; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Iterator; - -public class TrevniFileReaderFactory<T> implements FileReaderFactory<T> { - - private static final Logger LOG = LoggerFactory.getLogger(TrevniFileReaderFactory.class); - private final AvroType<T> aType; - private final MapFn<T, T> mapFn; - private final Schema schema; - - public TrevniFileReaderFactory(AvroType<T> atype) { - this.aType = atype; - schema = atype.getSchema(); - this.mapFn = (MapFn<T, T>) atype.getInputMapFn(); - } - - public TrevniFileReaderFactory(Schema schema) { - this.aType = null; - this.schema = schema; - this.mapFn = IdentityFn.<T>getInstance(); - } - - static <T> AvroColumnReader<T> getReader(Input input, AvroType<T> avroType, Schema schema) { - AvroColumnReader.Params params = new AvroColumnReader.Params(input); - params.setSchema(schema); - if (avroType.hasReflect()) { - if (avroType.hasSpecific()) { - Avros.checkCombiningSpecificAndReflectionSchemas(); - } - params.setModel(ReflectData.get()); - } else if (avroType.hasSpecific()) { - params.setModel(SpecificData.get()); - } else { - params.setModel(GenericData.get()); - } - - try { - return new AvroColumnReader<T>(params); - } catch (IOException e) { - throw new CrunchRuntimeException(e); - } - } - - @Override - public Iterator<T> read(FileSystem fs, final Path path) { - this.mapFn.initialize(); - try { - HadoopInput input = new HadoopInput(path, fs.getConf()); - final AvroColumnReader<T> reader = getReader(input, aType, schema); - return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() { - @Override - public boolean hasNext() { - return reader.hasNext(); - } - - @Override - public T next() { - return mapFn.map(reader.next()); - } - }); - } catch (IOException e) { - LOG.info("Could not read avro file at path: {}", path, e); - return Iterators.emptyIterator(); - } - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java deleted file mode 100644 index fb0e8fe..0000000 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.io.avro.trevni; - -import java.util.List; -import org.apache.avro.mapred.AvroJob; -import org.apache.crunch.impl.mr.run.RuntimeParameters; -import org.apache.crunch.io.FormatBundle; -import org.apache.crunch.io.ReadableSource; -import org.apache.crunch.ReadableData; -import org.apache.crunch.io.impl.FileSourceImpl; -import org.apache.crunch.types.avro.AvroType; -import org.apache.crunch.types.avro.Avros; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.trevni.avro.mapreduce.AvroTrevniKeyInputFormat; - -import java.io.IOException; - -public class TrevniKeySource<T> extends FileSourceImpl<T> implements ReadableSource<T> { - - private static <S> FormatBundle getBundle(AvroType<S> ptype) { - return FormatBundle.forInput(AvroTrevniKeyInputFormat.class) - .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.hasReflect())) - .set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString()) - .set(RuntimeParameters.DISABLE_COMBINE_FILE, Boolean.FALSE.toString()) - .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName()); - } - - public TrevniKeySource(Path path, AvroType<T> ptype) { - super(path, ptype, getBundle(ptype)); - } - - public TrevniKeySource(List<Path> paths, AvroType<T> ptype) { - super(paths, ptype, getBundle(ptype)); - } - - @Override - public String toString() { - return "TrevniKey(" + pathsAsString() + ")"; - } - - @Override - public Iterable<T> read(Configuration conf) throws IOException { - return read(conf, new TrevniFileReaderFactory<T>((AvroType<T>) ptype)); - } - - @Override - public ReadableData<T> asReadable() { - return new TrevniReadableData<T>(paths, (AvroType<T>) ptype); - } - -} http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySourceTarget.java deleted file mode 100644 index 376d2ba..0000000 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySourceTarget.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.io.avro.trevni; - -import org.apache.crunch.io.FileNamingScheme; -import org.apache.crunch.io.SequentialFileNamingScheme; -import org.apache.crunch.io.avro.AvroFileTarget; -import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl; -import org.apache.crunch.types.avro.AvroType; -import org.apache.hadoop.fs.Path; - -public class TrevniKeySourceTarget<T> extends ReadableSourcePathTargetImpl<T> { - public TrevniKeySourceTarget(Path path, AvroType<T> atype) { - this(path, atype, SequentialFileNamingScheme.getInstance()); - } - - public TrevniKeySourceTarget(Path path, AvroType<T> atype, FileNamingScheme fileNamingScheme) { - super(new TrevniKeySource(path, atype), new AvroFileTarget(path), fileNamingScheme); - } - - @Override - public String toString() { - return target.toString(); - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java deleted file mode 100644 index 5db11f0..0000000 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java +++ /dev/null @@ -1,149 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.crunch.io.avro.trevni; - -import org.apache.avro.hadoop.io.AvroKeyComparator; -import org.apache.avro.hadoop.io.AvroSerialization; -import org.apache.avro.mapred.AvroKey; -import org.apache.avro.mapreduce.AvroJob; -import org.apache.crunch.SourceTarget; -import org.apache.crunch.impl.mr.plan.PlanningParameters; -import org.apache.crunch.io.CrunchOutputs; -import org.apache.crunch.io.FileNamingScheme; -import org.apache.crunch.io.FormatBundle; -import org.apache.crunch.io.OutputHandler; -import org.apache.crunch.io.SequentialFileNamingScheme; -import org.apache.crunch.io.impl.FileTargetImpl; -import org.apache.crunch.types.PType; -import org.apache.crunch.types.avro.AvroType; -import org.apache.crunch.types.avro.Avros; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.util.StringUtils; - -import java.io.IOException; -import java.util.Collection; - -import static org.apache.crunch.types.avro.Avros.REFLECT_DATA_FACTORY; -import static org.apache.crunch.types.avro.Avros.REFLECT_DATA_FACTORY_CLASS; - -public class TrevniKeyTarget extends FileTargetImpl { - - public TrevniKeyTarget(String path) { - this(new Path(path)); - } - - public TrevniKeyTarget(Path path) { - this(path, SequentialFileNamingScheme.getInstance()); - } - - public TrevniKeyTarget(Path path, FileNamingScheme fileNamingScheme) { - super(path, TrevniOutputFormat.class, fileNamingScheme); - } - - @Override - public String toString() { - return "TrevniKey(" + path.toString() + ")"; - } - - @Override - public boolean accept(OutputHandler handler, PType<?> ptype) { - if (!(ptype instanceof AvroType)) { - return false; - } - handler.configure(this, ptype); - return true; - } - - @Override - public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) { - AvroType<?> atype = (AvroType<?>) ptype; - Configuration conf = job.getConfiguration(); - - if (null == name) { - AvroJob.setOutputKeySchema(job, atype.getSchema()); - AvroJob.setMapOutputKeySchema(job, atype.getSchema()); - - Avros.configureReflectDataFactory(conf); - configureForMapReduce(job, AvroKey.class, NullWritable.class, FormatBundle.forOutput(TrevniOutputFormat.class), - outputPath, null); - } else { - FormatBundle<TrevniOutputFormat> bundle = FormatBundle.forOutput( - TrevniOutputFormat.class); - - bundle.set("avro.schema.output.key", atype.getSchema().toString()); - bundle.set("mapred.output.value.groupfn.class", AvroKeyComparator.class.getName()); - bundle.set("mapred.output.key.comparator.class", AvroKeyComparator.class.getName()); - bundle.set("avro.serialization.key.writer.schema", atype.getSchema().toString()); - bundle.set("avro.serialization.key.reader.schema", atype.getSchema().toString()); - - //Equivalent to... - // AvroSerialization.addToConfiguration(job.getConfiguration()); - Collection<String> serializations = conf.getStringCollection("io.serializations"); - if (!serializations.contains(AvroSerialization.class.getName())) { - serializations.add(AvroSerialization.class.getName()); - bundle.set(name, StringUtils.arrayToString(serializations.toArray(new String[serializations.size()]))); - } - - //The following is equivalent to Avros.configureReflectDataFactory(conf); - bundle.set(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass().getName()); - - //Set output which honors the name. 
- bundle.set("mapred.output.dir", new Path(outputPath, name).toString()); - - //Set value which will be ignored but should get past the FileOutputFormat.checkOutputSpecs(..) - //which requires the "mapred.output.dir" value to be set. - try{ - FileOutputFormat.setOutputPath(job, outputPath); - } catch(Exception ioe){ - throw new RuntimeException(ioe); - } - - CrunchOutputs.addNamedOutput(job, name, - bundle, - AvroKey.class, - NullWritable.class); - } - } - - @Override - protected Path getSourcePattern(final Path workingPath, final int index) { - //output directories are typically of the form - //out#/part-m-#####/part-m-#####/part-#.trv but we don't want both of those folders because it isn't - //readable by the TrevniKeySource. - return new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index + "*/part-*/part-*"); - } - - @Override - protected Path getDestFile(final Configuration conf, final Path src, final Path dir, final boolean mapOnlyJob) throws IOException { - Path outputFilename = super.getDestFile(conf, src, dir, true); - //make sure the dst file is unique in the case there are multiple part-#.trv files per partition. - return new Path(outputFilename.toString()+"-"+src.getName()); - } - - @Override - public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) { - if (ptype instanceof AvroType) { - return new TrevniKeySourceTarget(path, (AvroType<T>) ptype); - } - return null; - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniOutputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniOutputFormat.java deleted file mode 100644 index a2b7dc9..0000000 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniOutputFormat.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.crunch.io.avro.trevni; - -import java.io.IOException; - -import org.apache.avro.mapred.AvroKey; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; - -public class TrevniOutputFormat<T> extends FileOutputFormat<AvroKey<T>, NullWritable> { - - @Override - public RecordWriter<AvroKey<T>, NullWritable> getRecordWriter(TaskAttemptContext context) - throws IOException, InterruptedException { - return new TrevniRecordWriter<T>(context); - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniReadableData.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniReadableData.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniReadableData.java deleted file mode 100644 index 5a681c4..0000000 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniReadableData.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.io.avro.trevni; - -import org.apache.crunch.io.FileReaderFactory; -import org.apache.crunch.io.impl.ReadableDataImpl; -import org.apache.crunch.types.avro.AvroType; -import org.apache.hadoop.fs.Path; - -import java.util.List; - -public class TrevniReadableData<T> extends ReadableDataImpl<T> { - private final AvroType<T> avroType; - - public TrevniReadableData(List<Path> paths, AvroType<T> avroType) { - super(paths); - this.avroType = avroType; - } - - @Override - protected FileReaderFactory<T> getFileReaderFactory() { - return new TrevniFileReaderFactory<T>(avroType); - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniRecordWriter.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniRecordWriter.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniRecordWriter.java deleted file mode 100644 index 74bb796..0000000 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniRecordWriter.java +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.io.avro.trevni; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.Iterator; -import java.util.Map.Entry; - -import org.apache.avro.Schema; -import org.apache.avro.mapred.AvroKey; -import org.apache.avro.mapreduce.AvroJob; -import org.apache.avro.reflect.ReflectData; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.trevni.ColumnFileMetaData; -import org.apache.trevni.MetaData; -import org.apache.trevni.avro.AvroColumnWriter; - -/** - * - */ -public class TrevniRecordWriter<T> extends RecordWriter<AvroKey<T>, NullWritable> { - - /** trevni file extension */ - public final static String EXT = ".trv"; - - /** prefix of job configs that we care about */ - public static final String META_PREFIX = "trevni.meta."; - - /** Counter that increments as new trevni files are create because the current file - * has exceeded the block size - * */ - protected int part = 0; - - /** Trevni file writer */ - protected AvroColumnWriter<T> writer; - - /** This will be a unique directory linked to the task */ - final Path dirPath; - - /** HDFS object */ - final FileSystem fs; - - /** Current configured blocksize */ - final long blockSize; - - /** Provided avro schema from the context */ - protected Schema schema; - - /** meta data to be stored in the output file. */ - protected ColumnFileMetaData meta; - - public TrevniRecordWriter(TaskAttemptContext context) throws IOException { - schema = initSchema(context); - meta = filterMetadata(context.getConfiguration()); - writer = new AvroColumnWriter<T>(schema, meta, ReflectData.get()); - - Path outputPath = FileOutputFormat.getOutputPath(context); - - String dir = FileOutputFormat.getUniqueFile(context, "part", ""); - dirPath = new Path(outputPath.toString() + "/" + dir); - fs = dirPath.getFileSystem(context.getConfiguration()); - fs.mkdirs(dirPath); - - blockSize = fs.getDefaultBlockSize(); - } - - /** {@inheritDoc} */ - @Override - public void write(AvroKey<T> key, NullWritable value) throws IOException, - InterruptedException { - writer.write(key.datum()); - if (writer.sizeEstimate() >= blockSize) // block full - flush(); - } - - /** {@inheritDoc} */ - protected Schema initSchema(TaskAttemptContext context) { - boolean isMapOnly = context.getNumReduceTasks() == 0; - return isMapOnly ? 
AvroJob.getMapOutputKeySchema(context - .getConfiguration()) : AvroJob.getOutputKeySchema(context - .getConfiguration()); - } - - /** - * A Trevni flush will close the current file and prep a new writer - * @throws IOException - */ - public void flush() throws IOException { - OutputStream out = fs.create(new Path(dirPath, "part-" + (part++) + EXT)); - try { - writer.writeTo(out); - } finally { - out.close(); - } - writer = new AvroColumnWriter<T>(schema, meta, ReflectData.get()); - } - - /** {@inheritDoc} */ - @Override - public void close(TaskAttemptContext arg0) throws IOException, - InterruptedException { - flush(); - } - - static ColumnFileMetaData filterMetadata(final Configuration configuration) { - final ColumnFileMetaData meta = new ColumnFileMetaData(); - Iterator<Entry<String, String>> keyIterator = configuration.iterator(); - - while (keyIterator.hasNext()) { - Entry<String, String> confEntry = keyIterator.next(); - if (confEntry.getKey().startsWith(META_PREFIX)) - meta.put(confEntry.getKey().substring(META_PREFIX.length()), confEntry - .getValue().getBytes(MetaData.UTF8)); - } - - return meta; - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-core/src/main/java/org/apache/crunch/io/impl/DefaultFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/DefaultFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/DefaultFileReaderFactory.java index 90c15fa..e0e1326 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/DefaultFileReaderFactory.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/DefaultFileReaderFactory.java @@ -22,7 +22,6 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.UnmodifiableIterator; import org.apache.crunch.CrunchRuntimeException; -import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory; import org.apache.crunch.io.FileReaderFactory; import org.apache.crunch.io.FormatBundle; import org.apache.crunch.types.PType; @@ -36,11 +35,11 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.Iterator; class DefaultFileReaderFactory<T> implements FileReaderFactory<T> { @@ -62,7 +61,7 @@ class DefaultFileReaderFactory<T> implements FileReaderFactory<T> { ptype.initialize(conf); final InputFormat fmt = ReflectionUtils.newInstance(bundle.getFormatClass(), conf); - final TaskAttemptContext ctxt = TaskAttemptContextFactory.create(conf, new TaskAttemptID()); + final TaskAttemptContext ctxt = new TaskAttemptContextImpl(conf, new TaskAttemptID()); try { Job job = new Job(conf); FileInputFormat.addInputPath(job, path); http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java index 44d3573..cea17e6 100644 --- 
a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java @@ -24,7 +24,6 @@ import com.google.common.collect.Maps; import com.google.common.io.ByteStreams; import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.DoFn; -import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.TaskInputOutputContextFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.mapred.SparkCounter; @@ -34,6 +33,7 @@ import org.apache.hadoop.mapreduce.StatusReporter; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.apache.hadoop.mapreduce.task.MapContextImpl; import org.apache.spark.Accumulator; import org.apache.spark.SparkFiles; import org.apache.spark.broadcast.Broadcast; @@ -80,7 +80,7 @@ public class SparkRuntimeContext implements Serializable { lastTID = null; } configureLocalFiles(); - context = TaskInputOutputContextFactory.create(getConfiguration(), attemptID, new SparkReporter(counters)); + context = new MapContextImpl(getConfiguration(), attemptID, null, null, null, new SparkReporter(counters), null); } fn.setContext(context); fn.initialize(); http://git-wip-us.apache.org/repos/asf/crunch/blob/85b985a7/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 942f203..cce69bf 100644 --- a/pom.xml +++ b/pom.xml @@ -89,10 +89,11 @@ under the License. <pkg>org.apache.crunch</pkg> <!-- Can be overridden by hadoop-2 profile, but these are the default values --> - <hadoop.version>1.1.2</hadoop.version> - <hbase.version>0.98.1-hadoop1</hbase.version> - <hbase.midfix>hadoop1</hbase.midfix> - <avro.classifier>hadoop1</avro.classifier> + <hadoop.version>2.2.0</hadoop.version> + <hbase.version>0.98.1-hadoop2</hbase.version> + <commons-lang.version>2.5</commons-lang.version> + <hbase.midfix>hadoop2</hbase.midfix> + <avro.classifier>hadoop2</avro.classifier> <scala.base.version>2.10</scala.base.version> <scala.version>2.10.4</scala.version> @@ -224,26 +225,6 @@ under the License. </exclusion> </exclusions> </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>trevni-core</artifactId> - <version>${avro.version}</version> - </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>trevni-avro</artifactId> - <version>${avro.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.avro</groupId> - <artifactId>avro-ipc</artifactId> - </exclusion> - </exclusions> - </dependency> <dependency> <groupId>com.twitter</groupId> @@ -469,64 +450,41 @@ under the License. 
<artifactId>jsr305</artifactId> <version>${jsr305.version}</version> </dependency> - </dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + <version>${hadoop.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-annotations</artifactId> + <version>${hadoop.version}</version> + </dependency> + </dependencies> </dependencyManagement> - <profiles> - <profile> - <id>hadoop-2</id> - <activation> - <property> - <name>crunch.platform</name> - <value>2</value> - </property> - </activation> - <properties> - <hadoop.version>2.2.0</hadoop.version> - <hbase.version>0.98.1-hadoop2</hbase.version> - <commons-lang.version>2.5</commons-lang.version> - <hbase.midfix>hadoop2</hbase.midfix> - <avro.classifier>hadoop2</avro.classifier> - </properties> - <dependencyManagement> - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-auth</artifactId> - <version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - <version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-jobclient</artifactId> - <version>${hadoop.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-annotations</artifactId> - <version>${hadoop.version}</version> - </dependency> - </dependencies> - </dependencyManagement> - </profile> - </profiles> - <build> <plugins> <plugin>
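----------------------------------------------------------------------
Context for the shim removals above: TaskAttemptContextFactory existed only to hide an API difference, since TaskAttemptContext is a class in Hadoop 1.x but an interface in Hadoop 2.x (as the deleted javadoc notes). With the build now defaulting to Hadoop 2.2.0, call sites in CrunchRecordReader, CrunchOutputs, and DefaultFileReaderFactory can construct the Hadoop 2 implementation class directly. A minimal sketch of the replacement pattern, assuming a Hadoop 2.x client on the classpath (the class and variable names below are illustrative, not from the commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class TaskContextSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // In Hadoop 2, TaskAttemptContextImpl is the concrete implementation of
    // the TaskAttemptContext interface, so no reflection is needed to build one.
    TaskAttemptContext ctxt = new TaskAttemptContextImpl(conf, new TaskAttemptID());
    System.out.println(ctxt.getTaskAttemptID());
  }
}

The CrunchOutputs hunk follows the same logic: Hadoop 1 kept setJobID(JobID) package-private on JobContext, which forced the reflective lookup being deleted, while Hadoop 2 exposes Job.setJobID(JobID) publicly, so the whole block collapses to a single direct call. Likewise, the pom.xml change promotes the former hadoop-2 profile's properties and dependencyManagement entries to the build defaults and drops the profile itself.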