Author: thejas
Date: Mon Mar 3 18:54:38 2014
New Revision: 1573674
URL: http://svn.apache.org/r1573674
Log:
HIVE-5504 : OrcOutputFormat honors compression properties only from within hive
(Sushanth Sowmyan via Thejas Nair)
Added:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java
Modified:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
Modified:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java?rev=1573674&r1=1573673&r2=1573674&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java Mon Mar 3 18:54:38 2014
@@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
-import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -98,7 +97,6 @@ public class FosterStorageHandler extend
@Override
public void configureInputJobProperties(TableDesc tableDesc,
Map<String, String> jobProperties) {
-
}
@Override
@@ -172,10 +170,9 @@ public class FosterStorageHandler extend
jobProperties.put("mapred.output.dir", jobInfo.getLocation());
}
- //TODO find a better home for this, RCFile specifc
- jobProperties.put(RCFile.COLUMN_NUMBER_CONF_STR,
- Integer.toOctalString(
- jobInfo.getOutputSchema().getFields().size()));
+
SpecialCases.addSpecialCasesParametersToOutputJobProperties(jobProperties,
jobInfo, ofClass);
+
+
jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO,
HCatUtil.serialize(jobInfo));
} catch (IOException e) {
Added:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java?rev=1573674&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java Mon Mar 3 18:54:38 2014
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.mapreduce;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class is a place to put all the code associated with
+ * Special cases. If there is a corner case required to make
+ * a particular format work that is above and beyond the generic
+ * use, it belongs here, for example. Over time, the goal is to
+ * try to minimize usage of this, but it is a useful overflow
+ * class that allows us to still be as generic as possible
+ * in the main codeflow path, and call attention to the special
+ * cases here.
+ * Note : For all methods introduced here, please document why
+ * the special case is necessary, providing a jira number if
+ * possible.
+ */
+public class SpecialCases {
+
+ static final private Log LOG = LogFactory.getLog(SpecialCases.class);
+
+ // Orc-specific parameter definitions
+ private final static List<String> orcTablePropsToCopy = Arrays.asList(
+ OrcFile.STRIPE_SIZE,
+ OrcFile.COMPRESSION,
+ OrcFile.COMPRESSION_BLOCK_SIZE,
+ OrcFile.ROW_INDEX_STRIDE,
+ OrcFile.ENABLE_INDEXES,
+ OrcFile.BLOCK_PADDING
+ );
+
+ /**
+ * Method to do any file-format specific special casing while
+ * instantiating a storage handler to write. We set any parameters
+ * we want to be visible to the job in jobProperties, and this will
+ * be available to the job via jobconf at run time.
+ * @param jobProperties : map to write to
+ * @param jobInfo : information about this output job to read from
+ * @param ofclass : the output format in use
+ */
+ public static void addSpecialCasesParametersToOutputJobProperties(
+ Map<String, String> jobProperties,
+ OutputJobInfo jobInfo, Class<? extends OutputFormat> ofclass) {
+ if (ofclass == RCFileOutputFormat.class) {
+ // RCFile specific parameter
+ jobProperties.put(RCFile.COLUMN_NUMBER_CONF_STR,
+ Integer.toOctalString(
+ jobInfo.getOutputSchema().getFields().size()));
+ } else if (ofclass == OrcOutputFormat.class) {
+ // Special cases for ORC
+ // We need to check table properties to see if a couple of parameters,
+ // such as compression parameters are defined. If they are, then we copy
+      // them to job properties, so that it will be available in jobconf at runtime
+ // See HIVE-5504 for details
+      Map<String, String> tableProps = jobInfo.getTableInfo().getTable().getParameters();
+ for (String propName : orcTablePropsToCopy){
+ if (tableProps.containsKey(propName)){
+ jobProperties.put(propName,tableProps.get(propName));
+ }
+ }
+ }
+ }
+
+
+}
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java?rev=1573674&r1=1573673&r2=1573674&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java Mon Mar 3 18:54:38 2014
@@ -106,58 +106,71 @@ public class OrcOutputFormat extends Fil
}
}
- @Override
- public RecordWriter<NullWritable, OrcSerdeRow>
- getRecordWriter(FileSystem fileSystem, JobConf conf, String name,
- Progressable reporter) throws IOException {
- return new
- OrcRecordWriter(new Path(name), OrcFile.writerOptions(conf));
+ /**
+   * Helper method to get a parameter first from props if present, falling back to JobConf if not.
+ * Returns null if key is present in neither.
+ */
+  private String getSettingFromPropsFallingBackToConf(String key, Properties props, JobConf conf){
+ if ((props != null) && props.containsKey(key)){
+ return props.getProperty(key);
+ } else if(conf != null) {
+      // If conf is not null, and the key is not present, Configuration.get() will
+ // return null for us. So, we don't have to check if it contains it.
+ return conf.get(key);
+ } else {
+ return null;
+ }
}
- @Override
- public FSRecordWriter
- getHiveRecordWriter(JobConf conf,
- Path path,
- Class<? extends Writable> valueClass,
- boolean isCompressed,
- Properties tableProperties,
- Progressable reporter) throws IOException {
+ private OrcFile.WriterOptions getOptions(JobConf conf, Properties props) {
OrcFile.WriterOptions options = OrcFile.writerOptions(conf);
- if (tableProperties.containsKey(OrcFile.STRIPE_SIZE)) {
- options.stripeSize(Long.parseLong
- (tableProperties.getProperty(OrcFile.STRIPE_SIZE)));
+ String propVal ;
+    if ((propVal = getSettingFromPropsFallingBackToConf(OrcFile.STRIPE_SIZE,props,conf)) != null){
+ options.stripeSize(Long.parseLong(propVal));
}
- if (tableProperties.containsKey(OrcFile.COMPRESSION)) {
- options.compress(CompressionKind.valueOf
- (tableProperties.getProperty(OrcFile.COMPRESSION)));
+    if ((propVal = getSettingFromPropsFallingBackToConf(OrcFile.COMPRESSION,props,conf)) != null){
+ options.compress(CompressionKind.valueOf(propVal));
}
- if (tableProperties.containsKey(OrcFile.COMPRESSION_BLOCK_SIZE)) {
- options.bufferSize(Integer.parseInt
- (tableProperties.getProperty
- (OrcFile.COMPRESSION_BLOCK_SIZE)));
+    if ((propVal = getSettingFromPropsFallingBackToConf(OrcFile.COMPRESSION_BLOCK_SIZE,props,conf)) != null){
+ options.bufferSize(Integer.parseInt(propVal));
}
- if (tableProperties.containsKey(OrcFile.ROW_INDEX_STRIDE)) {
- options.rowIndexStride(Integer.parseInt
- (tableProperties.getProperty
- (OrcFile.ROW_INDEX_STRIDE)));
+    if ((propVal = getSettingFromPropsFallingBackToConf(OrcFile.ROW_INDEX_STRIDE,props,conf)) != null){
+ options.rowIndexStride(Integer.parseInt(propVal));
}
- if (tableProperties.containsKey(OrcFile.ENABLE_INDEXES)) {
- if ("false".equals(tableProperties.getProperty
- (OrcFile.ENABLE_INDEXES))) {
+    if ((propVal = getSettingFromPropsFallingBackToConf(OrcFile.ENABLE_INDEXES,props,conf)) != null){
+ if ("false".equalsIgnoreCase(propVal)) {
options.rowIndexStride(0);
}
}
- if (tableProperties.containsKey(OrcFile.BLOCK_PADDING)) {
- options.blockPadding(Boolean.parseBoolean
- (tableProperties.getProperty
- (OrcFile.BLOCK_PADDING)));
+    if ((propVal = getSettingFromPropsFallingBackToConf(OrcFile.BLOCK_PADDING,props,conf)) != null){
+ options.blockPadding(Boolean.parseBoolean(propVal));
}
- return new OrcRecordWriter(path, options);
+ return options;
+ }
+
+ @Override
+ public RecordWriter<NullWritable, OrcSerdeRow>
+ getRecordWriter(FileSystem fileSystem, JobConf conf, String name,
+ Progressable reporter) throws IOException {
+ return new
+ OrcRecordWriter(new Path(name), getOptions(conf,null));
+ }
+
+
+ @Override
+ public FSRecordWriter
+ getHiveRecordWriter(JobConf conf,
+ Path path,
+ Class<? extends Writable> valueClass,
+ boolean isCompressed,
+ Properties tableProperties,
+ Progressable reporter) throws IOException {
+ return new OrcRecordWriter(path, getOptions(conf,tableProperties));
}
}