Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Tue Nov 18 00:48:40 2014
@@ -1135,7 +1135,9 @@ public class FileSinkOperator extends Te
     String postfix=null;
     if (taskIndependent) {
       // key = "database.table/SP/DP/"LB/
-      prefix = conf.getTableInfo().getTableName();
+      // Hive stores table names in lowercase in the metastore, and Counters are case-sensitive, so we
+      // use the lowercase table name as the prefix here; StatsTask gets the table name from the metastore to fetch the counter.
+      prefix = conf.getTableInfo().getTableName().toLowerCase();
     } else {
       // key = "prefix/SP/DP/"LB/taskID/
       prefix = conf.getStatsAggPrefix();
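A standalone sketch (hypothetical class, not part of this commit) of why the lowercasing above matters: the publisher (FileSinkOperator) and the consumer (StatsTask) must derive the identical counter key, so both sides normalize the table name the same way.

// Not Hive code: both sides of the stats exchange must compute the same key.
public final class StatsKeyDemo {
  static String counterKey(String tableName, String spec) {
    return tableName.toLowerCase() + "/" + spec;   // key = "database.table/SP/DP/"
  }

  public static void main(String[] args) {
    String published = counterKey("Default.T1", "SP/DP/"); // name as typed in the query
    String fetched = counterKey("default.t1", "SP/DP/");   // name as stored in the metastore
    System.out.println(published.equals(fetched));         // true only because of toLowerCase()
  }
}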
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java Tue Nov 18 00:48:40 2014
@@ -33,6 +33,6 @@ public interface HashTableLoader {
 
   void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp);
 
-  void load(MapJoinTableContainer[] mapJoinTables, MapJoinTableContainerSerDe[] mapJoinTableSerdes)
-      throws HiveException;
+  void load(MapJoinTableContainer[] mapJoinTables,
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException;
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Tue Nov 18 00:48:40 2014
@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.HashTableLoaderFactory;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
@@ -187,7 +188,9 @@ public class MapJoinOperator extends Abs
     }
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
     loader.init(getExecContext(), hconf, this);
-    loader.load(mapJoinTables, mapJoinTableSerdes);
+    long memUsage = (long)(MapJoinMemoryExhaustionHandler.getMaxHeapSize()
+        * conf.getHashTableMemoryUsage());
+    loader.load(mapJoinTables, mapJoinTableSerdes, memUsage);
     if (!conf.isBucketMapJoin()) {
       /*
        * The issue with caching in case of bucket map join is that different tasks
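A minimal sketch (hypothetical class; the fraction is an assumed configuration value) of the budget that MapJoinOperator now passes into load() above:

// Not Hive code: the loader's memory budget is the JVM max heap scaled by the
// configured map-join hash table fraction.
public final class MemBudgetDemo {
  public static void main(String[] args) {
    long maxHeapSize = Runtime.getRuntime().maxMemory(); // stand-in for MapJoinMemoryExhaustionHandler.getMaxHeapSize()
    float hashTableFraction = 0.9f;                      // assumed value of conf.getHashTableMemoryUsage()
    long memUsage = (long) (maxHeapSize * hashTableFraction);
    System.out.println("hash table budget: " + memUsage + " bytes");
  }
}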
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.IOException;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java Tue Nov 18 00:48:40 2014
@@ -55,22 +55,30 @@ public class MapJoinMemoryExhaustionHand
     this.console = console;
     this.maxMemoryUsage = maxMemoryUsage;
     this.memoryMXBean = ManagementFactory.getMemoryMXBean();
-    long maxHeapSize = memoryMXBean.getHeapMemoryUsage().getMax();
+    this.maxHeapSize = getMaxHeapSize(memoryMXBean);
+    percentageNumberFormat = NumberFormat.getInstance();
+    percentageNumberFormat.setMinimumFractionDigits(2);
+    LOG.info("JVM Max Heap Size: " + this.maxHeapSize);
+  }
+
+  public static long getMaxHeapSize() {
+    return getMaxHeapSize(ManagementFactory.getMemoryMXBean());
+  }
+
+  private static long getMaxHeapSize(MemoryMXBean bean) {
+    long maxHeapSize = bean.getHeapMemoryUsage().getMax();
     /*
      * According to the javadoc, getMax() can return -1. In this case
      * default to 200MB. This will probably never actually happen.
      */
     if(maxHeapSize == -1) {
-      this.maxHeapSize = 200L * 1024L * 1024L;
       LOG.warn("MemoryMXBean.getHeapMemoryUsage().getMax() returned -1, " +
           "defaulting maxHeapSize to 200MB");
-    } else {
-      this.maxHeapSize = maxHeapSize;
+      return 200L * 1024L * 1024L;
     }
-    percentageNumberFormat = NumberFormat.getInstance();
-    percentageNumberFormat.setMinimumFractionDigits(2);
-    LOG.info("JVM Max Heap Size: " + this.maxHeapSize);
+    return maxHeapSize;
   }
+
   /**
    * Throws MapJoinMemoryExhaustionException when the JVM has consumed the
   * configured percentage of memory. The arguments are used simply for the error
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java Tue Nov 18 00:48:40 2014
@@ -72,7 +72,7 @@ public class HashTableLoader implements
   @Override
   public void load(
       MapJoinTableContainer[] mapJoinTables,
-      MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException {
 
     String currentInputPath = context.getCurrentInputPath().toString();
     LOG.info("******* Load from HashTable for input file: " + currentInputPath);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java Tue Nov 18 00:48:40 2014
@@ -149,13 +149,27 @@ public final class BytesBytesMultiHashMa
   /** We have 39 bits to store list pointer from the first record; this is size limit */
   final static long MAX_WB_SIZE = ((long)1) << 38;
+  /** 8 Gb of refs is the max capacity if memory limit is not specified. If someone has 100s of
+   * Gbs of memory (this might happen pretty soon) we'd need to string together arrays anyway. */
+  private final static int DEFAULT_MAX_CAPACITY = 1024 * 1024 * 1024;
 
-  public BytesBytesMultiHashMap(int initialCapacity, float loadFactor, int wbSize) {
+  public BytesBytesMultiHashMap(int initialCapacity,
+      float loadFactor, int wbSize, long memUsage, int defaultCapacity) {
     if (loadFactor < 0 || loadFactor > 1) {
       throw new AssertionError("Load factor must be between (0, 1].");
     }
+    assert initialCapacity > 0;
     initialCapacity = (Long.bitCount(initialCapacity) == 1)
         ? initialCapacity : nextHighestPowerOfTwo(initialCapacity);
+    // 8 bytes per long in the refs, assume data will be empty. This is just a sanity check.
+    int maxCapacity = (memUsage <= 0) ? DEFAULT_MAX_CAPACITY
+        : (int)Math.min((long)DEFAULT_MAX_CAPACITY, memUsage / 8);
+    if (maxCapacity < initialCapacity || initialCapacity <= 0) {
+      // Either initialCapacity is too large, or nextHighestPowerOfTwo overflows
+      initialCapacity = (Long.bitCount(maxCapacity) == 1)
+          ? maxCapacity : nextLowestPowerOfTwo(maxCapacity);
+    }
+
     validateCapacity(initialCapacity);
     startingHashBitCount = 63 - Long.numberOfLeadingZeros(initialCapacity);
     this.loadFactor = loadFactor;
@@ -164,6 +178,11 @@ public final class BytesBytesMultiHashMa
     resizeThreshold = (int)(initialCapacity * this.loadFactor);
   }
 
+  @VisibleForTesting
+  BytesBytesMultiHashMap(int initialCapacity, float loadFactor, int wbSize) {
+    this(initialCapacity, loadFactor, wbSize, -1, 100000);
+  }
+
   /** The source of keys and values to put into hashtable; avoids byte copying.
    */
   public static interface KvSource {
     /** Write key into output. */
@@ -644,6 +663,10 @@ public final class BytesBytesMultiHashMa
     return Integer.highestOneBit(v) << 1;
   }
 
+  private static int nextLowestPowerOfTwo(int v) {
+    return Integer.highestOneBit(v);
+  }
+
   @VisibleForTesting
   int getCapacity() {
     return refs.length;
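A standalone sketch (hypothetical class, not Hive code) of the capacity clamp added above: each slot costs 8 bytes in the refs array, so the capacity is capped at memUsage / 8 (or at the 1G-slot default when no budget is given) and rounded down to a power of two.

public final class CapacityClampDemo {
  private static final int DEFAULT_MAX_CAPACITY = 1024 * 1024 * 1024;

  static int clamp(int initialCapacity, long memUsage) {
    int maxCapacity = (memUsage <= 0) ? DEFAULT_MAX_CAPACITY
        : (int) Math.min((long) DEFAULT_MAX_CAPACITY, memUsage / 8);
    if (maxCapacity < initialCapacity || initialCapacity <= 0) {
      initialCapacity = Integer.highestOneBit(maxCapacity); // nextLowestPowerOfTwo
    }
    return initialCapacity;
  }

  public static void main(String[] args) {
    System.out.println(clamp(1 << 20, 1L << 30)); // budget allows 2^27 slots: keeps 1048576
    System.out.println(clamp(1 << 20, 1L << 20)); // 1MB budget: clamped down to 131072 slots
  }
}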
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.exec.persistence;
 
@@ -60,17 +78,20 @@ public class MapJoinBytesTableContainer
   private final List<Object> EMPTY_LIST = new ArrayList<Object>(0);
 
   public MapJoinBytesTableContainer(Configuration hconf,
-      MapJoinObjectSerDeContext valCtx, long keyCount) throws SerDeException {
+      MapJoinObjectSerDeContext valCtx, long keyCount, long memUsage) throws SerDeException {
     this(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT),
         HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD),
         HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR),
-        HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE), valCtx, keyCount);
+        HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE),
+        valCtx, keyCount, memUsage);
   }
 
   private MapJoinBytesTableContainer(float keyCountAdj, int threshold, float loadFactor,
-      int wbSize, MapJoinObjectSerDeContext valCtx, long keyCount) throws SerDeException {
-    threshold = HashMapWrapper.calculateTableSize(keyCountAdj, threshold, loadFactor, keyCount);
-    hashMap = new BytesBytesMultiHashMap(threshold, loadFactor, wbSize);
+      int wbSize, MapJoinObjectSerDeContext valCtx, long keyCount, long memUsage)
+      throws SerDeException {
+    int newThreshold = HashMapWrapper.calculateTableSize(
+        keyCountAdj, threshold, loadFactor, keyCount);
+    hashMap = new BytesBytesMultiHashMap(newThreshold, loadFactor, wbSize, memUsage, threshold);
   }
 
   private LazyBinaryStructObjectInspector createInternalOi(

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Tue Nov 18 00:48:40 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
@@ -69,7 +70,7 @@ public class HashTableLoader implements
   @Override
   public void load(
       MapJoinTableContainer[] mapJoinTables,
-      MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException {
 
     TezContext tezContext = (TezContext) MapredContext.get();
     Map<Integer, String> parentToInput = desc.getParentToInput();
@@ -106,7 +107,7 @@ public class HashTableLoader implements
       Long keyCountObj = parentKeyCounts.get(pos);
       long keyCount = (keyCountObj == null) ? -1 : keyCountObj.longValue();
       MapJoinTableContainer tableContainer = useOptimizedTables
-          ? new MapJoinBytesTableContainer(hconf, valCtx, keyCount)
+          ? new MapJoinBytesTableContainer(hconf, valCtx, keyCount, memUsage)
           : new HashMapWrapper(hconf, keyCount);
 
       while (kvReader.next()) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Tue Nov 18 00:48:40 2014
@@ -45,6 +45,8 @@ import org.apache.tez.dag.api.client.Sta
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.fusesource.jansi.Ansi;
 
+import com.google.common.base.Preconditions;
+
 import java.io.IOException;
 import java.io.PrintStream;
 import java.text.DecimalFormat;
@@ -132,6 +134,11 @@ public class TezJobMonitor {
     });
   }
 
+  public static void initShutdownHook() {
+    Preconditions.checkNotNull(shutdownList,
+        "Shutdown hook was not properly initialized");
+  }
+
   public TezJobMonitor() {
     console = SessionState.getConsole();
     secondsFormat = new DecimalFormat("#0.00");
@@ -290,6 +297,7 @@ public class TezJobMonitor {
         break;
       case INITING:
         console.printInfo("Status: Initializing");
+        startTime = System.currentTimeMillis();
         break;
       case RUNNING:
         if (!running) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Tue Nov 18 00:48:40 2014
@@ -187,6 +187,7 @@ public class TezSessionState {
 
     LOG.info("Opening new Tez Session (id: " + sessionId
         + ", scratch dir: " + tezScratchDir + ")");
+    TezJobMonitor.initShutdownHook();
     session.start();
 
     if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Tue Nov 18 00:48:40 2014
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.Context;
@@ -170,7 +171,8 @@ public class TezTask extends Task<TezWor
       counters = client.getDAGStatus(statusGetOpts).getDAGCounters();
       TezSessionPoolManager.getInstance().returnSession(session);
 
-      if (LOG.isInfoEnabled() && counters != null) {
+      if (LOG.isInfoEnabled() && counters != null
+          && conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY)) {
         for (CounterGroup group: counters) {
           LOG.info(group.getDisplayName() +":");
           for (TezCounter counter: group) {
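A sketch of the gated summary logging that TezTask now does above; the helper class is hypothetical, but it uses only the Tez and Hive types the diff itself uses. The gate matters because a large DAG can emit thousands of counter lines at INFO level.

import org.apache.commons.logging.Log;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;

final class CounterSummary {                       // hypothetical helper, not Hive code
  static void logIfEnabled(Log log, TezCounters counters, HiveConf conf) {
    if (!log.isInfoEnabled() || counters == null
        || !conf.getBoolVar(HiveConf.ConfVars.TEZ_EXEC_SUMMARY)) {
      return;                                      // user did not opt in to the summary
    }
    for (CounterGroup group : counters) {
      log.info(group.getDisplayName() + ":");
      for (TezCounter counter : group) {
        log.info("   " + counter.getDisplayName() + ": " + counter.getValue());
      }
    }
  }
}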
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java Tue Nov 18 00:48:40 2014
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -89,6 +90,8 @@ public class ATSHook implements ExecuteW
   @Override
   public void run(final HookContext hookContext) throws Exception {
     final long currentTime = System.currentTimeMillis();
+    final HiveConf conf = new HiveConf(hookContext.getConf());
+
     executor.submit(new Runnable() {
       @Override
       public void run() {
@@ -110,19 +113,19 @@ public class ATSHook implements ExecuteW
           switch(hookContext.getHookType()) {
           case PRE_EXEC_HOOK:
             ExplainTask explain = new ExplainTask();
-            explain.initialize(hookContext.getConf(), plan, null);
+            explain.initialize(conf, plan, null);
             String query = plan.getQueryStr();
             List<Task<?>> rootTasks = plan.getRootTasks();
             JSONObject explainPlan = explain.getJSONPlan(null, null, rootTasks,
                 plan.getFetchTask(), true, false, false);
-            fireAndForget(hookContext.getConf(), createPreHookEvent(queryId, query,
+            fireAndForget(conf, createPreHookEvent(queryId, query,
                 explainPlan, queryStartTime, user, numMrJobs, numTezJobs));
             break;
           case POST_EXEC_HOOK:
-            fireAndForget(hookContext.getConf(), createPostHookEvent(queryId, currentTime, user, true));
+            fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, true));
             break;
           case ON_FAILURE_HOOK:
-            fireAndForget(hookContext.getConf(), createPostHookEvent(queryId, currentTime, user, false));
+            fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, false));
             break;
           default:
             //ignore
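A minimal sketch (hypothetical names, not Hive code) of the pattern the ATSHook change above introduces: copy the configuration before handing work to the async executor, so later mutations of the live session conf cannot race with the hook thread.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;

public final class ConfSnapshotDemo {
  private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();

  static void runHook(Configuration liveConf) {
    final Configuration snapshot = new Configuration(liveConf); // copy-constructor snapshot
    EXECUTOR.submit(new Runnable() {
      @Override
      public void run() {
        // reads only the immutable snapshot, never the live session conf
        System.out.println(snapshot.get("hive.query.id", "<unset>"));
      }
    });
  }
}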
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java Tue Nov 18 00:48:40 2014
@@ -394,7 +394,8 @@ class ColumnStatisticsImpl implements Co
     } else if (str.minimum != null) {
       if (minimum.compareTo(str.minimum) > 0) {
         minimum = new Text(str.getMinimum());
-      } else if (maximum.compareTo(str.maximum) < 0) {
+      }
+      if (maximum.compareTo(str.maximum) < 0) {
         maximum = new Text(str.getMaximum());
       }
     }
@@ -563,7 +564,8 @@ class ColumnStatisticsImpl implements Co
     } else if (dec.minimum != null) {
       if (minimum.compareTo(dec.minimum) > 0) {
         minimum = dec.minimum;
-      } else if (maximum.compareTo(dec.maximum) < 0) {
+      }
+      if (maximum.compareTo(dec.maximum) < 0) {
         maximum = dec.maximum;
       }
       if (sum == null || dec.sum == null) {
@@ -671,7 +673,8 @@ class ColumnStatisticsImpl implements Co
     } else if (dateStats.minimum != null) {
       if (minimum > dateStats.minimum) {
         minimum = dateStats.minimum;
-      } else if (maximum < dateStats.maximum) {
+      }
+      if (maximum < dateStats.maximum) {
         maximum = dateStats.maximum;
       }
     }
@@ -767,7 +770,8 @@ class ColumnStatisticsImpl implements Co
     } else if (timestampStats.minimum != null) {
       if (minimum > timestampStats.minimum) {
         minimum = timestampStats.minimum;
-      } else if (maximum < timestampStats.maximum) {
+      }
+      if (maximum < timestampStats.maximum) {
        maximum = timestampStats.maximum;
       }
     }
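A standalone illustration (hypothetical class, not ORC code) of the HIVE-8732 merge bug the change above fixes: with "else if", a merge that lowers the minimum silently skips the maximum update, corrupting the recorded maximum.

public final class StatsMergeDemo {
  static long min = 5, max = 9;                       // already-merged stats

  static void mergeBuggy(long otherMin, long otherMax) {
    if (min > otherMin) { min = otherMin; }
    else if (max < otherMax) { max = otherMax; }      // unreachable when min was just updated
  }

  public static void main(String[] args) {
    mergeBuggy(1, 20);                                // incoming stripe range: [1, 20]
    System.out.println(min + ".." + max);             // prints 1..9 -- the max of 20 was lost
  }
}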
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java Tue Nov 18 00:48:40 2014
@@ -65,6 +65,8 @@ public final class FileDump {
       System.out.println("Structure for " + filename);
       Path path = new Path(filename);
       Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+      System.out.println("File Version: " + reader.getFileVersion().getName() +
+          " with " + reader.getWriterVersion());
       RecordReaderImpl rows = (RecordReaderImpl) reader.rows();
       System.out.println("Rows: " + reader.getNumberOfRows());
       System.out.println("Compression: " + reader.getCompression());

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Tue Nov 18 00:48:40 2014
@@ -97,6 +97,26 @@ public final class OrcFile {
     }
   }
 
+  /**
+   * Records the version of the writer in terms of which bugs have been fixed.
+   * For writer bugs whose output old readers already read correctly, bump this
+   * version instead of the file Version.
+   */
+  public static enum WriterVersion {
+    ORIGINAL(0),
+    HIVE_8732(1); // corrupted stripe/file maximum column statistics
+
+    private final int id;
+
+    public int getId() {
+      return id;
+    }
+
+    private WriterVersion(int id) {
+      this.id = id;
+    }
+  }
+
   public static enum EncodingStrategy {
     SPEED, COMPRESSION;
   }
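A sketch of how a reader can use the new writer version (the helper is hypothetical, not the ORC API): stripe-level min/max statistics are only trusted for predicate pushdown when the file was written by a writer that contains the HIVE-8732 fix.

enum WriterVersion { ORIGINAL, HIVE_8732 }              // mirrors the enum above

final class PruningGate {                               // hypothetical helper
  static boolean canPruneWithStripeStats(WriterVersion v, boolean hasSearchArgument) {
    // ORIGINAL writers may have recorded a corrupted maximum, so never prune on it
    return hasSearchArgument && v != WriterVersion.ORIGINAL;
  }
}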
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Tue Nov 18 00:48:40 2014
@@ -630,6 +630,7 @@ public class OrcInputFormat implements
     private final boolean isOriginal;
     private final List<Long> deltas;
     private final boolean hasBase;
+    private OrcFile.WriterVersion writerVersion;
 
     SplitGenerator(Context context, FileSystem fs,
         FileStatus file, FileInfo fileInfo,
@@ -775,7 +776,9 @@ public class OrcInputFormat implements
       Reader.Options options = new Reader.Options();
       setIncludedColumns(options, types, context.conf, isOriginal);
       setSearchArgument(options, types, context.conf, isOriginal);
-      if (options.getSearchArgument() != null) {
+      // only do split pruning if HIVE-8732 has been fixed in the writer
+      if (options.getSearchArgument() != null &&
+          writerVersion != OrcFile.WriterVersion.ORIGINAL) {
         SearchArgument sarg = options.getSearchArgument();
         List<PredicateLeaf> sargLeaves = sarg.getLeaves();
         List<StripeStatistics> stripeStats = metadata.getStripeStatistics();
@@ -866,6 +869,7 @@ public class OrcInputFormat implements
           fileMetaInfo = fileInfo.fileMetaInfo;
           metadata = fileInfo.metadata;
           types = fileInfo.types;
+          writerVersion = fileInfo.writerVersion;
           // For multiple runs, in case sendSplitsInFooter changes
           if (fileMetaInfo == null && context.footerInSplits) {
             orcReader = OrcFile.createReader(file.getPath(),
@@ -873,6 +877,7 @@ public class OrcInputFormat implements
             fileInfo.fileMetaInfo = ((ReaderImpl) orcReader).getFileMetaInfo();
             fileInfo.metadata = orcReader.getMetadata();
             fileInfo.types = orcReader.getTypes();
+            fileInfo.writerVersion = orcReader.getWriterVersion();
           }
         } else {
           orcReader = OrcFile.createReader(file.getPath(),
@@ -880,13 +885,14 @@ public class OrcInputFormat implements
           stripes = orcReader.getStripes();
           metadata = orcReader.getMetadata();
           types = orcReader.getTypes();
+          writerVersion = orcReader.getWriterVersion();
           fileMetaInfo = context.footerInSplits ?
               ((ReaderImpl) orcReader).getFileMetaInfo() : null;
           if (context.cacheStripeDetails) {
             // Populate into cache.
             Context.footerCache.put(file.getPath(),
                 new FileInfo(file.getModificationTime(), file.getLen(), stripes,
-                    metadata, types, fileMetaInfo));
+                    metadata, types, fileMetaInfo, writerVersion));
           }
         }
       } catch (Throwable th) {
@@ -981,18 +987,21 @@ public class OrcInputFormat implements
     ReaderImpl.FileMetaInfo fileMetaInfo;
     Metadata metadata;
     List<OrcProto.Type> types;
+    private OrcFile.WriterVersion writerVersion;
 
     FileInfo(long modificationTime, long size,
              List<StripeInformation> stripeInfos,
              Metadata metadata, List<OrcProto.Type> types,
-             ReaderImpl.FileMetaInfo fileMetaInfo) {
+             ReaderImpl.FileMetaInfo fileMetaInfo,
+             OrcFile.WriterVersion writerVersion) {
       this.modificationTime = modificationTime;
       this.size = size;
       this.stripeInfos = stripeInfos;
      this.fileMetaInfo = fileMetaInfo;
      this.metadata = metadata;
      this.types = types;
+      this.writerVersion = writerVersion;
     }
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java Tue Nov 18 00:48:40 2014
@@ -38,6 +38,7 @@ public class OrcNewSplit extends FileSpl
   private boolean isOriginal;
   private boolean hasBase;
   private final List<Long> deltas = new ArrayList<Long>();
+  private OrcFile.WriterVersion writerVersion;
 
   protected OrcNewSplit(){
     //The FileSplit() constructor in hadoop 0.20 and 1.x is package private so can't use it.
@@ -83,6 +84,7 @@ public class OrcNewSplit extends FileSpl
       WritableUtils.writeVInt(out, footerBuff.limit() - footerBuff.position());
       out.write(footerBuff.array(), footerBuff.position(),
           footerBuff.limit() - footerBuff.position());
+      WritableUtils.writeVInt(out, fileMetaInfo.writerVersion.getId());
     }
   }
 
@@ -111,9 +113,11 @@ public class OrcNewSplit extends FileSpl
       int footerBuffSize = WritableUtils.readVInt(in);
       ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
       in.readFully(footerBuff.array(), 0, footerBuffSize);
+      OrcFile.WriterVersion writerVersion =
+          ReaderImpl.getWriterVersion(WritableUtils.readVInt(in));
       fileMetaInfo = new ReaderImpl.FileMetaInfo(compressionType, bufferSize,
-          metadataSize, footerBuff);
+          metadataSize, footerBuff, writerVersion);
     }
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java Tue Nov 18 00:48:40 2014
@@ -42,6 +42,7 @@ public class OrcSplit extends FileSplit
   private boolean isOriginal;
   private boolean hasBase;
   private final List<Long> deltas = new ArrayList<Long>();
+  private OrcFile.WriterVersion writerVersion;
 
   static final int BASE_FLAG = 4;
   static final int ORIGINAL_FLAG = 2;
@@ -92,6 +93,7 @@ public class OrcSplit extends FileSplit
       WritableUtils.writeVInt(out, footerBuff.limit() - footerBuff.position());
       out.write(footerBuff.array(), footerBuff.position(),
          footerBuff.limit() - footerBuff.position());
+      WritableUtils.writeVInt(out, fileMetaInfo.writerVersion.getId());
     }
   }
 
@@ -120,9 +122,11 @@ public class OrcSplit extends FileSplit
       int footerBuffSize = WritableUtils.readVInt(in);
       ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
       in.readFully(footerBuff.array(), 0, footerBuffSize);
+      OrcFile.WriterVersion writerVersion =
+          ReaderImpl.getWriterVersion(WritableUtils.readVInt(in));
       fileMetaInfo = new ReaderImpl.FileMetaInfo(compressionType, bufferSize,
-          metadataSize, footerBuff);
+          metadataSize, footerBuff, writerVersion);
     }
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java Tue Nov 18 00:48:40 2014
@@ -129,6 +129,16 @@ public interface Reader {
   List<OrcProto.Type> getTypes();
 
   /**
+   * Get the file format version.
+   */
+  OrcFile.Version getFileVersion();
+
+  /**
+   * Get the version of the writer of this file.
+   */
+  OrcFile.WriterVersion getWriterVersion();
+
+  /**
    * Options for creating a RecordReader.
    */
   public static class Options {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Tue Nov 18 00:48:40 2014
@@ -62,6 +62,7 @@ final class ReaderImpl implements Reader
   private long deserializedSize = -1;
   private final Configuration conf;
   private final List<Integer> versionList;
+  private final OrcFile.WriterVersion writerVersion;
 
   //serialized footer - Keeping this around for use by getFileMetaInfo()
   // will help avoid cpu cycles spend in deserializing at cost of increased
@@ -182,6 +183,22 @@ final class ReaderImpl implements Reader
   }
 
   @Override
+  public OrcFile.Version getFileVersion() {
+    for (OrcFile.Version version: OrcFile.Version.values()) {
+      if (version.getMajor() == versionList.get(0) &&
+          version.getMinor() == versionList.get(1)) {
+        return version;
+      }
+    }
+    return OrcFile.Version.V_0_11;
+  }
+
+  @Override
+  public OrcFile.WriterVersion getWriterVersion() {
+    return writerVersion;
+  }
+
+  @Override
   public int getRowIndexStride() {
     return footer.getRowIndexStride();
   }
@@ -309,8 +326,22 @@ final class ReaderImpl implements Reader
     this.footer = rInfo.footer;
     this.inspector = rInfo.inspector;
     this.versionList = footerMetaData.versionList;
+    this.writerVersion = footerMetaData.writerVersion;
   }
 
+  /**
+   * Get the WriterVersion based on the ORC file postscript.
+   * @param writerVersion the integer writer version
+   * @return the matching WriterVersion constant, or ORIGINAL if the id is unknown
+   */
+  static OrcFile.WriterVersion getWriterVersion(int writerVersion) {
+    for(OrcFile.WriterVersion version: OrcFile.WriterVersion.values()) {
+      if (version.getId() == writerVersion) {
+        return version;
+      }
+    }
+    return OrcFile.WriterVersion.ORIGINAL;
+  }
 
   private static FileMetaInfo extractMetaInfoFromFooter(FileSystem fs,
                                                         Path path,
@@ -346,6 +377,12 @@ final class ReaderImpl implements Reader
 
     int footerSize = (int) ps.getFooterLength();
     int metadataSize = (int) ps.getMetadataLength();
+    OrcFile.WriterVersion writerVersion;
+    if (ps.hasWriterVersion()) {
+      writerVersion = getWriterVersion(ps.getWriterVersion());
+    } else {
+      writerVersion = OrcFile.WriterVersion.ORIGINAL;
+    }
 
     //check compression codec
     switch (ps.getCompression()) {
@@ -391,7 +428,8 @@ final class ReaderImpl implements Reader
         (int) ps.getCompressionBlockSize(),
         (int) ps.getMetadataLength(),
         buffer,
-        ps.getVersionList()
+        ps.getVersionList(),
+        writerVersion
         );
   }
 
@@ -451,25 +489,29 @@ final class ReaderImpl implements Reader
     final int metadataSize;
     final ByteBuffer footerBuffer;
     final List<Integer> versionList;
+    final OrcFile.WriterVersion writerVersion;
 
     FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
-        ByteBuffer footerBuffer) {
-      this(compressionType, bufferSize, metadataSize, footerBuffer, null);
+        ByteBuffer footerBuffer, OrcFile.WriterVersion writerVersion) {
+      this(compressionType, bufferSize, metadataSize, footerBuffer, null,
+          writerVersion);
     }
 
     FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
-        ByteBuffer footerBuffer, List<Integer> versionList){
+        ByteBuffer footerBuffer, List<Integer> versionList,
+        OrcFile.WriterVersion writerVersion){
       this.compressionType = compressionType;
      this.bufferSize = bufferSize;
      this.metadataSize = metadataSize;
      this.footerBuffer = footerBuffer;
      this.versionList = versionList;
+      this.writerVersion = writerVersion;
     }
   }
 
   public FileMetaInfo getFileMetaInfo(){
     return new FileMetaInfo(compressionKind.toString(), bufferSize,
-        metadataSize, footerByteBuffer, versionList);
+        metadataSize, footerByteBuffer, versionList, writerVersion);
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Tue Nov 18 00:48:40 2014
@@ -2364,20 +2364,21 @@ class RecordReaderImpl implements Record
                                          PredicateLeaf predicate) {
     ColumnStatistics cs = ColumnStatisticsImpl.deserialize(index);
     Object minValue = getMin(cs);
+    Object maxValue = getMax(cs);
+    return evaluatePredicateRange(predicate, minValue, maxValue);
+  }
+
+  static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min,
+      Object max) {
     // if we didn't have any values, everything must have been null
-    if (minValue == null) {
+    if (min == null) {
       if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
         return TruthValue.YES;
       } else {
         return TruthValue.NULL;
       }
     }
-    Object maxValue = getMax(cs);
-    return evaluatePredicateRange(predicate, minValue, maxValue);
-  }
 
-  static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min,
-      Object max) {
     Location loc;
     try {
       // Predicate object and stats object can be one of the following base types
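A simplified standalone sketch (reduced types; not the ORC implementation) of the null handling that the RecordReaderImpl refactor above moves inside evaluatePredicateRange: a null minimum means no non-null values were recorded, so only IS_NULL can be answered YES and every other predicate evaluates to NULL.

enum TruthValue { YES, NO, MAYBE, NULL }               // simplified subset

final class RangeEvalDemo {                            // hypothetical, not ORC code
  static TruthValue evaluate(boolean isNullPredicate, Long min, Long max) {
    if (min == null) {                                 // column contained only nulls
      return isNullPredicate ? TruthValue.YES : TruthValue.NULL;
    }
    return TruthValue.MAYBE;                           // real min/max range checks elided
  }
}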
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TimestampColumnStatistics.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TimestampColumnStatistics.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TimestampColumnStatistics.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TimestampColumnStatistics.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.io.orc;
 
 import java.sql.Timestamp;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Tue Nov 18 00:48:40 2014
@@ -2230,7 +2230,8 @@ class WriterImpl implements Writer, Memo
           .setMetadataLength(metadataLength)
           .setMagic(OrcFile.MAGIC)
           .addVersion(version.getMajor())
-          .addVersion(version.getMinor());
+          .addVersion(version.getMinor())
+          .setWriterVersion(OrcFile.WriterVersion.HIVE_8732.getId());
       if (compress != CompressionKind.NONE) {
         builder.setCompressionBlockSize(bufferSize);
       }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java Tue Nov 18 00:48:40 2014
@@ -63,6 +63,8 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.ParquetWriter;
 import parquet.io.api.Binary;
 
 /**
@@ -70,13 +72,18 @@ import parquet.io.api.Binary;
  *
  * A ParquetHiveSerDe for Hive (with the deprecated package mapred)
 *
 */
-@SerDeSpec(schemaProps = {serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES})
+@SerDeSpec(schemaProps = {serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES,
+    ParquetOutputFormat.COMPRESSION})
 public class ParquetHiveSerDe extends AbstractSerDe {
   public static final Text MAP_KEY = new Text("key");
   public static final Text MAP_VALUE = new Text("value");
   public static final Text MAP = new Text("map");
   public static final Text ARRAY = new Text("bag");
 
+  // default compression type for parquet output format
+  private static final String DEFAULTCOMPRESSION =
+      ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME.name();
+
   // Map precision to the number bytes needed for binary conversion.
   public static final int PRECISION_TO_BYTE_COUNT[] = new int[38];
   static {
@@ -99,6 +106,7 @@ public class ParquetHiveSerDe extends Ab
   private LAST_OPERATION status;
   private long serializedSize;
   private long deserializedSize;
+  private String compressionType;
 
   @Override
   public final void initialize(final Configuration conf, final Properties tbl) throws SerDeException {
@@ -110,6 +118,9 @@ public class ParquetHiveSerDe extends Ab
     final String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
     final String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
 
+    // Get compression properties
+    compressionType = tbl.getProperty(ParquetOutputFormat.COMPRESSION, DEFAULTCOMPRESSION);
+
     if (columnNameProperty.length() == 0) {
       columnNames = new ArrayList<String>();
     } else {
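A minimal sketch of the property lookup added above; the constant values are assumptions (the real code uses ParquetOutputFormat.COMPRESSION and the Parquet writer's default codec name), but the fallback pattern is the same.

import java.util.Properties;

public final class CompressionLookupDemo {               // hypothetical, not Hive code
  static final String COMPRESSION = "parquet.compression";  // assumed value of ParquetOutputFormat.COMPRESSION
  static final String DEFAULT_CODEC = "UNCOMPRESSED";       // stand-in for the ParquetWriter default

  public static void main(String[] args) {
    Properties tbl = new Properties();
    tbl.setProperty(COMPRESSION, "SNAPPY");                 // e.g. set via TBLPROPERTIES
    System.out.println(tbl.getProperty(COMPRESSION, DEFAULT_CODEC));                 // SNAPPY
    System.out.println(new Properties().getProperty(COMPRESSION, DEFAULT_CODEC));    // UNCOMPRESSED
  }
}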
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java Tue Nov 18 00:48:40 2014
@@ -18,10 +18,12 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -55,21 +57,13 @@ public class ParquetRecordWriterWrapper
       }
       taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
 
+      LOG.info("initialize serde with table properties.");
+      initializeSerProperties(taskContext, tableProperties);
+
       LOG.info("creating real writer to write at " + name);
-      String compressionName = tableProperties.getProperty(ParquetOutputFormat.COMPRESSION);
-      if (compressionName != null && !compressionName.isEmpty()) {
-        //get override compression properties via "tblproperties" clause if it is set
-        LOG.debug("get override compression properties via tblproperties");
-
-        ContextUtil.getConfiguration(taskContext);
-        CompressionCodecName codecName = CompressionCodecName.fromConf(compressionName);
-        realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(jobConf,
-            new Path(name), codecName);
-      } else {
-        realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext,
-            new Path(name));
-      }
+
+      realWriter =
+          ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, new Path(name));
 
       LOG.info("real writer: " + realWriter);
     } catch (final InterruptedException e) {
@@ -77,6 +71,31 @@ public class ParquetRecordWriterWrapper
     }
   }
 
+  private void initializeSerProperties(JobContext job, Properties tableProperties) {
+    String blockSize = tableProperties.getProperty(ParquetOutputFormat.BLOCK_SIZE);
+    Configuration conf = ContextUtil.getConfiguration(job);
+    if (blockSize != null && !blockSize.isEmpty()) {
+      LOG.debug("get override parquet.block.size property via tblproperties");
+      conf.setInt(ParquetOutputFormat.BLOCK_SIZE, Integer.valueOf(blockSize));
+    }
+
+    String enableDictionaryPage =
+        tableProperties.getProperty(ParquetOutputFormat.ENABLE_DICTIONARY);
+    if (enableDictionaryPage != null && !enableDictionaryPage.isEmpty()) {
+      LOG.debug("get override parquet.enable.dictionary property via tblproperties");
+      conf.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY,
+          Boolean.valueOf(enableDictionaryPage));
+    }
+
+    String compressionName = tableProperties.getProperty(ParquetOutputFormat.COMPRESSION);
+    if (compressionName != null && !compressionName.isEmpty()) {
+      //get override compression properties via "tblproperties" clause if it is set
+      LOG.debug("get override compression properties via tblproperties");
+      CompressionCodecName codecName = CompressionCodecName.fromConf(compressionName);
+      conf.set(ParquetOutputFormat.COMPRESSION, codecName.name());
+    }
+  }
+
   @Override
   public void close(final Reporter reporter) throws IOException {
     try {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentFactory.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentFactory.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.io.sarg;
 
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.lib;
 
 import org.apache.hadoop.hive.ql.exec.Operator;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Tue Nov 18 00:48:40 2014
@@ -92,6 +92,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -748,8 +749,9 @@ public class Hive {
       throw new HiveException("Table name " + indexTblName + " already exists. Choose another name.");
     }
 
-    org.apache.hadoop.hive.metastore.api.StorageDescriptor storageDescriptor = baseTbl.getSd().deepCopy();
-    SerDeInfo serdeInfo = storageDescriptor.getSerdeInfo();
+    SerDeInfo serdeInfo = new SerDeInfo();
+    serdeInfo.setName(indexTblName);
+
     if(serde != null) {
       serdeInfo.setSerializationLib(serde);
     } else {
@@ -762,6 +764,7 @@ public class Hive {
       }
     }
 
+    serdeInfo.setParameters(new HashMap<String, String>());
     if (fieldDelim != null) {
       serdeInfo.getParameters().put(FIELD_DELIM, fieldDelim);
       serdeInfo.getParameters().put(SERIALIZATION_FORMAT, fieldDelim);
@@ -788,18 +791,8 @@ public class Hive {
       }
     }
 
-    storageDescriptor.setLocation(null);
-    if (location != null) {
-      storageDescriptor.setLocation(location);
-    }
-    storageDescriptor.setInputFormat(inputFormat);
-    storageDescriptor.setOutputFormat(outputFormat);
-
-    Map<String, String> params = new HashMap<String,String>();
-
     List<FieldSchema> indexTblCols = new ArrayList<FieldSchema>();
     List<Order> sortCols = new ArrayList<Order>();
-    storageDescriptor.setBucketCols(null);
     int k = 0;
     Table metaBaseTbl = new Table(baseTbl);
     for (int i = 0; i < metaBaseTbl.getCols().size(); i++) {
@@ -815,9 +808,6 @@ public class Hive {
           "Check the index columns, they should appear in the table being indexed.");
     }
 
-    storageDescriptor.setCols(indexTblCols);
-    storageDescriptor.setSortCols(sortCols);
-
     int time = (int) (System.currentTimeMillis() / 1000);
     org.apache.hadoop.hive.metastore.api.Table tt = null;
     HiveIndexHandler indexHandler = HiveUtils.getIndexHandler(this.getConf(), indexHandlerClass);
@@ -851,8 +841,21 @@ public class Hive {
 
     String tdname = Utilities.getDatabaseName(tableName);
     String ttname = Utilities.getTableName(tableName);
+
+    StorageDescriptor indexSd = new StorageDescriptor(
+        indexTblCols,
+        location,
+        inputFormat,
+        outputFormat,
+        false/*compressed - not used*/,
+        -1/*numBuckets - default is -1 when the table has no buckets*/,
+        serdeInfo,
+        null/*bucketCols*/,
+        sortCols,
+        null/*parameters*/);
+
     Index indexDesc = new Index(indexName, indexHandlerClass, tdname, ttname, time, time, indexTblName,
-        storageDescriptor, params, deferredRebuild);
+        indexSd, new HashMap<String,String>(), deferredRebuild);
     if (indexComment != null) {
       indexDesc.getParameters().put("comment", indexComment);
     }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.metadata;
 
 import java.io.IOException;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Tue Nov 18 00:48:40 2014
@@ -240,6 +240,9 @@ public class ConvertJoinMapJoin implemen
         new MapJoinDesc(null, null, joinDesc.getExprs(), null, null,
             joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
             joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null);
+    mapJoinDesc.setNullSafes(joinDesc.getNullSafes());
+    mapJoinDesc.setFilterMap(joinDesc.getFilterMap());
+    mapJoinDesc.resetOrder();
   }
 
   @SuppressWarnings("unchecked")

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.optimizer;

 import java.util.Stack;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.optimizer.metainfo.annotation;

 import java.util.ArrayList;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/TraitsUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/TraitsUtil.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/TraitsUtil.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/TraitsUtil.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.optimizer.optiq;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/cost/HiveVolcanoPlanner.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/cost/HiveVolcanoPlanner.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/cost/HiveVolcanoPlanner.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/cost/HiveVolcanoPlanner.java Tue Nov 18 00:48:40 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.optimizer.optiq.cost;

 import org.eigenbase.rel.RelCollationTraitDef;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/HivePushFilterPastJoinRule.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/HivePushFilterPastJoinRule.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/HivePushFilterPastJoinRule.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/HivePushFilterPastJoinRule.java Tue Nov 18 00:48:40 2014
@@ -38,114 +38,121 @@ import org.eigenbase.sql.SqlKind;

 public abstract class HivePushFilterPastJoinRule extends PushFilterPastJoinRule {

-  public static final HivePushFilterPastJoinRule FILTER_ON_JOIN = new HivePushFilterIntoJoinRule();
+  public static final HivePushFilterPastJoinRule FILTER_ON_JOIN = new HivePushFilterIntoJoinRule();

-  public static final HivePushFilterPastJoinRule JOIN = new HivePushDownJoinConditionRule();
+  public static final HivePushFilterPastJoinRule JOIN = new HivePushDownJoinConditionRule();

-  /**
-   * Creates a PushFilterPastJoinRule with an explicit root operand.
-   */
-  protected HivePushFilterPastJoinRule(RelOptRuleOperand operand, String id,
-      boolean smart, RelFactories.FilterFactory filterFactory,
-      RelFactories.ProjectFactory projectFactory) {
-    super(operand, id, smart, filterFactory, projectFactory);
-  }
-
-  /**
-   * Rule that tries to push filter expressions into a join condition and into
-   * the inputs of the join.
-   */
-  public static class HivePushFilterIntoJoinRule extends
-      HivePushFilterPastJoinRule {
-    public HivePushFilterIntoJoinRule() {
-      super(RelOptRule.operand(FilterRelBase.class,
-          RelOptRule.operand(JoinRelBase.class, RelOptRule.any())),
-          "HivePushFilterPastJoinRule:filter", true,
-          HiveFilterRel.DEFAULT_FILTER_FACTORY,
-          HiveProjectRel.DEFAULT_PROJECT_FACTORY);
-    }
-
-    @Override
-    public void onMatch(RelOptRuleCall call) {
-      FilterRelBase filter = call.rel(0);
-      JoinRelBase join = call.rel(1);
-      super.perform(call, filter, join);
-    }
-  }
-
-  public static class HivePushDownJoinConditionRule extends
-      HivePushFilterPastJoinRule {
-    public HivePushDownJoinConditionRule() {
-      super(RelOptRule.operand(JoinRelBase.class, RelOptRule.any()),
-          "HivePushFilterPastJoinRule:no-filter", true,
-          HiveFilterRel.DEFAULT_FILTER_FACTORY,
-          HiveProjectRel.DEFAULT_PROJECT_FACTORY);
-    }
-
-    @Override
-    public void onMatch(RelOptRuleCall call) {
-      JoinRelBase join = call.rel(0);
-      super.perform(call, null, join);
-    }
-  }
-
-  /*
-   * Any predicates pushed down to joinFilters that aren't equality
-   * conditions: put them back as aboveFilters because Hive doesn't support
-   * not equi join conditions.
-   */
-  @Override
-  protected void validateJoinFilters(List<RexNode> aboveFilters,
-      List<RexNode> joinFilters, JoinRelBase join, JoinRelType joinType) {
-    if (joinType.equals(JoinRelType.INNER)) {
-      ListIterator<RexNode> filterIter = joinFilters.listIterator();
-      while (filterIter.hasNext()) {
-        RexNode exp = filterIter.next();
-        if (exp instanceof RexCall) {
-          RexCall c = (RexCall) exp;
-          if (c.getOperator().getKind() == SqlKind.EQUALS) {
-            boolean validHiveJoinFilter = true;
-            for (RexNode rn : c.getOperands()) {
-              // NOTE: Hive dis-allows projections from both left
-              // &
-              // right side
-              // of join condition. Example: Hive disallows
-              // (r1.x=r2.x)=(r1.y=r2.y) on join condition.
-              if (filterRefersToBothSidesOfJoin(rn, join)) {
-                validHiveJoinFilter = false;
-                break;
-              }
-            }
-            if (validHiveJoinFilter)
-              continue;
-          }
-        }
-        aboveFilters.add(exp);
-        filterIter.remove();
-      }
-    }
-  }
-
-  private boolean filterRefersToBothSidesOfJoin(RexNode filter, JoinRelBase j) {
-    boolean refersToBothSides = false;
-
-    int joinNoOfProjects = j.getRowType().getFieldCount();
-    BitSet filterProjs = new BitSet(joinNoOfProjects);
-    BitSet allLeftProjs = new BitSet(joinNoOfProjects);
-    BitSet allRightProjs = new BitSet(joinNoOfProjects);
-    allLeftProjs.set(0, j.getInput(0).getRowType().getFieldCount(), true);
-    allRightProjs.set(j.getInput(0).getRowType().getFieldCount(),
-        joinNoOfProjects, true);
-
-    InputFinder inputFinder = new InputFinder(filterProjs);
-    filter.accept(inputFinder);
-
-    if (allLeftProjs.intersects(filterProjs)
-        && allRightProjs.intersects(filterProjs))
-      refersToBothSides = true;
+  /**
+   * Creates a PushFilterPastJoinRule with an explicit root operand.
+   */
+  protected HivePushFilterPastJoinRule(RelOptRuleOperand operand, String id, boolean smart,
+      RelFactories.FilterFactory filterFactory, RelFactories.ProjectFactory projectFactory) {
+    super(operand, id, smart, filterFactory, projectFactory);
+  }
+
+  /**
+   * Rule that tries to push filter expressions into a join condition and into
+   * the inputs of the join.
+   */
+  public static class HivePushFilterIntoJoinRule extends HivePushFilterPastJoinRule {
+    public HivePushFilterIntoJoinRule() {
+      super(RelOptRule.operand(FilterRelBase.class,
+          RelOptRule.operand(JoinRelBase.class, RelOptRule.any())),
+          "HivePushFilterPastJoinRule:filter", true, HiveFilterRel.DEFAULT_FILTER_FACTORY,
+          HiveProjectRel.DEFAULT_PROJECT_FACTORY);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      FilterRelBase filter = call.rel(0);
+      JoinRelBase join = call.rel(1);
+      super.perform(call, filter, join);
+    }
+  }
+
+  public static class HivePushDownJoinConditionRule extends HivePushFilterPastJoinRule {
+    public HivePushDownJoinConditionRule() {
+      super(RelOptRule.operand(JoinRelBase.class, RelOptRule.any()),
+          "HivePushFilterPastJoinRule:no-filter", true, HiveFilterRel.DEFAULT_FILTER_FACTORY,
+          HiveProjectRel.DEFAULT_PROJECT_FACTORY);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      JoinRelBase join = call.rel(0);
+      super.perform(call, null, join);
+    }
+  }
+
+  /*
+   * Any predicates pushed down to joinFilters that aren't equality conditions:
+   * put them back as aboveFilters, because Hive doesn't support non-equi join
+   * conditions.
+   */
+  @Override
+  protected void validateJoinFilters(List<RexNode> aboveFilters, List<RexNode> joinFilters,
+      JoinRelBase join, JoinRelType joinType) {
+    if (joinType.equals(JoinRelType.INNER)) {
+      ListIterator<RexNode> filterIter = joinFilters.listIterator();
+      while (filterIter.hasNext()) {
+        RexNode exp = filterIter.next();
+
+        if (exp instanceof RexCall) {
+          RexCall c = (RexCall) exp;
+          boolean validHiveJoinFilter = false;
+
+          if ((c.getOperator().getKind() == SqlKind.EQUALS)) {
+            validHiveJoinFilter = true;
+            for (RexNode rn : c.getOperands()) {
+              // NOTE: Hive disallows projections from both the left and right
+              // side of the join condition. Example: Hive disallows
+              // (r1.x + r2.x) = (r1.y + r2.y) as a join condition.
+              if (filterRefersToBothSidesOfJoin(rn, join)) {
+                validHiveJoinFilter = false;
+                break;
+              }
+            }
+          } else if ((c.getOperator().getKind() == SqlKind.LESS_THAN)
+              || (c.getOperator().getKind() == SqlKind.GREATER_THAN)
+              || (c.getOperator().getKind() == SqlKind.LESS_THAN_OR_EQUAL)
+              || (c.getOperator().getKind() == SqlKind.GREATER_THAN_OR_EQUAL)) {
+            validHiveJoinFilter = true;
+            // NOTE: Hive disallows projections from both the left and right
+            // side of the join in inequality conditions. Example: Hive
+            // disallows (r1.x < r2.x) as a join condition.
+            if (filterRefersToBothSidesOfJoin(c, join)) {
+              validHiveJoinFilter = false;
+            }
+          }
+
+          if (validHiveJoinFilter)
+            continue;
+        }
+
+        aboveFilters.add(exp);
+        filterIter.remove();
+      }
+    }
+  }
+
+  private boolean filterRefersToBothSidesOfJoin(RexNode filter, JoinRelBase j) {
+    boolean refersToBothSides = false;
+
+    int joinNoOfProjects = j.getRowType().getFieldCount();
+    BitSet filterProjs = new BitSet(joinNoOfProjects);
+    BitSet allLeftProjs = new BitSet(joinNoOfProjects);
+    BitSet allRightProjs = new BitSet(joinNoOfProjects);
+    allLeftProjs.set(0, j.getInput(0).getRowType().getFieldCount(), true);
+    allRightProjs.set(j.getInput(0).getRowType().getFieldCount(), joinNoOfProjects, true);
-
     InputFinder inputFinder = new InputFinder(filterProjs);
     filter.accept(inputFinder);
+
+    if (allLeftProjs.intersects(filterProjs) && allRightProjs.intersects(filterProjs))
+      refersToBothSides = true;
+
+    return refersToBothSides;
+  }
 }

 // End PushFilterPastJoinRule.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/FilterSelectivityEstimator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/FilterSelectivityEstimator.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/FilterSelectivityEstimator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/FilterSelectivityEstimator.java Tue Nov 18 00:48:40 2014
@@ -18,6 +18,9 @@
 package org.apache.hadoop.hive.ql.optimizer.optiq.stats;

 import java.util.BitSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;

 import org.apache.hadoop.hive.ql.optimizer.optiq.RelOptHiveTable;
 import org.apache.hadoop.hive.ql.optimizer.optiq.reloperators.HiveTableScanRel;
@@ -32,6 +35,10 @@ import org.eigenbase.rex.RexInputRef;
 import org.eigenbase.rex.RexNode;
 import org.eigenbase.rex.RexVisitorImpl;
 import org.eigenbase.sql.SqlKind;
+import org.eigenbase.sql.SqlOperator;
+import org.eigenbase.sql.type.SqlTypeUtil;
+
+import com.google.common.collect.Sets;

 public class FilterSelectivityEstimator extends RexVisitorImpl<Double> {
   private final RelNode childRel;
@@ -61,7 +68,7 @@ public class FilterSelectivityEstimator
     }

     Double selectivity = null;
-    SqlKind op = call.getKind();
+    SqlKind op = getOp(call);

     switch (op) {
     case AND: {
@@ -74,6 +81,7 @@ public class FilterSelectivityEstimator
       break;
     }

+    case NOT:
     case NOT_EQUALS: {
       selectivity = computeNotEqualitySelectivity(call);
       break;
@@ -88,7 +96,16 @@ public class FilterSelectivityEstimator
     }

     case IN: {
-      selectivity = ((double) 1 / ((double) call.operands.size()));
+      // TODO: 1) check for duplicates; 2) we assume the IN-clause values are
+      // present in the NDV set, which may not be correct (a range check can
+      // detect this); 3) we assume the values in the NDV set are uniformly
+      // distributed over the column values (account for skew - histogram).
+      selectivity = computeFunctionSelectivity(call) * (call.operands.size() - 1);
+      if (selectivity <= 0.0) {
+        selectivity = 0.10;
+      } else if (selectivity >= 1.0) {
+        selectivity = 1.0;
+      }
       break;
     }

@@ -152,18 +169,19 @@ public class FilterSelectivityEstimator
       }

       tmpCardinality = childCardinality * tmpSelectivity;

-      if (tmpCardinality > 1)
+      if (tmpCardinality > 1 && tmpCardinality < childCardinality) {
         tmpSelectivity = (1 - tmpCardinality / childCardinality);
-      else
+      } else {
         tmpSelectivity = 1.0;
+      }

       selectivity *= tmpSelectivity;
     }

-    if (selectivity > 1)
-      return (1 - selectivity);
-    else
-      return 1.0;
+    if (selectivity < 0.0)
+      selectivity = 0.0;
+
+    return (1 - selectivity);
   }

   /**
@@ -225,4 +243,19 @@ public class FilterSelectivityEstimator
     }
     return false;
   }
+
+  private SqlKind getOp(RexCall call) {
+    SqlKind op = call.getKind();
+
+    if (call.getKind().equals(SqlKind.OTHER_FUNCTION)
+        && SqlTypeUtil.inBooleanFamily(call.getType())) {
+      SqlOperator sqlOp = call.getOperator();
+      String opName = (sqlOp != null) ? sqlOp.getName() : "";
+      if (opName.equalsIgnoreCase("in")) {
+        op = SqlKind.IN;
+      }
+    }
+
+    return op;
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/HiveRelMdUniqueKeys.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/HiveRelMdUniqueKeys.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/HiveRelMdUniqueKeys.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/HiveRelMdUniqueKeys.java Tue Nov 18 00:48:40 2014
@@ -38,6 +38,7 @@ import org.eigenbase.rel.metadata.Metada
 import org.eigenbase.rel.metadata.ReflectiveRelMetadataProvider;
 import org.eigenbase.rel.metadata.RelMdUniqueKeys;
 import org.eigenbase.rel.metadata.RelMetadataProvider;
+import org.eigenbase.relopt.RelOptUtil;
 import org.eigenbase.relopt.hep.HepRelVertex;
 import org.eigenbase.rex.RexInputRef;
 import org.eigenbase.rex.RexNode;
@@ -100,7 +101,7 @@ public class HiveRelMdUniqueKeys {
           cStat.getRange().minValue != null) {
         double r = cStat.getRange().maxValue.doubleValue()
             - cStat.getRange().minValue.doubleValue() + 1;
-        isKey = (numRows == r);
+        isKey = (Math.abs(numRows - r) < RelOptUtil.EPSILON);
       }
       if ( isKey ) {
         BitSet key = new BitSet();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java Tue Nov 18 00:48:40 2014
@@ -278,6 +278,7 @@ public class SqlFunctionConverter {
       registerFunction(">=", SqlStdOperatorTable.GREATER_THAN_OR_EQUAL,
           hToken(HiveParser.GREATERTHANOREQUALTO, ">="));
       registerFunction("!", SqlStdOperatorTable.NOT, hToken(HiveParser.KW_NOT, "not"));
+      registerFunction("<>", SqlStdOperatorTable.NOT_EQUALS, hToken(HiveParser.NOTEQUAL, "<>"));
     }

     private void registerFunction(String name, SqlOperator optiqFn, HiveToken
         hiveToken) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartExprEvalUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartExprEvalUtils.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartExprEvalUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartExprEvalUtils.java Tue Nov 18 00:48:40 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

 public class PartExprEvalUtils {
@@ -103,11 +104,13 @@ public class PartExprEvalUtils {
   }

   static synchronized public ObjectPair<PrimitiveObjectInspector, ExprNodeEvaluator> prepareExpr(
-      ExprNodeGenericFuncDesc expr, List<String> partNames) throws HiveException {
+      ExprNodeGenericFuncDesc expr, List<String> partNames,
+      List<PrimitiveTypeInfo> partColumnTypeInfos) throws HiveException {
     // Create the row object
     List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>();
     for (int i = 0; i < partNames.size(); i++) {
-      partObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+      partObjectInspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
+          partColumnTypeInfos.get(i)));
     }
     StructObjectInspector objectInspector = ObjectInspectorFactory
         .getStandardStructObjectInspector(partNames, partObjectInspectors);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java?rev=1640263&r1=1640262&r2=1640263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java Tue Nov 18 00:48:40 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;

 /**
  * The basic implementation of PartitionExpressionProxy that uses ql package classes.
@@ -40,13 +41,14 @@ public class PartitionExpressionForMetas
   }

   @Override
-  public boolean filterPartitionsByExpr(List<String> columnNames, byte[] exprBytes,
+  public boolean filterPartitionsByExpr(List<String> partColumnNames,
+      List<PrimitiveTypeInfo> partColumnTypeInfos, byte[] exprBytes,
       String defaultPartitionName, List<String> partitionNames) throws MetaException {
     ExprNodeGenericFuncDesc expr = deserializeExpr(exprBytes);
     try {
       long startTime = System.nanoTime(), len = partitionNames.size();
       boolean result = PartitionPruner.prunePartitionNames(
-          columnNames, expr, defaultPartitionName, partitionNames);
+          partColumnNames, partColumnTypeInfos, expr, defaultPartitionName, partitionNames);
       double timeMs = (System.nanoTime() - startTime) / 1000000.0;
       LOG.debug("Pruning " + len + " partition names took " + timeMs + "ms");
       return result;
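
To make the intent of the prepareExpr() and filterPartitionsByExpr() changes above concrete: partition values arrive as the raw strings embedded in partition names, and the pruner previously built string object inspectors for every partition column, so a numeric predicate such as hr = 12 was evaluated against the string "12". Threading partColumnTypeInfos through lets the evaluator see values of the declared column types. The following is a minimal, self-contained sketch of that idea, not Hive code; the class and its coerce() helper are hypothetical stand-ins for what PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector() achieves:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class TypedPartitionRowSketch {
      // Map a declared partition column type to the Java value the expression
      // evaluator should see, instead of always handing it the raw string.
      static Object coerce(String typeName, String rawValue) {
        switch (typeName) {
          case "int":    return Integer.valueOf(rawValue);
          case "bigint": return Long.valueOf(rawValue);
          default:       return rawValue; // strings pass through unchanged
        }
      }

      public static void main(String[] args) {
        List<String> partNames = Arrays.asList("ds", "hr");
        List<String> partTypes = Arrays.asList("string", "int");
        // Values parsed out of a partition name like "ds=2014-11-18/hr=12":
        List<String> rawValues = Arrays.asList("2014-11-18", "12");

        List<Object> row = new ArrayList<Object>();
        for (int i = 0; i < partNames.size(); i++) {
          row.add(coerce(partTypes.get(i), rawValues.get(i)));
        }

        // A predicate like hr = 12 now compares Integer 12 with Integer 12,
        // rather than with the string "12".
        System.out.println(row.get(1).equals(12)); // prints: true
      }
    }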

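The FilterSelectivityEstimator hunk is also worth reading arithmetically: the old estimate for col IN (v1, ..., vn) was 1/operands.size() regardless of column statistics, while the new one scales a per-value equality selectivity by the number of literals (operands.size() - 1, since the first operand is the column reference) and clamps the result into (0, 1]. Below is a hedged sketch of that arithmetic, assuming computeFunctionSelectivity() behaves roughly like 1/NDV; estimateIn() is a stand-in for illustration, not the Hive method:

    public class InSelectivitySketch {
      // Per-value equality selectivity (~1/NDV) scaled by the literal count,
      // clamped the same way the patch clamps it.
      static double estimateIn(double columnNdv, int numLiterals) {
        double perValue = 1.0 / columnNdv;            // stand-in for computeFunctionSelectivity(call)
        double selectivity = perValue * numLiterals;  // call.operands.size() - 1 literals
        if (selectivity <= 0.0) {
          selectivity = 0.10; // fallback when stats are missing or degenerate
        } else if (selectivity >= 1.0) {
          selectivity = 1.0;  // an IN list cannot select more than all rows
        }
        return selectivity;
      }

      public static void main(String[] args) {
        // col IN (a, b, c) over a column with 50 distinct values: 3/50 = 0.06,
        // where the old estimate would have been 1/4 = 0.25.
        System.out.println(estimateIn(50, 3));
      }
    }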