Author: thejas
Date: Mon Mar  3 18:54:38 2014
New Revision: 1573674

URL: http://svn.apache.org/r1573674
Log:
HIVE-5504 : OrcOutputFormat honors compression properties only from within hive 
(Sushanth Sowmyan via Thejas Nair)

Added:
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java
Modified:
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java

Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java?rev=1573674&r1=1573673&r2=1573674&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java Mon Mar  3 18:54:38 2014
@@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
-import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -98,7 +97,6 @@ public class FosterStorageHandler extend
   @Override
   public void configureInputJobProperties(TableDesc tableDesc,
                       Map<String, String> jobProperties) {
-
   }
 
   @Override
@@ -172,10 +170,9 @@ public class FosterStorageHandler extend
         jobProperties.put("mapred.output.dir", jobInfo.getLocation());
       }
 
-      //TODO find a better home for this, RCFile specifc
-      jobProperties.put(RCFile.COLUMN_NUMBER_CONF_STR,
-        Integer.toOctalString(
-          jobInfo.getOutputSchema().getFields().size()));
+      SpecialCases.addSpecialCasesParametersToOutputJobProperties(jobProperties, jobInfo, ofClass);
+
+
       jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO,
         HCatUtil.serialize(jobInfo));
     } catch (IOException e) {
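
For illustration only (this snippet is not part of the commit): the RCFile column-count handling removed above is relocated, behavior unchanged, into SpecialCases below. It records the output schema's field count as an octal string. A minimal, self-contained sketch of that computation, with hypothetical class and variable names:

    import java.util.Arrays;
    import java.util.List;

    public class RcFileColumnCountSketch {
      public static void main(String[] args) {
        // Stand-in for jobInfo.getOutputSchema().getFields() in the real code.
        List<String> fields = Arrays.asList("a", "b", "c", "d", "e",
                                            "f", "g", "h", "i", "j");
        // Same expression the commit relocates into SpecialCases: the field
        // count is rendered with Integer.toOctalString, so 10 fields -> "12".
        String columnNumber = Integer.toOctalString(fields.size());
        System.out.println(columnNumber); // prints "12"
      }
    }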

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java?rev=1573674&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java Mon Mar  3 18:54:38 2014
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.mapreduce;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class is a place to put all the code associated with
+ * special cases. If a corner case is required to make a
+ * particular format work, above and beyond the generic use,
+ * it belongs here. Over time, the goal is to minimize the
+ * use of this class, but it is a useful overflow that lets
+ * us stay as generic as possible in the main code flow
+ * path, while calling attention to the special cases
+ * here.
+ * Note: for all methods introduced here, please document why
+ * the special case is necessary, providing a jira number if
+ * possible.
+ */
+public class SpecialCases {
+
+  static final private Log LOG = LogFactory.getLog(SpecialCases.class);
+
+  // Orc-specific parameter definitions
+  private final static List<String> orcTablePropsToCopy = Arrays.asList(
+      OrcFile.STRIPE_SIZE,
+      OrcFile.COMPRESSION,
+      OrcFile.COMPRESSION_BLOCK_SIZE,
+      OrcFile.ROW_INDEX_STRIDE,
+      OrcFile.ENABLE_INDEXES,
+      OrcFile.BLOCK_PADDING
+  );
+
+  /**
+   * Method to do any file-format-specific special casing while
+   * instantiating a storage handler to write. We set any parameters
+   * we want to be visible to the job in jobProperties, and these
+   * will be available to the job via the jobconf at run time.
+   * @param jobProperties : map to write to
+   * @param jobInfo : information about this output job to read from
+   * @param ofclass : the output format in use
+   */
+  public static void addSpecialCasesParametersToOutputJobProperties(
+      Map<String, String> jobProperties,
+      OutputJobInfo jobInfo, Class<? extends OutputFormat> ofclass) {
+    if (ofclass == RCFileOutputFormat.class) {
+      // RCFile specific parameter
+      jobProperties.put(RCFile.COLUMN_NUMBER_CONF_STR,
+          Integer.toOctalString(
+              jobInfo.getOutputSchema().getFields().size()));
+    } else if (ofclass == OrcOutputFormat.class) {
+      // Special cases for ORC
+      // We need to check the table properties to see if certain parameters,
+      // such as the compression parameters, are defined. If they are, we copy
+      // them to the job properties so that they will be available in the jobconf at runtime.
+      // See HIVE-5504 for details.
+      Map<String, String> tableProps = jobInfo.getTableInfo().getTable().getParameters();
+      for (String propName : orcTablePropsToCopy){
+        if (tableProps.containsKey(propName)){
+          jobProperties.put(propName, tableProps.get(propName));
+        }
+      }
+    }
+  }
+
+
+}
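
A minimal sketch (not part of the commit) of what the ORC branch above does: plain maps stand in for the OutputJobInfo plumbing, and the literal key strings are assumed values of the OrcFile constants. Only whitelisted ORC table properties get copied into the job properties:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class OrcPropsCopySketch {
      public static void main(String[] args) {
        // Assumed literal values behind the OrcFile constants in orcTablePropsToCopy.
        List<String> orcTablePropsToCopy = Arrays.asList(
            "orc.stripe.size", "orc.compress", "orc.compress.size",
            "orc.row.index.stride", "orc.create.index", "orc.block.padding");

        // Stand-in for jobInfo.getTableInfo().getTable().getParameters().
        Map<String, String> tableProps = new HashMap<String, String>();
        tableProps.put("orc.compress", "SNAPPY");
        tableProps.put("my.unrelated.key", "ignored"); // not on the list, not copied

        Map<String, String> jobProperties = new HashMap<String, String>();
        for (String propName : orcTablePropsToCopy) {
          if (tableProps.containsKey(propName)) {
            jobProperties.put(propName, tableProps.get(propName));
          }
        }
        System.out.println(jobProperties); // {orc.compress=SNAPPY}
      }
    }

The whitelist keeps arbitrary table parameters from leaking into the jobconf; only the writer-relevant ORC keys cross over.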

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java?rev=1573674&r1=1573673&r2=1573674&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java Mon Mar  3 18:54:38 2014
@@ -106,58 +106,71 @@ public class OrcOutputFormat extends Fil
     }
   }
 
-  @Override
-  public RecordWriter<NullWritable, OrcSerdeRow>
-      getRecordWriter(FileSystem fileSystem, JobConf conf, String name,
-                      Progressable reporter) throws IOException {
-    return new
-      OrcRecordWriter(new Path(name), OrcFile.writerOptions(conf));
+  /**
+   * Helper method to get a parameter from props first if present, falling back to the JobConf if not.
+   * Returns null if the key is present in neither.
+   */
+  private String getSettingFromPropsFallingBackToConf(String key, Properties props, JobConf conf){
+    if ((props != null) && props.containsKey(key)){
+      return props.getProperty(key);
+    } else if(conf != null) {
+      // If conf is not null, and the key is not present, Configuration.get() will
+      // return null for us. So, we don't have to check if it contains it.
+      return conf.get(key);
+    } else {
+      return null;
+    }
   }
 
-  @Override
-  public FSRecordWriter
-     getHiveRecordWriter(JobConf conf,
-                         Path path,
-                         Class<? extends Writable> valueClass,
-                         boolean isCompressed,
-                         Properties tableProperties,
-                         Progressable reporter) throws IOException {
+  private OrcFile.WriterOptions getOptions(JobConf conf, Properties props) {
     OrcFile.WriterOptions options = OrcFile.writerOptions(conf);
-    if (tableProperties.containsKey(OrcFile.STRIPE_SIZE)) {
-      options.stripeSize(Long.parseLong
-                           (tableProperties.getProperty(OrcFile.STRIPE_SIZE)));
+    String propVal;
+    if ((propVal = getSettingFromPropsFallingBackToConf(OrcFile.STRIPE_SIZE,props,conf)) != null){
+      options.stripeSize(Long.parseLong(propVal));
     }
 
-    if (tableProperties.containsKey(OrcFile.COMPRESSION)) {
-      options.compress(CompressionKind.valueOf
-                           (tableProperties.getProperty(OrcFile.COMPRESSION)));
+    if ((propVal = getSettingFromPropsFallingBackToConf(OrcFile.COMPRESSION,props,conf)) != null){
+      options.compress(CompressionKind.valueOf(propVal));
     }
 
-    if (tableProperties.containsKey(OrcFile.COMPRESSION_BLOCK_SIZE)) {
-      options.bufferSize(Integer.parseInt
-                         (tableProperties.getProperty
-                            (OrcFile.COMPRESSION_BLOCK_SIZE)));
+    if ((propVal = getSettingFromPropsFallingBackToConf(OrcFile.COMPRESSION_BLOCK_SIZE,props,conf)) != null){
+      options.bufferSize(Integer.parseInt(propVal));
     }
 
-    if (tableProperties.containsKey(OrcFile.ROW_INDEX_STRIDE)) {
-      options.rowIndexStride(Integer.parseInt
-                             (tableProperties.getProperty
-                              (OrcFile.ROW_INDEX_STRIDE)));
+    if ((propVal = getSettingFromPropsFallingBackToConf(OrcFile.ROW_INDEX_STRIDE,props,conf)) != null){
+      options.rowIndexStride(Integer.parseInt(propVal));
     }
 
-    if (tableProperties.containsKey(OrcFile.ENABLE_INDEXES)) {
-      if ("false".equals(tableProperties.getProperty
-                         (OrcFile.ENABLE_INDEXES))) {
+    if ((propVal = getSettingFromPropsFallingBackToConf(OrcFile.ENABLE_INDEXES,props,conf)) != null){
+      if ("false".equalsIgnoreCase(propVal)) {
         options.rowIndexStride(0);
       }
     }
 
-    if (tableProperties.containsKey(OrcFile.BLOCK_PADDING)) {
-      options.blockPadding(Boolean.parseBoolean
-                           (tableProperties.getProperty
-                            (OrcFile.BLOCK_PADDING)));
+    if ((propVal = getSettingFromPropsFallingBackToConf(OrcFile.BLOCK_PADDING,props,conf)) != null){
+      options.blockPadding(Boolean.parseBoolean(propVal));
     }
 
-    return new OrcRecordWriter(path, options);
+    return options;
+  }
+
+  @Override
+  public RecordWriter<NullWritable, OrcSerdeRow>
+  getRecordWriter(FileSystem fileSystem, JobConf conf, String name,
+                  Progressable reporter) throws IOException {
+    return new
+        OrcRecordWriter(new Path(name), getOptions(conf,null));
+  }
+
+
+  @Override
+  public FSRecordWriter
+     getHiveRecordWriter(JobConf conf,
+                         Path path,
+                         Class<? extends Writable> valueClass,
+                         boolean isCompressed,
+                         Properties tableProperties,
+                         Progressable reporter) throws IOException {
+    return new OrcRecordWriter(path, getOptions(conf,tableProperties));
   }
 }
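
To summarize the refactoring, here is a self-contained sketch (not from the commit) of the lookup order it establishes: table properties win, the JobConf is the fallback, and null leaves the writer's built-in default untouched. The "orc.compress" and "orc.stripe.size" literals are assumed values of OrcFile.COMPRESSION and OrcFile.STRIPE_SIZE:

    import java.util.Properties;
    import org.apache.hadoop.mapred.JobConf;

    public class OrcLookupOrderSketch {
      // Mirrors getSettingFromPropsFallingBackToConf: props first, then conf, else null.
      static String lookup(String key, Properties props, JobConf conf) {
        if (props != null && props.containsKey(key)) {
          return props.getProperty(key);
        }
        return (conf == null) ? null : conf.get(key);
      }

      public static void main(String[] args) {
        JobConf conf = new JobConf();
        conf.set("orc.compress", "ZLIB");                 // job-level setting

        Properties tableProps = new Properties();
        tableProps.setProperty("orc.compress", "SNAPPY"); // table-level setting

        System.out.println(lookup("orc.compress", tableProps, conf));       // SNAPPY: table wins
        System.out.println(lookup("orc.compress", new Properties(), conf)); // ZLIB: conf fallback
        System.out.println(lookup("orc.stripe.size", tableProps, conf));    // null: writer default kept
      }
    }

This fallback is what lets getRecordWriter, the path taken outside Hive (e.g. via HCatalog), honor compression and other writer settings from the jobconf, which it previously ignored; see HIVE-5504.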

