Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java?rev=1177788&r1=1177787&r2=1177788&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java Fri Sep 30 19:23:21 2011
@@ -21,7 +21,6 @@ package org.apache.hcatalog.mapreduce;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -30,10 +29,7 @@ import java.util.Map.Entry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -42,18 +38,13 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
 import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -65,7 +56,6 @@ import org.apache.hcatalog.common.HCatEx
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.thrift.TException;
 
 /** The OutputFormat to use to write data to HCat. The key value is ignored and
  * and should be given as null. The value is the HCatRecord to write.*/
@@ -73,20 +63,7 @@ public class HCatOutputFormat extends HC
 
 //    static final private Log LOG = LogFactory.getLog(HCatOutputFormat.class);
 
-    /** The directory under which data is initially written for a non partitioned table */
-    protected static final String TEMP_DIR_NAME = "_TEMP";
-
-    /** */
-    protected static final String DYNTEMP_DIR_NAME = "_DYN";
-
     private static Map<String, Token<? extends AbstractDelegationTokenIdentifier>> tokenMap = new HashMap<String, Token<? extends AbstractDelegationTokenIdentifier>>();
-
-    private static final PathFilter hiddenFileFilter = new PathFilter(){
-      public boolean accept(Path p){
-        String name = p.getName();
-        return !name.startsWith("_") && !name.startsWith(".");
-      }
-    };
 
     private static int maxDynamicPartitions;
     private static boolean harRequested;
@@ -161,9 +138,6 @@ public class HCatOutputFormat extends HC
         outputJobInfo.setPartitionValues(valueMap);
       }
 
-      //Handle duplicate publish
-      handleDuplicatePublish(job, outputJobInfo, client, table);
-
       StorageDescriptor tblSD = table.getSd();
       HCatSchema tableSchema = HCatUtil.extractSchemaFromStorageDescriptor(tblSD);
       StorerInfo storerInfo = InitializeInput.extractStorerInfo(tblSD,table.getParameters());
@@ -211,7 +185,7 @@ public class HCatOutputFormat extends HC
          // new Text() do new Text("oozie") below - if this change is made also
          // remember to do:
          // job.getConfiguration().set(HCAT_KEY_TOKEN_SIGNATURE, "oozie");
-          // Also change code in HCatOutputCommitter.cleanupJob() to cancel the
+          // Also change code in OutputCommitter.cleanupJob() to cancel the
          // token only if token.service is not "oozie" - remove the condition of
          // HCAT_KEY_TOKEN_SIGNATURE != null in that code.
          Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
@@ -238,8 +212,8 @@ public class HCatOutputFormat extends HC
            String tokenSignature = getTokenSignature(outputJobInfo);
            if(tokenMap.get(tokenSignature) == null) {
              // get delegation tokens from hcat server and store them into the "job"
-              // These will be used in the HCatOutputCommitter to publish partitions to
-              // hcat
+              // These will be used to publish partitions to hcat, normally in
+              // OutputCommitter.commitJob()
              // when the JobTracker in Hadoop MapReduce starts supporting renewal of
              // arbitrary tokens, the renewer should be the principal of the JobTracker
              tokenMap.put(tokenSignature, HCatUtil.extractThriftToken(
@@ -312,61 +286,6 @@ public class HCatOutputFormat extends HC
     }
 
     /**
-     * Handles duplicate publish of partition. Fails if partition already exists.
-     * For non partitioned tables, fails if files are present in table directory.
-     * For dynamic partitioned publish, does nothing - check would need to be done at recordwriter time
-     * @param job the job
-     * @param outputInfo the output info
-     * @param client the metastore client
-     * @param table the table being written to
-     * @throws IOException
-     * @throws MetaException
-     * @throws TException
-     */
-    private static void handleDuplicatePublish(Job job, OutputJobInfo outputInfo,
-        HiveMetaStoreClient client, Table table) throws IOException, MetaException, TException {
-
-      /*
-       * For fully specified ptn, follow strict checks for existence of partitions in metadata
-       * For unpartitioned tables, follow filechecks
-       * For partially specified tables:
-       *    This would then need filechecks at the start of a ptn write,
-       *    Doing metadata checks can get potentially very expensive (fat conf) if
-       *    there are a large number of partitions that match the partial specifications
-       */
-
-      if( table.getPartitionKeys().size() > 0 ) {
-        if (!outputInfo.isDynamicPartitioningUsed()){
-          List<String> partitionValues = HCatOutputCommitter.getPartitionValueList(
-              table, outputInfo.getPartitionValues());
-          // fully-specified partition
-          List<String> currentParts = client.listPartitionNames(outputInfo.getDatabaseName(),
-              outputInfo.getTableName(), partitionValues, (short) 1);
-
-          if( currentParts.size() > 0 ) {
-            throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION);
-          }
-        }
-      } else {
-        List<String> partitionValues = HCatOutputCommitter.getPartitionValueList(
-            table, outputInfo.getPartitionValues());
-        // non-partitioned table
-
-        Path tablePath = new Path(table.getSd().getLocation());
-        FileSystem fs = tablePath.getFileSystem(job.getConfiguration());
-
-        if ( fs.exists(tablePath) ) {
-          FileStatus[] status = fs.globStatus(new Path(tablePath, "*"), hiddenFileFilter);
-
-          if( status.length > 0 ) {
-            throw new HCatException(ErrorType.ERROR_NON_EMPTY_TABLE,
-                table.getDbName() + "." + table.getTableName());
-          }
-        }
-      }
-    }
-
-    /**
      * Set the schema for the data being written out to the partition. The
      * table schema is used by default for the partition if this is not called.
      * @param job the job object
@@ -391,10 +310,7 @@ public class HCatOutputFormat extends HC
     public RecordWriter<WritableComparable<?>, HCatRecord>
       getRecordWriter(TaskAttemptContext context
                       ) throws IOException, InterruptedException {
-
-      HCatRecordWriter rw = new HCatRecordWriter(context);
-      rw.prepareForStorageDriverOutput(context);
-      return rw;
+      return getOutputFormat(context).getRecordWriter(context);
     }
 
 
@@ -409,8 +325,7 @@ public class HCatOutputFormat extends HC
     @Override
     public OutputCommitter getOutputCommitter(TaskAttemptContext context
                        ) throws IOException, InterruptedException {
-        OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getOutputFormat(context);
-        return new HCatOutputCommitter(context,outputFormat.getOutputCommitter(context));
+        return getOutputFormat(context).getOutputCommitter(context);
     }
 
     static HiveMetaStoreClient createHiveClient(String url, Configuration conf) throws IOException, MetaException {
@@ -420,7 +335,7 @@
     }
 
-    private static HiveConf getHiveConf(String url, Configuration conf) throws IOException {
+    static HiveConf getHiveConf(String url, Configuration conf) throws IOException {
       HiveConf hiveConf = new HiveConf(HCatOutputFormat.class);
 
       if( url != null ) {
@@ -477,37 +392,4 @@ public class HCatOutputFormat extends HC
       return hiveConf;
     }
 
-    /**
-     * Any initialization of file paths, set permissions and group on freshly created files
-     * This is called at RecordWriter instantiation time which can be at write-time for
-     * a dynamic partitioning usecase
-     * @param context
-     * @throws IOException
-     */
-    public static void prepareOutputLocation(HCatOutputStorageDriver osd, TaskAttemptContext context) throws IOException {
-      OutputJobInfo info = HCatBaseOutputFormat.getJobInfo(context);
-//      Path workFile = osd.getWorkFilePath(context,info.getLocation());
-      Path workFile = osd.getWorkFilePath(context,context.getConfiguration().get("mapred.output.dir"));
-      Path tblPath = new Path(info.getTableInfo().getTable().getSd().getLocation());
-      FileSystem fs = tblPath.getFileSystem(context.getConfiguration());
-      FileStatus tblPathStat = fs.getFileStatus(tblPath);
-
-//      LOG.info("Attempting to set permission ["+tblPathStat.getPermission()+"] on ["+
-//          workFile+"], location=["+info.getLocation()+"] , mapred.locn =["+
-//          context.getConfiguration().get("mapred.output.dir")+"]");
-//
-//      FileStatus wFileStatus = fs.getFileStatus(workFile);
-//      LOG.info("Table : "+tblPathStat.getPath());
-//      LOG.info("Working File : "+wFileStatus.getPath());
-
-      fs.setPermission(workFile, tblPathStat.getPermission());
-      try{
-        fs.setOwner(workFile, null, tblPathStat.getGroup());
-      } catch(AccessControlException ace){
-        // log the messages before ignoring. Currently, logging is not built in HCat.
-      }
-    }
-
-
 }
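The net effect of the HCatOutputFormat change above is that getRecordWriter() and getOutputCommitter() no longer construct HCat-specific objects; they simply forward to whatever OutputFormat getOutputFormat(context) resolves. A minimal sketch of that delegation pattern follows; the class below is hypothetical and is not part of this commit:

import java.io.IOException;

import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Hypothetical illustration only: an OutputFormat that forwards every call to a
// wrapped OutputFormat, the same delegation style the methods above now use.
public class DelegatingOutputFormat<K, V> extends OutputFormat<K, V> {

    private final OutputFormat<K, V> wrapped;

    public DelegatingOutputFormat(OutputFormat<K, V> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        // No format-specific setup here; the wrapped format supplies it.
        return wrapped.getRecordWriter(context);
    }

    @Override
    public void checkOutputSpecs(JobContext context)
            throws IOException, InterruptedException {
        wrapped.checkOutputSpecs(context);
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        return wrapped.getOutputCommitter(context);
    }
}

Keeping HCatOutputFormat thin in this way pushes the storage- and metastore-specific behavior into the container classes introduced later in this commit.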
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java?rev=1177788&r1=1177787&r2=1177788&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java Fri Sep 30 19:23:21 2011
@@ -17,28 +17,17 @@ package org.apache.hcatalog.mapreduce;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus.State;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatSchema;
@@ -118,35 +107,13 @@
      */
     public String getOutputLocation(JobContext jobContext,
         String tableLocation, List<String> partitionCols, Map<String, String> partitionValues, String dynHash) throws IOException {
-
-        String parentPath = tableLocation;
-        // For dynamic partitioned writes without all keyvalues specified,
-        // we create a temp dir for the associated write job
-        if (dynHash != null){
-          parentPath = new Path(tableLocation, HCatOutputFormat.DYNTEMP_DIR_NAME+dynHash).toString();
-        }
-
-        // For non-partitioned tables, we send them to the temp dir
-        if((dynHash == null) && ( partitionValues == null || partitionValues.size() == 0 )) {
-          return new Path(tableLocation, HCatOutputFormat.TEMP_DIR_NAME).toString();
-        }
-
-        List<String> values = new ArrayList<String>();
-        for(String partitionCol : partitionCols) {
-          values.add(partitionValues.get(partitionCol));
-        }
-
-        String partitionLocation = FileUtils.makePartName(partitionCols, values);
-
-        Path path = new Path(parentPath, partitionLocation);
-        return path.toString();
+        return null;
     }
 
-    /** Default implementation assumes FileOutputFormat. Storage drivers wrapping
-     * other OutputFormats should override this method.
+    /** Storage drivers wrapping other OutputFormats should override this method.
      */
     public Path getWorkFilePath(TaskAttemptContext context, String outputLoc) throws IOException{
-        return new Path(new FileOutputCommitter(new Path(outputLoc), context).getWorkPath(), FileOutputFormat.getUniqueFile(context, "part",""));
+        return null;
     }
 
     /**
@@ -202,5 +169,13 @@
         getOutputFormat().getOutputCommitter(context).abortJob(context,state);
     }
 
-
+    /**
+     * Return an instance of OutputFormatContainer containing the passed outputFormat.
+     * DefaultOutputFormatContainer is returned by default.
+     * @param outputFormat format the returned container will contain
+     * @return OutputFormatContainer containing the passed outputFormat
+     */
+    OutputFormatContainer getOutputFormatContainer(OutputFormat outputFormat) {
+        return new DefaultOutputFormatContainer(outputFormat);
+    }
 
 }

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputCommitterContainer.java?rev=1177788&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputCommitterContainer.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputCommitterContainer.java Fri Sep 30 19:23:21 2011
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+
+/**
+ * This class will contain an implementation of an OutputCommitter.
+ * See {@link OutputFormatContainer} for more information about containers.
+ */
+abstract class OutputCommitterContainer extends OutputCommitter {
+    private final OutputCommitter committer;
+
+    /**
+     * @param context current JobContext
+     * @param committer OutputCommitter that this instance will contain
+     */
+    public OutputCommitterContainer(JobContext context, OutputCommitter committer) {
+        this.committer = committer;
+    }
+
+    /**
+     * @return underlying OutputCommitter
+     */
+    public OutputCommitter getBaseOutputCommitter() {
+        return committer;
+    }
+
+}
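OutputCommitterContainer above is abstract and only pins down the wrapping contract. As a rough sketch of what a concrete subclass looks like against the Hadoop 0.20-era OutputCommitter API, here is a hypothetical pass-through; it is not code from this commit:

package org.apache.hcatalog.mapreduce;

import java.io.IOException;

import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Hypothetical pass-through committer container, shown only to illustrate the
// delegation contract.
class PassthroughOutputCommitterContainer extends OutputCommitterContainer {

    public PassthroughOutputCommitterContainer(JobContext context, OutputCommitter committer) {
        super(context, committer);
    }

    @Override
    public void setupJob(JobContext context) throws IOException {
        getBaseOutputCommitter().setupJob(context);
    }

    @Override
    public void setupTask(TaskAttemptContext context) throws IOException {
        getBaseOutputCommitter().setupTask(context);
    }

    @Override
    public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
        return getBaseOutputCommitter().needsTaskCommit(context);
    }

    @Override
    public void commitTask(TaskAttemptContext context) throws IOException {
        getBaseOutputCommitter().commitTask(context);
    }

    @Override
    public void abortTask(TaskAttemptContext context) throws IOException {
        getBaseOutputCommitter().abortTask(context);
    }

    @Override
    public void cleanupJob(JobContext context) throws IOException {
        // A real HCatalog committer container would publish partitions to the
        // metastore here, after the base committer finishes.
        getBaseOutputCommitter().cleanupJob(context);
    }
}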
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java?rev=1177788&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java Fri Sep 30 19:23:21 2011
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hcatalog.data.HCatRecord;
+
+/**
+ * This container class is used to wrap OutputFormat implementations and augment them with
+ * behavior necessary to work with HCatalog (i.e. metastore updates, HCatalog delegation tokens, etc).
+ * Containers are also used to provide storage-specific implementations of some HCatalog features
+ * (i.e. dynamic partitioning).
+ * Hence users wishing to create storage-specific implementations of HCatalog features should implement
+ * this class and override HCatOutputStorageDriver.getOutputFormatContainer() to return the implementation.
+ * By default DefaultOutputFormatContainer is used, which implements only the bare minimum;
+ * HCatalog features such as partitioning aren't supported.
+ */
+abstract class OutputFormatContainer extends OutputFormat<WritableComparable<?>, HCatRecord> {
+    private OutputFormat<? super WritableComparable<?>, ? super Writable> of;
+
+    /**
+     * @param of OutputFormat this instance will contain
+     */
+    public OutputFormatContainer(OutputFormat<? super WritableComparable<?>,? super Writable> of) {
+        this.of = of;
+    }
+
+    /**
+     * @return underlying OutputFormat
+     */
+    public OutputFormat getBaseOutputFormat() {
+        return of;
+    }
+
+}

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/RecordWriterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/RecordWriterContainer.java?rev=1177788&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/RecordWriterContainer.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/RecordWriterContainer.java Fri Sep 30 19:23:21 2011
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+
+/**
+ * This class will contain an implementation of a RecordWriter.
+ * See {@link OutputFormatContainer} for more information about containers.
+ */
+abstract class RecordWriterContainer extends RecordWriter<WritableComparable<?>, HCatRecord> {
+
+    private final RecordWriter<? super WritableComparable<?>, ? super Writable> baseRecordWriter;
+
+    /**
+     * @param context current JobContext
+     * @param baseRecordWriter RecordWriter that this instance will contain
+     */
+    public RecordWriterContainer(TaskAttemptContext context,
+                                 RecordWriter<? super WritableComparable<?>, ? super Writable> baseRecordWriter) {
+        this.baseRecordWriter = baseRecordWriter;
+    }
+
+    /**
+     * @return underlying RecordWriter
+     */
+    public RecordWriter getBaseRecordWriter() {
+        return baseRecordWriter;
+    }
+
+}
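Per the OutputFormatContainer javadoc above, storage-specific behavior is supplied by subclassing the container and returning it from HCatOutputStorageDriver.getOutputFormatContainer(). A bare-bones sketch follows, assuming a plain pass-through with no partitioning support; the class is hypothetical, and the unchecked cast mirrors the raw types returned by getBaseOutputFormat():

package org.apache.hcatalog.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hcatalog.data.HCatRecord;

// Hypothetical container that simply forwards to the contained OutputFormat.
class PassthroughOutputFormatContainer extends OutputFormatContainer {

    public PassthroughOutputFormatContainer(OutputFormat<? super WritableComparable<?>, ? super Writable> of) {
        super(of);
    }

    @Override
    @SuppressWarnings("unchecked")
    public RecordWriter<WritableComparable<?>, HCatRecord> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        // A real container would return a RecordWriterContainer that converts
        // each HCatRecord into whatever the base writer expects.
        return (RecordWriter<WritableComparable<?>, HCatRecord>) getBaseOutputFormat().getRecordWriter(context);
    }

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
        getBaseOutputFormat().checkOutputSpecs(context);
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        // A real container would wrap this in an OutputCommitterContainer that
        // also publishes partitions to the metastore at job commit time.
        return getBaseOutputFormat().getOutputCommitter(context);
    }
}

A storage driver electing this container would override getOutputFormatContainer(OutputFormat of) to return new PassthroughOutputFormatContainer(of) in place of the DefaultOutputFormatContainer.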
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java?rev=1177788&r1=1177787&r2=1177788&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java Fri Sep 30 19:23:21 2011
@@ -142,7 +142,7 @@ public class HCatEximStorer extends HCat
   @Override
   public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException {
     if( job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local") ) {
-      //In local mode, mapreduce will not call HCatOutputCommitter.cleanupJob.
+      //In local mode, mapreduce will not call OutputCommitter.cleanupJob.
       //Calling it from here so that the partition publish happens.
       //This call needs to be removed after MAPREDUCE-1447 is fixed.
       new HCatEximOutputCommitter(job,null).cleanupJob(job);

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1177788&r1=1177787&r2=1177788&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java Fri Sep 30 19:23:21 2011
@@ -23,13 +23,16 @@ import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.mapreduce.HCatOutputCommitter;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
@@ -141,10 +144,16 @@ public class HCatStorer extends HCatBase
   @Override
   public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException {
     if( job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local") ) {
-      //In local mode, mapreduce will not call HCatOutputCommitter.cleanupJob.
+      try {
+        //In local mode, mapreduce will not call OutputCommitter.cleanupJob.
       //Calling it from here so that the partition publish happens.
       //This call needs to be removed after MAPREDUCE-1447 is fixed.
-      new HCatOutputCommitter(job,null).cleanupJob(job);
+        getOutputFormat().getOutputCommitter(new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID())).cleanupJob(job);
+      } catch (IOException e) {
+        throw new IOException("Failed to cleanup job",e);
+      } catch (InterruptedException e) {
+        throw new IOException("Failed to cleanup job",e);
+      }
     }
   }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java?rev=1177788&r1=1177787&r2=1177788&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java Fri Sep 30 19:23:21 2011
@@ -45,17 +45,16 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
+import org.apache.hcatalog.mapreduce.FileOutputStorageDriver;
 
 /**
  * The storage driver for writing RCFile data through HCatOutputFormat.
  */
- public class RCFileOutputDriver extends HCatOutputStorageDriver {
+ public class RCFileOutputDriver extends FileOutputStorageDriver {
 
   /** The serde for serializing the HCatRecord to bytes writable */
   private SerDe serde;

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java?rev=1177788&r1=1177787&r2=1177788&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java Fri Sep 30 19:23:21 2011
@@ -58,10 +58,6 @@ import org.apache.hcatalog.data.DefaultH
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.hcatalog.mapreduce.HCatOutputCommitter;
-import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.HCatTableInfo;
 import org.apache.hcatalog.rcfile.RCFileInputDriver;
 import org.apache.hcatalog.rcfile.RCFileOutputDriver;
@@ -273,9 +269,8 @@ public abstract class HCatMapReduceTest
 
     HCatOutputFormat.setSchema(job, new HCatSchema(partitionColumns));
 
-    //new HCatOutputCommitter(null).setupJob(job);
     job.waitForCompletion(true);
-    new HCatOutputCommitter(job,null).cleanupJob(job);
+    new FileOutputCommitterContainer(job,null).cleanupJob(job);
     if (assertWrite){
       // we assert only if we expected to assert with this call.
       Assert.assertEquals(writeCount, MapCreate.writeCount);

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java?rev=1177788&r1=1177787&r2=1177788&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java Fri Sep 30 19:23:21 2011
@@ -134,7 +134,7 @@ public class TestHCatDynamicPartitioned
   public void testHCatDynamicPartitionMaxPartitions() throws Exception {
     HiveConf hc = new HiveConf(this.getClass());
-
+    
     int maxParts = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
     System.out.println("Max partitions allowed = " + maxParts);

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java?rev=1177788&r1=1177787&r2=1177788&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java Fri Sep 30 19:23:21 2011
@@ -41,12 +41,6 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.mapreduce.HCatOutputCommitter;
-import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.HCatTableInfo;
-import org.apache.hcatalog.mapreduce.InitializeInput;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
-import org.apache.hcatalog.mapreduce.StorerInfo;
 import org.apache.hcatalog.rcfile.RCFileOutputDriver;
 
 public class TestHCatOutputFormat extends TestCase {
@@ -159,7 +153,7 @@ public class TestHCatOutputFormat extend
   }
 
   public void publishTest(Job job) throws Exception {
-    OutputCommitter committer = new HCatOutputCommitter(job,null);
+    OutputCommitter committer = new FileOutputCommitterContainer(job,null);
     committer.cleanupJob(job);
 
     Partition part = client.getPartition(dbName, tblName, Arrays.asList("p1"));

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java?rev=1177788&r1=1177787&r2=1177788&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java Fri Sep 30 19:23:21 2011
@@ -30,12 +30,15 @@ import org.apache.hadoop.hive.serde2.col
 import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
 import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
 import org.apache.hcatalog.rcfile.RCFileInputDriver;
 import org.apache.hcatalog.rcfile.RCFileOutputDriver;
@@ -44,6 +47,8 @@ public class TestRCFileOutputStorageDriv
   public void testConversion() throws IOException {
     Configuration conf = new Configuration();
     JobContext jc = new JobContext(conf, new JobID());
+    String jobString = HCatUtil.serialize(OutputJobInfo.create(null,null,null,null,null));
+    jc.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO,jobString);
 
     HCatSchema schema = buildHiveSchema();
     HCatInputStorageDriver isd = new RCFileInputDriver();
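The TestRCFileOutputStorageDriver change above shows the precondition the containers impose on callers: an OutputJobInfo must be serialized into the configuration under HCAT_KEY_OUTPUT_INFO before the output machinery is exercised. The same setup, restated as a self-contained helper; the class and method names are invented for illustration, while the calls are the ones the test uses:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.mapreduce.OutputJobInfo;

public class OutputJobInfoSetup {

    // Build a JobContext primed the way the updated test primes it.
    public static JobContext contextWithOutputInfo() throws IOException {
        Configuration conf = new Configuration();
        // Hadoop 0.20-era API: JobContext is directly constructible, as in the test.
        JobContext jc = new JobContext(conf, new JobID());
        // Serialize a (here deliberately empty) OutputJobInfo and plant it where
        // HCatalog's output path expects to find it.
        String jobString = HCatUtil.serialize(OutputJobInfo.create(null, null, null, null, null));
        jc.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO, jobString);
        return jc;
    }
}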
