Repository: crunch Updated Branches: refs/heads/master 2469348f6 -> 5609b0143
http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/CrunchDefaultOutputCommitterContainer.java
----------------------------------------------------------------------
diff --git a/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/CrunchDefaultOutputCommitterContainer.java b/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/CrunchDefaultOutputCommitterContainer.java
new file mode 100644
index 0000000..893e83b
--- /dev/null
+++ b/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/CrunchDefaultOutputCommitterContainer.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hcatalog.mapreduce;
+
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Thin extension of the Hive {@link DefaultOutputCommitterContainer}. The
+ * overridden task-level methods hand the base committer a mapred context built
+ * by {@link HCatMapRedUtils#getOldTaskAttemptContext}, which strips the crunch
+ * named output from the TaskAttemptID.
+ */
+class CrunchDefaultOutputCommitterContainer extends DefaultOutputCommitterContainer {
+
+  /**
+   * @param context
+   *          current JobContext
+   * @param baseCommitter
+   *          mapred OutputCommitter to contain
+   * @throws IOException
+   *           propagated from the Hive parent constructor
+   */
+  public CrunchDefaultOutputCommitterContainer(JobContext context, OutputCommitter baseCommitter) throws IOException {
+    super(context, baseCommitter);
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext context) throws IOException {
+    getBaseOutputCommitter().setupTask(toOldContext(context));
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext context) throws IOException {
+    getBaseOutputCommitter().abortTask(toOldContext(context));
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+    getBaseOutputCommitter().commitTask(toOldContext(context));
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+    return getBaseOutputCommitter().needsTaskCommit(toOldContext(context));
+  }
+
+  /**
+   * Converts a mapreduce context into the mapred form the base committer
+   * expects, with the crunch named output removed from its TaskAttemptID.
+   */
+  private static org.apache.hadoop.mapred.TaskAttemptContext toOldContext(TaskAttemptContext context) {
+    return HCatMapRedUtils.getOldTaskAttemptContext(context);
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/CrunchDefaultOutputFormatContainer.java
----------------------------------------------------------------------
diff --git a/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/CrunchDefaultOutputFormatContainer.java b/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/CrunchDefaultOutputFormatContainer.java
new file mode 100644
index 0000000..c74a774
--- /dev/null
+++ b/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/CrunchDefaultOutputFormatContainer.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hcatalog.mapreduce;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/** Hive output format container whose committer is a {@link CrunchDefaultOutputCommitterContainer}. */
+class CrunchDefaultOutputFormatContainer extends DefaultOutputFormatContainer {
+  public CrunchDefaultOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat<WritableComparable<?>, Writable> of) {
+    super(of);
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+    org.apache.hadoop.mapred.OutputCommitter baseCommitter = new JobConf(context.getConfiguration()).getOutputCommitter();
+    return new CrunchDefaultOutputCommitterContainer(context, baseCommitter);
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/CrunchFileOutputCommitterContainer.java
----------------------------------------------------------------------
diff --git a/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/CrunchFileOutputCommitterContainer.java b/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/CrunchFileOutputCommitterContainer.java
new file mode 100644
index
0000000..7d001dd
--- /dev/null
+++ b/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/CrunchFileOutputCommitterContainer.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hcatalog.mapreduce;
+
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Thin extension that builds valid {@link org.apache.hadoop.mapred.TaskAttemptID}s
+ * by removing the crunch named output from the
+ * {@link org.apache.hadoop.mapreduce.TaskAttemptID}.
+ */
+public class CrunchFileOutputCommitterContainer extends FileOutputCommitterContainer {
+
+  private final boolean dynamicPartitioningUsed;
+
+  /**
+   * @param context
+   *          current JobContext
+   * @param baseCommitter
+   *          OutputCommitter to contain; may be {@code null} when dynamic partitioning is used
+   * @throws IOException propagated from the Hive parent constructor or {@code HCatOutputFormat.getJobInfo}
+   */
+  public CrunchFileOutputCommitterContainer(JobContext context, OutputCommitter baseCommitter) throws IOException {
+    super(context, baseCommitter);
+    dynamicPartitioningUsed = HCatOutputFormat.getJobInfo(context.getConfiguration()).isDynamicPartitioningUsed();
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext context) throws IOException {
+    if (!dynamicPartitioningUsed) {
+      getBaseOutputCommitter().setupTask(HCatMapRedUtils.getOldTaskAttemptContext(context));
+    }
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+    if (!dynamicPartitioningUsed) {
+      return getBaseOutputCommitter().needsTaskCommit(HCatMapRedUtils.getOldTaskAttemptContext(context));
+    } else {
+      // Dynamic partitioning: commitTask() below delegates to TaskCommitContextRegistry,
+      // so a task commit must always be requested or that step would be skipped.
+      return true;
+    }
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+    if (!dynamicPartitioningUsed) {
+      // See HCATALOG-499
+      FileOutputFormatContainer.setWorkOutputPath(context);
+      getBaseOutputCommitter().commitTask(HCatMapRedUtils.getOldTaskAttemptContext(context));
+    } else {
+      try {
+        TaskCommitContextRegistry.getInstance().commitTask(context);
+      } finally {
+        TaskCommitContextRegistry.getInstance().discardCleanupFor(context);
+      }
+    }
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext context) throws IOException {
+    if (!dynamicPartitioningUsed) {
+      getBaseOutputCommitter().abortTask(HCatMapRedUtils.getOldTaskAttemptContext(context));
+    } else {
+      try {
+        TaskCommitContextRegistry.getInstance().abortTask(context);
+      } finally {
+        TaskCommitContextRegistry.getInstance().discardCleanupFor(context);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/CrunchFileOutputFormatContainer.java ---------------------------------------------------------------------- diff --git a/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/CrunchFileOutputFormatContainer.java b/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/CrunchFileOutputFormatContainer.java new file mode 100644 index 0000000..04dcfad --- /dev/null +++ b/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/CrunchFileOutputFormatContainer.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hive.hcatalog.mapreduce;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Hive {@link FileOutputFormatContainer} variant whose committer is a
+ * {@link CrunchFileOutputCommitterContainer}, which strips the crunch named output.
+ */
+class CrunchFileOutputFormatContainer extends FileOutputFormatContainer {
+
+  /**
+   * @param of the mapred OutputFormat being wrapped
+   */
+  public CrunchFileOutputFormatContainer(OutputFormat<? super WritableComparable<?>, ? super Writable> of) {
+    super(of);
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+    // The MR Task would normally set the work output path; here it has to be done by hand.
+    setWorkOutputPath(context);
+    // No base committer is handed over when dynamic partitioning is used.
+    boolean dynamic = HCatBaseOutputFormat.getJobInfo(context.getConfiguration()).isDynamicPartitioningUsed();
+    org.apache.hadoop.mapred.OutputCommitter baseCommitter =
+        dynamic ? null : new JobConf(context.getConfiguration()).getOutputCommitter();
+    return new CrunchFileOutputCommitterContainer(context, baseCommitter);
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/CrunchHCatOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/CrunchHCatOutputFormat.java b/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/CrunchHCatOutputFormat.java
new file mode 100644
index 0000000..7a4a381
--- /dev/null
+++ b/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/CrunchHCatOutputFormat.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hcatalog.mapreduce;
+
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.HCatRecord;
+
+import java.io.IOException;
+
+/**
+ * {@link HCatOutputFormat} extension that supplies {@link OutputFormat}s carrying
+ * out crunch specific logic.
+ */
+public class CrunchHCatOutputFormat extends HCatOutputFormat {
+
+  @Override
+  protected OutputFormat<WritableComparable<?>, HCatRecord> getOutputFormat(JobContext context) throws IOException {
+    OutputJobInfo outputJobInfo = getJobInfo(context.getConfiguration());
+    HiveStorageHandler handler = HCatUtil.getStorageHandler(context.getConfiguration(),
+        outputJobInfo.getTableInfo().getStorerInfo());
+    // The storage handler has to be configured from the job properties/conf
+    // before any of its methods are invoked.
+    configureOutputStorageHandler(context);
+    if (!(handler instanceof FosterStorageHandler)) {
+      return new CrunchDefaultOutputFormatContainer(
+          ReflectionUtils.newInstance(handler.getOutputFormatClass(), context.getConfiguration()));
+    }
+    // FosterStorageHandler: wrap the output format in the file-based container.
+    return new CrunchFileOutputFormatContainer(
+        ReflectionUtils.newInstance(handler.getOutputFormatClass(), context.getConfiguration()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMapRedUtils.java
----------------------------------------------------------------------
diff --git a/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMapRedUtils.java b/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMapRedUtils.java
new file mode 100644
index 0000000..4621b1f
--- /dev/null
+++ b/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMapRedUtils.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hcatalog.mapreduce;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.util.List;
+
+/**
+ * Common helper methods for translating between v1 (mapred) and v2 (mapreduce) of map reduce.
+ */
+public class HCatMapRedUtils {
+
+  /**
+   * Builds a mapred {@code TaskAttemptContext} for the given mapreduce context, using
+   * {@link #getTaskAttemptID(TaskAttemptContext)} so the crunch named output is removed.
+   */
+  public static org.apache.hadoop.mapred.TaskAttemptContext getOldTaskAttemptContext(TaskAttemptContext context) {
+    return new TaskAttemptContextImpl(new JobConf(context.getConfiguration()), getTaskAttemptID(context));
+  }
+
+  /**
+   * Creates a {@code TaskAttemptID} from the provided TaskAttemptContext, stripping
+   * the crunch named output from the ID already associated with the context. A
+   * TaskAttemptID must have six "_"-separated parts; with the named output added to
+   * the JobID, the TaskAttemptID string has seven, so the extra part is removed.
+   *
+   * @param context the TaskAttemptContext
+   * @return a TaskAttemptID with the crunch named output removed
+   */
+  public static TaskAttemptID getTaskAttemptID(TaskAttemptContext context) {
+    String taskAttemptId = context.getTaskAttemptID().toString();
+    List<String> taskAttemptIDParts = Lists.newArrayList(taskAttemptId.split("_"));
+    if (taskAttemptIDParts.size() < 7) {
+      // Fewer than seven parts: treated as carrying no named output, parsed unchanged.
+      return TaskAttemptID.forName(taskAttemptId);
+    }
+    // Index 2 is the named output, e.g. "out0" in attempt_1508401628996_out0_16350_m_000000_0.
+    taskAttemptIDParts.remove(2);
+    return TaskAttemptID.forName(StringUtils.join(taskAttemptIDParts, "_"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/package-info.java b/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/package-info.java
new file mode 100644
index 0000000..d3f569c
--- /dev/null
+++ b/crunch-hcatalog/src/main/java/org/apache/hive/hcatalog/mapreduce/package-info.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The package of classes here is needed to extend the default classes provided + * by Hive. The classes in that package are package private, and therefore could + * not be overridden outside of that package scope. Crunch needs to extend the + * classes to override the behavior of creating + * {@link org.apache.hadoop.mapred.TaskAttemptID}s. + * + * {@link org.apache.hadoop.mapred.TaskAttemptID#forName(java.lang.String)} is + * used by default in + * {@link org.apache.hive.hcatalog.mapreduce.DefaultOutputCommitterContainer} + * and {@link org.apache.hive.hcatalog.mapreduce.FileOutputFormatContainer} to + * translate between MR v1 and MR v2. This causes issues because a TaskAttemptID + * requires the string representation to have 6 elements, separated by underscores + * ('_'). Crunch adds the named output to the JobID (which is used when creating + * the TaskAttemptID), which gives the TaskAttemptID 7 elements. + * + * e.g. + * + * <pre> + * attempt_1508401628996_out0_16350_m_000000_0 + * </pre> + * + * So, the crunch classes in this package change the logic when creating + * TaskAttemptIDs to strip the named output before creating the TaskAttemptID. + * + * e.g. + * + * <pre> + * attempt_1508401628996_out0_16350_m_000000_0 -> attempt_1508401628996_16350_m_000000_0 + * </pre> + */ +package org.apache.hive.hcatalog.mapreduce; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/5609b014/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 9be4ba7..248aa05 100644 --- a/pom.xml +++ b/pom.xml @@ -53,6 +53,7 @@ under the License. <module>crunch-scrunch</module> <module>crunch-spark</module> <module>crunch-hive</module> + <module>crunch-hcatalog</module> <module>crunch-dist</module> <module>crunch-kafka</module> </modules> @@ -80,7 +81,7 @@ under the License.
in NOTICE and LICENSE files up to date. --> <guava.version>14.0.1</guava.version> - <commons-io.version>2.1</commons-io.version> + <commons-io.version>2.4</commons-io.version> <commons-lang.version>2.6</commons-lang.version> <commons-codec.version>1.4</commons-codec.version> <commons-cli.version>1.2</commons-cli.version> @@ -102,6 +103,7 @@ under the License. <hadoop.version>2.6.0</hadoop.version> <hbase.version>1.0.0</hbase.version> <avro.classifier>hadoop2</avro.classifier> + <hive.version>2.1.0</hive.version> <kafka.version>0.10.0.1</kafka.version> <scala.base.version>2.11</scala.base.version> @@ -181,6 +183,12 @@ under the License. </dependency> <dependency> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-hcatalog</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> @@ -193,6 +201,12 @@ under the License. </dependency> <dependency> + <groupId>org.apache.hive.hcatalog</groupId> + <artifactId>hive-hcatalog-core</artifactId> + <version>${hive.version}</version> + </dependency> + + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${guava.version}</version>
