Modified: hive/branches/branch-0.14/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/itests/src/test/resources/testconfiguration.properties?rev=1629273&r1=1629272&r2=1629273&view=diff
==============================================================================
--- hive/branches/branch-0.14/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/branches/branch-0.14/itests/src/test/resources/testconfiguration.properties Fri Oct 3 18:00:41 2014
@@ -159,6 +159,7 @@ minitez.query.files.shared=alter_merge_2
   vector_cast_constant.q,\
   vector_char_4.q,\
   vector_char_simple.q,\
+  vector_count_distinct.q,\
   vector_data_types.q,\
   vector_decimal_aggregate.q,\
   vector_distinct_2.q,\
Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java?rev=1629273&r1=1629272&r2=1629273&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java Fri Oct 3 18:00:41 2014
@@ -18,98 +18,30 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.PTFTopNHash;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TopNHash;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
-import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
-// import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
 public class VectorReduceSinkOperator extends ReduceSinkOperator {
 
-  private static final Log LOG = LogFactory.getLog(
-      VectorReduceSinkOperator.class.getName());
-
   private static final long serialVersionUID = 1L;
 
-  /**
-   * The evaluators for the key columns. Key columns decide the sort order on
-   * the reducer side. Key columns are passed to the reducer in the "key".
-   */
-  private VectorExpression[] keyEval;
-
-  /**
-   * The key value writers. These know how to write the necessary writable type
-   * based on key column metadata, from the primitive vector type.
-   */
-  private transient VectorExpressionWriter[] keyWriters;
-
-  /**
-   * The evaluators for the value columns. Value columns are passed to reducer
-   * in the "value".
-   */
-  private VectorExpression[] valueEval;
-
-  /**
-   * The output value writers. These know how to write the necessary writable type
-   * based on value column metadata, from the primitive vector type.
-   */
-  private transient VectorExpressionWriter[] valueWriters;
-
-  /**
-   * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
-   * Hive language). Partition columns decide the reducer that the current row
-   * goes to. Partition columns are not passed to reducer.
-   */
-  private VectorExpression[] partitionEval;
-
-  /**
-   * Evaluators for bucketing columns. This is used to compute bucket number.
-   */
-  private VectorExpression[] bucketEval;
-  private int buckColIdxInKey;
-
-  /**
-   * The partition value writers. These know how to write the necessary writable type
-   * based on partition column metadata, from the primitive vector type.
-   */
-  private transient VectorExpressionWriter[] partitionWriters;
-  private transient VectorExpressionWriter[] bucketWriters = null;
-
-  private static final boolean isDebugEnabled = LOG.isDebugEnabled();
+  // Writers for producing a row from the input batch.
+  private VectorExpressionWriter[] rowWriters;
+
+  protected transient Object[] singleRow;
 
   public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf)
       throws HiveException {
     this();
     ReduceSinkDesc desc = (ReduceSinkDesc) conf;
     this.conf = desc;
-    keyEval = vContext.getVectorExpressions(desc.getKeyCols());
-    valueEval = vContext.getVectorExpressions(desc.getValueCols());
-    partitionEval = vContext.getVectorExpressions(desc.getPartitionCols());
-    bucketEval = null;
-    if (desc.getBucketCols() != null && !desc.getBucketCols().isEmpty()) {
-      bucketEval = vContext.getVectorExpressions(desc.getBucketCols());
-      buckColIdxInKey = desc.getPartitionCols().size();
-    }
   }
 
   public VectorReduceSinkOperator() {
@@ -118,406 +50,49 @@ public class VectorReduceSinkOperator ex
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
-    try {
-      numDistributionKeys = conf.getNumDistributionKeys();
-      distinctColIndices = conf.getDistinctColumnIndices();
-      numDistinctExprs = distinctColIndices.size();
-
-      TableDesc keyTableDesc = conf.getKeySerializeInfo();
-      keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
-          .newInstance();
-      keySerializer.initialize(null, keyTableDesc.getProperties());
-      keyIsText = keySerializer.getSerializedClass().equals(Text.class);
-
-      /*
-       * Compute and assign the key writers and the key object inspector
-       */
-      VectorExpressionWriterFactory.processVectorExpressions(
-          conf.getKeyCols(),
-          conf.getOutputKeyColumnNames(),
-          new VectorExpressionWriterFactory.SingleOIDClosure() {
-            @Override
-            public void assign(VectorExpressionWriter[] writers,
-                ObjectInspector objectInspector) {
-              keyWriters = writers;
-              keyObjectInspector = objectInspector;
-            }
-          });
-
-      String colNames = "";
-      for(String colName : conf.getOutputKeyColumnNames()) {
-        colNames = String.format("%s %s", colNames, colName);
-      }
-
-      if (isDebugEnabled) {
-        LOG.debug(String.format("keyObjectInspector [%s]%s => %s",
-            keyObjectInspector.getClass(),
-            keyObjectInspector,
-            colNames));
-      }
-
-      partitionWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getPartitionCols());
-      if (conf.getBucketCols() != null && !conf.getBucketCols().isEmpty()) {
-        bucketWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getBucketCols());
-      }
-
-      TableDesc valueTableDesc = conf.getValueSerializeInfo();
-      valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
-          .newInstance();
-      valueSerializer.initialize(null, valueTableDesc.getProperties());
-
-      /*
-       * Compute and assign the value writers and the value object inspector
-       */
-      VectorExpressionWriterFactory.processVectorExpressions(
-          conf.getValueCols(),
-          conf.getOutputValueColumnNames(),
-          new VectorExpressionWriterFactory.SingleOIDClosure() {
-            @Override
-            public void assign(VectorExpressionWriter[] writers,
-                ObjectInspector objectInspector) {
-              valueWriters = writers;
-              valueObjectInspector = objectInspector;
+    // We need an input object inspector that is for the row we will extract out of the
+    // vectorized row batch, not for example, an original inspector for an ORC table, etc.
+    VectorExpressionWriterFactory.processVectorInspector(
+        (StructObjectInspector) inputObjInspectors[0],
+        new VectorExpressionWriterFactory.SingleOIDClosure() {
+          @Override
+          public void assign(VectorExpressionWriter[] writers,
+              ObjectInspector objectInspector) {
+            rowWriters = writers;
+            inputObjInspectors[0] = objectInspector;
           }
-          });
-
-      if (isDebugEnabled) {
-        colNames = "";
-        for(String colName : conf.getOutputValueColumnNames()) {
-          colNames = String.format("%s %s", colNames, colName);
-        }
-      }
-
-      if (isDebugEnabled) {
-        LOG.debug(String.format("valueObjectInspector [%s]%s => %s",
-            valueObjectInspector.getClass(),
-            valueObjectInspector,
-            colNames));
-      }
+        });
+    singleRow = new Object[rowWriters.length];
 
-      int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
-      int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 :
-        numDistributionKeys;
-      cachedKeys = new Object[numKeys][keyLen];
-      cachedValues = new Object[valueEval.length];
-
-      int tag = conf.getTag();
-      tagByte[0] = (byte) tag;
-      LOG.info("Using tag = " + tag);
-
-      int limit = conf.getTopN();
-      float memUsage = conf.getTopNMemoryUsage();
-      if (limit >= 0 && memUsage > 0) {
-        reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : reducerHash;
-        reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
-      }
-
-      autoParallel = conf.isAutoParallel();
-
-    } catch(Exception e) {
-      throw new HiveException(e);
-    }
+    // Call ReduceSinkOperator with new input inspector.
+    super.initializeOp(hconf);
   }
 
   @Override
-  public void processOp(Object row, int tag) throws HiveException {
-    VectorizedRowBatch vrg = (VectorizedRowBatch) row;
-
-    if (isDebugEnabled) {
-      LOG.debug(String.format("sinking %d rows, %d values, %d keys, %d parts",
-          vrg.size,
-          valueEval.length,
-          keyEval.length,
-          partitionEval.length));
-    }
-
-    try {
-      // Evaluate the keys
-      for (int i = 0; i < keyEval.length; i++) {
-        keyEval[i].evaluate(vrg);
-      }
-
-      // Determine which rows we need to emit based on topN optimization
-      int startResult = reducerHash.startVectorizedBatch(vrg.size);
-      if (startResult == TopNHash.EXCLUDE) {
-        return; // TopN wants us to exclude all rows.
-      }
-      // TODO: can we do this later/only for the keys that are needed? E.g. update vrg.selected.
-      for (int i = 0; i < partitionEval.length; i++) {
-        partitionEval[i].evaluate(vrg);
-      }
-      if (bucketEval != null) {
-        for (int i = 0; i < bucketEval.length; i++) {
-          bucketEval[i].evaluate(vrg);
-        }
-      }
-      // run the vector evaluations
-      for (int i = 0; i < valueEval.length; i++) {
-        valueEval[i].evaluate(vrg);
-      }
-
-      boolean useTopN = startResult != TopNHash.FORWARD;
-      // Go thru the batch once. If we are not using TopN, we will forward all things and be done.
-      // If we are using topN, we will make the first key for each row and store/forward it.
-      // Values, hashes and additional distinct rows will be handled in the 2nd pass in that case.
-      for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
-        int rowIndex = batchIndex;
-        if (vrg.selectedInUse) {
-          rowIndex = vrg.selected[batchIndex];
-        }
-        // First, make distrib key components for this row and determine distKeyLength.
-        populatedCachedDistributionKeys(vrg, rowIndex, 0);
-
-        // replace bucketing columns with hashcode % numBuckets
-        if (bucketEval != null) {
-          bucketNumber = computeBucketNumber(vrg, rowIndex, conf.getNumBuckets());
-          cachedKeys[0][buckColIdxInKey] = new Text(String.valueOf(bucketNumber));
-        }
-        HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
-        int distKeyLength = firstKey.getDistKeyLength();
-        // Add first distinct expression, if any.
-        if (numDistinctExprs > 0) {
-          populateCachedDistinctKeys(vrg, rowIndex, 0);
-          firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength);
-        }
-
-        final int hashCode;
-
-        // distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0]
-        if (autoParallel && partitionEval.length > 0) {
-          hashCode = computeMurmurHash(firstKey);
-        } else {
-          hashCode = computeHashCode(vrg, rowIndex);
-        }
-
-        firstKey.setHashCode(hashCode);
-
-        if (useTopN) {
-          /*
-           * in case of TopN for windowing, we need to distinguish between
-           * rows with null partition keys and rows with value 0 for partition keys.
-           */
-          boolean partkeysNull = conf.isPTFReduceSink() && partitionKeysAreNull(vrg, rowIndex);
-          reducerHash.tryStoreVectorizedKey(firstKey, partkeysNull, batchIndex);
-        } else {
-          // No TopN, just forward the first key and all others.
-          BytesWritable value = makeValueWritable(vrg, rowIndex);
-          collect(firstKey, value);
-          forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 0);
-        }
-      }
-
-      if (!useTopN) return; // All done.
-
-      // If we use topN, we have called tryStore on every key now. We can process the results.
-      for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
-        int result = reducerHash.getVectorizedBatchResult(batchIndex);
-        if (result == TopNHash.EXCLUDE) continue;
-        int rowIndex = batchIndex;
-        if (vrg.selectedInUse) {
-          rowIndex = vrg.selected[batchIndex];
-        }
-        // Compute value and hashcode - we'd either store or forward them.
-        BytesWritable value = makeValueWritable(vrg, rowIndex);
-        int distKeyLength = -1;
-        int hashCode;
-        if (result == TopNHash.FORWARD) {
-          HiveKey firstKey = reducerHash.getVectorizedKeyToForward(batchIndex);
-          distKeyLength = firstKey.getDistKeyLength();
-          hashCode = firstKey.hashCode();
-          collect(firstKey, value);
-        } else {
-          hashCode = reducerHash.getVectorizedKeyHashCode(batchIndex);
-          reducerHash.storeValue(result, hashCode, value, true);
-          distKeyLength = reducerHash.getVectorizedKeyDistLength(batchIndex);
-        }
-        // Now forward other the rows if there's multi-distinct (but see TODO in forward...).
-        // Unfortunately, that means we will have to rebuild the cachedKeys. Start at 1.
-        if (numDistinctExprs > 1) {
-          populatedCachedDistributionKeys(vrg, rowIndex, 1);
-          forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 1);
-        }
-      }
-    } catch (SerDeException e) {
-      throw new HiveException(e);
-    } catch (IOException e) {
-      throw new HiveException(e);
-    }
-  }
-
-  /**
-   * This function creates and forwards all the additional KVs for the multi-distinct case,
-   * after the first (0th) KV pertaining to the row has already been stored or forwarded.
-   * @param vrg the batch
-   * @param rowIndex the row index in the batch
-   * @param hashCode the partitioning hash code to use; same as for the first KV
-   * @param value the value to use; same as for the first KV
-   * @param distKeyLength the distribution key length of the first key; TODO probably extraneous
-   * @param tag the tag
-   * @param baseIndex the index in cachedKeys where the pre-evaluated distribution keys are stored
-   */
-  private void forwardExtraDistinctRows(VectorizedRowBatch vrg, int rowIndex,int hashCode,
-      BytesWritable value, int distKeyLength, int tag, int baseIndex)
-          throws HiveException, SerDeException, IOException {
-    // TODO: We don't have to forward extra distinct rows immediately (same in non-vector) if
-    //       the first key has already been stored. There's few bytes difference between keys
-    //       for different distincts, and the value/etc. are all the same.
-    //       We could store deltas to re-gen extra rows when flushing TopN.
-    for (int i = 1; i < numDistinctExprs; i++) {
-      if (i != baseIndex) {
-        System.arraycopy(cachedKeys[baseIndex], 0, cachedKeys[i], 0, numDistributionKeys);
-      }
-      populateCachedDistinctKeys(vrg, rowIndex, i);
-      HiveKey hiveKey = toHiveKey(cachedKeys[i], tag, distKeyLength);
-      hiveKey.setHashCode(hashCode);
-      collect(hiveKey, value);
-    }
-  }
-
-  /**
-   * Populate distribution keys part of cachedKeys for a particular row from the batch.
-   * @param vrg the batch
-   * @param rowIndex the row index in the batch
-   * @param index the cachedKeys index to write to
-   */
-  private void populatedCachedDistributionKeys(
-      VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException {
-    for (int i = 0; i < numDistributionKeys; i++) {
-      int batchColumn = keyEval[i].getOutputColumn();
-      ColumnVector vectorColumn = vrg.cols[batchColumn];
-      cachedKeys[index][i] = keyWriters[i].writeValue(vectorColumn, rowIndex);
-    }
-    if (cachedKeys[index].length > numDistributionKeys) {
-      cachedKeys[index][numDistributionKeys] = null;
-    }
-  }
-
-  /**
-   * Populate distinct keys part of cachedKeys for a particular row from the batch.
-   * @param vrg the batch
-   * @param rowIndex the row index in the batch
-   * @param index the cachedKeys index to write to
-   */
-  private void populateCachedDistinctKeys(
-      VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException {
-    StandardUnion union;
-    cachedKeys[index][numDistributionKeys] = union = new StandardUnion(
-        (byte)index, new Object[distinctColIndices.get(index).size()]);
-    Object[] distinctParameters = (Object[]) union.getObject();
-    for (int distinctParamI = 0; distinctParamI < distinctParameters.length; distinctParamI++) {
-      int distinctColIndex = distinctColIndices.get(index).get(distinctParamI);
-      int batchColumn = keyEval[distinctColIndex].getOutputColumn();
-      distinctParameters[distinctParamI] =
-          keyWriters[distinctColIndex].writeValue(vrg.cols[batchColumn], rowIndex);
-    }
-    union.setTag((byte) index);
-  }
+  public void processOp(Object data, int tag) throws HiveException {
+    VectorizedRowBatch vrg = (VectorizedRowBatch) data;
 
-  private BytesWritable makeValueWritable(VectorizedRowBatch vrg, int rowIndex)
-      throws HiveException, SerDeException {
-    int length = valueEval.length;
-
-    // in case of bucketed table, insert the bucket number as the last column in value
-    if (bucketEval != null) {
-      length -= 1;
-      cachedValues[length] = new Text(String.valueOf(bucketNumber));
+    for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
+      Object row = getRowObject(vrg, batchIndex);
+      super.processOp(row, tag);
    }
-
-    for (int i = 0; i < length; i++) {
-      int batchColumn = valueEval[i].getOutputColumn();
-      ColumnVector vectorColumn = vrg.cols[batchColumn];
-      cachedValues[i] = valueWriters[i].writeValue(vectorColumn, rowIndex);
-    }
-    // Serialize the value
-    return (BytesWritable)valueSerializer.serialize(cachedValues, valueObjectInspector);
   }
 
-  private int computeHashCode(VectorizedRowBatch vrg, int rowIndex) throws HiveException {
-    // Evaluate the HashCode
-    int keyHashCode = 0;
-    if (partitionEval.length == 0) {
-      // If no partition cols, just distribute the data uniformly to provide better
-      // load balance. If the requirement is to have a single reducer, we should set
-      // the number of reducers to 1.
-      // Use a constant seed to make the code deterministic.
-      if (random == null) {
-        random = new Random(12345);
-      }
-      keyHashCode = random.nextInt();
-    } else {
-      for (int p = 0; p < partitionEval.length; p++) {
-        ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()];
-        Object partitionValue = partitionWriters[p].writeValue(columnVector, rowIndex);
-        keyHashCode = keyHashCode
-            * 31
-            + ObjectInspectorUtils.hashCode(
-                partitionValue,
-                partitionWriters[p].getObjectInspector());
-      }
-    }
-    return bucketNumber < 0 ? keyHashCode : keyHashCode * 31 + bucketNumber;
-  }
-
-  private boolean partitionKeysAreNull(VectorizedRowBatch vrg, int rowIndex)
+  private Object[] getRowObject(VectorizedRowBatch vrg, int rowIndex)
       throws HiveException {
-    if (partitionEval.length != 0) {
-      for (int p = 0; p < partitionEval.length; p++) {
-        ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()];
-        Object partitionValue = partitionWriters[p].writeValue(columnVector,
-            rowIndex);
-        if (partitionValue != null) {
-          return false;
-        }
+    int batchIndex = rowIndex;
+    if (vrg.selectedInUse) {
+      batchIndex = vrg.selected[rowIndex];
+    }
+    for (int i = 0; i < vrg.projectionSize; i++) {
+      ColumnVector vectorColumn = vrg.cols[vrg.projectedColumns[i]];
+      if (vectorColumn != null) {
+        singleRow[i] = rowWriters[i].writeValue(vectorColumn, batchIndex);
+      } else {
+        // Some columns from tables are not used.
+        singleRow[i] = null;
       }
-      return true;
-    }
-    return false;
-  }
-
-  private int computeBucketNumber(VectorizedRowBatch vrg, int rowIndex, int numBuckets) throws HiveException {
-    int bucketNum = 0;
-    for (int p = 0; p < bucketEval.length; p++) {
-      ColumnVector columnVector = vrg.cols[bucketEval[p].getOutputColumn()];
-      Object bucketValue = bucketWriters[p].writeValue(columnVector, rowIndex);
-      bucketNum = bucketNum
-          * 31
-          + ObjectInspectorUtils.hashCode(
-              bucketValue,
-              bucketWriters[p].getObjectInspector());
-    }
-
-    if (bucketNum < 0) {
-      bucketNum = -1 * bucketNum;
-    }
-
-    return bucketNum % numBuckets;
-  }
-
-  static public String getOperatorName() {
-    return "RS";
-  }
-
-  public VectorExpression[] getPartitionEval() {
-    return partitionEval;
-  }
-
-  public void setPartitionEval(VectorExpression[] partitionEval) {
-    this.partitionEval = partitionEval;
-  }
-
-  public VectorExpression[] getValueEval() {
-    return valueEval;
-  }
-
-  public void setValueEval(VectorExpression[] valueEval) {
-    this.valueEval = valueEval;
-  }
-
-  public VectorExpression[] getKeyEval() {
-    return keyEval;
-  }
-
-  public void setKeyEval(VectorExpression[] keyEval) {
-    this.keyEval = keyEval;
+    return singleRow;
   }
 }
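The substance of the rewrite above is easy to lose in the removed code: VectorReduceSinkOperator no longer re-implements key/value serialization, TopN, bucketing, and partition hashing against column vectors. Instead, initializeOp() swaps inputObjInspectors[0] for a standard struct inspector matching the rows it will extract, and processOp() materializes each row of the VectorizedRowBatch as an Object[] and delegates to the row-mode ReduceSinkOperator. A minimal sketch of that batch-to-row pattern, built only from the VectorizedRowBatch fields and the VectorExpressionWriter.writeValue() call visible in this diff (the helper class and method names are illustrative, not part of the commit):

    import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    // Illustrative helper mirroring the operator's getRowObject() logic.
    class BatchRowExtractor {
      private final VectorExpressionWriter[] rowWriters; // one writer per projected column
      private final Object[] singleRow;                  // reused across rows to avoid allocation

      BatchRowExtractor(VectorExpressionWriter[] rowWriters) {
        this.rowWriters = rowWriters;
        this.singleRow = new Object[rowWriters.length];
      }

      // Materialize the logical row at 'rowIndex' of the batch as an Object[].
      Object[] extractRow(VectorizedRowBatch batch, int rowIndex) throws HiveException {
        // A batch may carry a selection vector (e.g. after a filter); map the
        // logical index to the physical position in the column vectors.
        int batchIndex = batch.selectedInUse ? batch.selected[rowIndex] : rowIndex;
        for (int i = 0; i < batch.projectionSize; i++) {
          ColumnVector col = batch.cols[batch.projectedColumns[i]];
          // Columns the query never touches have no backing vector; emit NULL for them.
          singleRow[i] = (col == null) ? null : rowWriters[i].writeValue(col, batchIndex);
        }
        return singleRow;
      }
    }

Note that the committed code likewise reuses one singleRow array per operator instance, so a consumer must use or copy the row before the next extraction.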
Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java?rev=1629273&r1=1629272&r2=1629273&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java Fri Oct 3 18:00:41 2014
@@ -1060,6 +1060,31 @@ public final class VectorExpressionWrite
     closure.assign(writers, oids);
   }
 
+  /**
+   * Creates the value writers for a struct object inspector.
+   * Creates an appropriate output object inspector.
+   */
+  public static void processVectorInspector(
+      StructObjectInspector structObjInspector,
+      SingleOIDClosure closure)
+      throws HiveException {
+    List<? extends StructField> fields = structObjInspector.getAllStructFieldRefs();
+    VectorExpressionWriter[] writers = new VectorExpressionWriter[fields.size()];
+    List<ObjectInspector> oids = new ArrayList<ObjectInspector>(writers.length);
+    ArrayList<String> columnNames = new ArrayList<String>();
+    int i = 0;
+    for(StructField field : fields) {
+      ObjectInspector fieldObjInsp = field.getFieldObjectInspector();
+      writers[i] = VectorExpressionWriterFactory.
+          genVectorExpressionWritable(fieldObjInsp);
+      columnNames.add(field.getFieldName());
+      oids.add(writers[i].getObjectInspector());
+      i++;
+    }
+    ObjectInspector objectInspector = ObjectInspectorFactory.
+        getStandardStructObjectInspector(columnNames,oids);
+    closure.assign(writers, objectInspector);
+  }
 
   /**
    * Returns {@link VectorExpressionWriter} objects for the fields in the given
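The new processVectorInspector() complements the existing processVectorExpressions() overloads: instead of starting from vector expressions and output column names, it starts from an existing StructObjectInspector and hands back both the per-field writers and a standard struct object inspector describing the Object[] rows those writers produce. A minimal caller sketch, assuming some StructObjectInspector rowSourceOI is already in hand (field and variable names are illustrative); this is the same shape as the call in VectorReduceSinkOperator.initializeOp() above:

    import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
    import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

    class InspectorSetupSketch {
      private VectorExpressionWriter[] writers;  // how to read each field from a column vector
      private ObjectInspector standardRowOI;     // describes the extracted Object[] rows

      void setup(StructObjectInspector rowSourceOI) throws HiveException {
        VectorExpressionWriterFactory.processVectorInspector(rowSourceOI,
            new VectorExpressionWriterFactory.SingleOIDClosure() {
              @Override
              public void assign(VectorExpressionWriter[] w, ObjectInspector oi) {
                writers = w;         // one writer per struct field
                standardRowOI = oi;  // standard struct OI built from the writers' field OIs
              }
            });
      }
    }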
Added: hive/branches/branch-0.14/ql/src/test/queries/clientpositive/vector_count_distinct.q
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/test/queries/clientpositive/vector_count_distinct.q?rev=1629273&view=auto
==============================================================================
--- hive/branches/branch-0.14/ql/src/test/queries/clientpositive/vector_count_distinct.q (added)
+++ hive/branches/branch-0.14/ql/src/test/queries/clientpositive/vector_count_distinct.q Fri Oct 3 18:00:41 2014
@@ -0,0 +1,108 @@
+SET hive.vectorized.execution.enabled=true;
+
+create table web_sales_txt
+(
+    ws_sold_date_sk int,
+    ws_sold_time_sk int,
+    ws_ship_date_sk int,
+    ws_item_sk int,
+    ws_bill_customer_sk int,
+    ws_bill_cdemo_sk int,
+    ws_bill_hdemo_sk int,
+    ws_bill_addr_sk int,
+    ws_ship_customer_sk int,
+    ws_ship_cdemo_sk int,
+    ws_ship_hdemo_sk int,
+    ws_ship_addr_sk int,
+    ws_web_page_sk int,
+    ws_web_site_sk int,
+    ws_ship_mode_sk int,
+    ws_warehouse_sk int,
+    ws_promo_sk int,
+    ws_order_number int,
+    ws_quantity int,
+    ws_wholesale_cost decimal(7,2),
+    ws_list_price decimal(7,2),
+    ws_sales_price decimal(7,2),
+    ws_ext_discount_amt decimal(7,2),
+    ws_ext_sales_price decimal(7,2),
+    ws_ext_wholesale_cost decimal(7,2),
+    ws_ext_list_price decimal(7,2),
+    ws_ext_tax decimal(7,2),
+    ws_coupon_amt decimal(7,2),
+    ws_ext_ship_cost decimal(7,2),
+    ws_net_paid decimal(7,2),
+    ws_net_paid_inc_tax decimal(7,2),
+    ws_net_paid_inc_ship decimal(7,2),
+    ws_net_paid_inc_ship_tax decimal(7,2),
+    ws_net_profit decimal(7,2)
+)
+row format delimited fields terminated by '|'
+stored as textfile;
+
+LOAD DATA LOCAL INPATH '../../data/files/web_sales_2k' OVERWRITE INTO TABLE web_sales_txt;
+
+------------------------------------------------------------------------------------------
+
+create table web_sales
+(
+    ws_sold_date_sk int,
+    ws_sold_time_sk int,
+    ws_ship_date_sk int,
+    ws_item_sk int,
+    ws_bill_customer_sk int,
+    ws_bill_cdemo_sk int,
+    ws_bill_hdemo_sk int,
+    ws_bill_addr_sk int,
+    ws_ship_customer_sk int,
+    ws_ship_cdemo_sk int,
+    ws_ship_hdemo_sk int,
+    ws_ship_addr_sk int,
+    ws_web_page_sk int,
+    ws_ship_mode_sk int,
+    ws_warehouse_sk int,
+    ws_promo_sk int,
+    ws_order_number int,
+    ws_quantity int,
+    ws_wholesale_cost decimal(7,2),
+    ws_list_price decimal(7,2),
+    ws_sales_price decimal(7,2),
+    ws_ext_discount_amt decimal(7,2),
+    ws_ext_sales_price decimal(7,2),
+    ws_ext_wholesale_cost decimal(7,2),
+    ws_ext_list_price decimal(7,2),
+    ws_ext_tax decimal(7,2),
+    ws_coupon_amt decimal(7,2),
+    ws_ext_ship_cost decimal(7,2),
+    ws_net_paid decimal(7,2),
+    ws_net_paid_inc_tax decimal(7,2),
+    ws_net_paid_inc_ship decimal(7,2),
+    ws_net_paid_inc_ship_tax decimal(7,2),
+    ws_net_profit decimal(7,2)
+)
+partitioned by
+(
+    ws_web_site_sk int
+)
+stored as orc
+tblproperties ("orc.stripe.size"="33554432", "orc.compress.size"="16384");
+
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+insert overwrite table web_sales
+partition (ws_web_site_sk)
+select ws_sold_date_sk, ws_sold_time_sk, ws_ship_date_sk, ws_item_sk,
+    ws_bill_customer_sk, ws_bill_cdemo_sk, ws_bill_hdemo_sk, ws_bill_addr_sk,
+    ws_ship_customer_sk, ws_ship_cdemo_sk, ws_ship_hdemo_sk, ws_ship_addr_sk,
+    ws_web_page_sk, ws_ship_mode_sk, ws_warehouse_sk, ws_promo_sk, ws_order_number,
+    ws_quantity, ws_wholesale_cost, ws_list_price, ws_sales_price, ws_ext_discount_amt,
+    ws_ext_sales_price, ws_ext_wholesale_cost, ws_ext_list_price, ws_ext_tax,
+    ws_coupon_amt, ws_ext_ship_cost, ws_net_paid, ws_net_paid_inc_tax, ws_net_paid_inc_ship,
+    ws_net_paid_inc_ship_tax, ws_net_profit, ws_web_site_sk from web_sales_txt;
+
+------------------------------------------------------------------------------------------
+
+explain
+select count(distinct ws_order_number) from web_sales;
+
+select count(distinct ws_order_number) from web_sales;
\ No newline at end of file
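The q-file above is the regression test wired in by the testconfiguration.properties change at the top of this commit. For ad-hoc verification outside the test harness, one quick check (not part of the committed test, and the exact EXPLAIN wording varies across Hive versions) is to confirm the plan reports vectorized execution:

    SET hive.vectorized.execution.enabled=true;

    -- On Tez, vectorized stages appear in the plan output annotated with
    -- "Execution mode: vectorized".
    explain
    select count(distinct ws_order_number) from web_sales;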
