http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala new file mode 100644 index 0000000..ce701fb --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.{ConvertToUnsafe, LeafNode, SparkPlan}
+import org.apache.spark.sql.types.UserDefinedType
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.{Accumulable, Accumulator, Accumulators}
+
+/**
+ * Factory for `InMemoryRelation`. The relation's output is taken from `child.output`, and the
+ * child plan is wrapped in `ConvertToUnsafe` unless it already reports `outputsUnsafeRows`.
+ */
+private[sql] object InMemoryRelation {
+  def apply(
+      useCompression: Boolean,
+      batchSize: Int,
+      storageLevel: StorageLevel,
+      child: SparkPlan,
+      tableName: Option[String]): InMemoryRelation = {
+    val relationOutput = child.output
+    // The plan that actually gets cached: the child itself, or the child behind a conversion.
+    val planToCache = if (child.outputsUnsafeRows) child else ConvertToUnsafe(child)
+    new InMemoryRelation(
+      relationOutput, useCompression, batchSize, storageLevel, planToCache, tableName)()
+  }
+}
+
+/**
+ * CachedBatch is a cached batch of rows.
+ *
+ * @param numRows The total number of rows in this batch
+ * @param buffers The buffers for serialized columns (one byte array per column builder)
+ * @param stats The statistics collected by the column builders, flattened into a single row
+ */
+private[columnar]
+case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
+
+/**
+ * Logical plan leaf for a relation whose rows are cached as an RDD of [[CachedBatch]]es.
+ *
+ * The second parameter list carries state that is handed on to every copy made through
+ * `withOutput`, `newInstance` and `otherCopyArgs`: the cached column buffers, the statistics
+ * (only once they have been computed) and the accumulator collecting per-batch statistics.
+ */
+private[sql] case class InMemoryRelation(
+    output: Seq[Attribute],
+    useCompression: Boolean,
+    batchSize: Int,
+    storageLevel: StorageLevel,
+    @transient child: SparkPlan,
+    tableName: Option[String])(
+    @transient private var _cachedColumnBuffers: RDD[CachedBatch] = null,
+    @transient private var _statistics: Statistics = null,
+    private var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null)
+  extends LogicalPlan with MultiInstanceRelation {
+
+  // Receives one statistics row per batch built in `buildBuffers`. A fresh accumulator is created
+  // from the child's SparkContext unless an existing one was passed in.
+  private val batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] =
+    if (_batchStats == null) {
+      child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[InternalRow])
+    } else {
+      _batchStats
+    }
+
+  @transient val partitionStatistics = new PartitionStatistics(output)
+
+  // Adds up the `sizeInBytes` statistic of every output attribute (bound against the statistics
+  // schema) and sums that expression over all statistics rows accumulated so far.
+  // NOTE(review): `reduce(Add)` requires `output` to be non-empty.
+  private def computeSizeInBytes = {
+    val sizeOfRow: Expression =
+      BindReferences.bindReference(
+        output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
+        partitionStatistics.schema)
+
+    batchStats.value.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
+  }
+
+  // Statistics propagation contracts:
+  // 1. Non-null `_statistics` must reflect the actual statistics of the underlying data
+  // 2. Only propagate statistics when `_statistics` is non-null
+  //
+  // Calling `statistics` may set `_statistics` as a side effect, hence the second null check.
+  private def statisticsToBePropagated = if (_statistics == null) {
+    val updatedStats = statistics
+    if (_statistics == null) null else updatedStats
+  } else {
+    _statistics
+  }
+
+  override def statistics: Statistics = {
+    if (_statistics == null) {
+      if (batchStats.value.isEmpty) {
+        // Underlying columnar RDD hasn't been materialized, no useful statistics information
+        // available, return the default statistics.
+        Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
+      } else {
+        // Underlying columnar RDD has been materialized, required information has also been
+        // collected via the `batchStats` accumulator, compute the final statistics,
+        // and update `_statistics`.
+        _statistics = Statistics(sizeInBytes = computeSizeInBytes)
+        _statistics
+      }
+    } else {
+      // Pre-computed statistics
+      _statistics
+    }
+  }
+
+  // If the cached column buffers were not passed in, we calculate them in the constructor.
+  // As in Spark, the actual work of caching is lazy.
+  if (_cachedColumnBuffers == null) {
+    buildBuffers()
+  }
+
+  /** Unpersists the current column buffers and builds a new cached RDD from `child`. */
+  def recache(): Unit = {
+    _cachedColumnBuffers.unpersist()
+    _cachedColumnBuffers = null
+    buildBuffers()
+  }
+
+  /**
+   * Defines the cached RDD: every partition of the executed child is exposed as an iterator of
+   * [[CachedBatch]]es, and the resulting RDD is persisted with `storageLevel` and stored in
+   * `_cachedColumnBuffers`.
+   */
+  private def buildBuffers(): Unit = {
+    val output = child.output
+    val cached = child.execute().mapPartitionsInternal { rowIterator =>
+      new Iterator[CachedBatch] {
+        def next(): CachedBatch = {
+          // One fresh builder per output column for every batch.
+          val columnBuilders = output.map { attribute =>
+            ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
+          }.toArray
+
+          var rowCount = 0
+          var totalSize = 0L
+          // A batch ends when the input is exhausted, `batchSize` rows were appended, or the
+          // summed column sizes reach `ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE`.
+          while (rowIterator.hasNext && rowCount < batchSize
+            && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) {
+            val row = rowIterator.next()
+
+            // Added for SPARK-6082. This assertion can be useful for scenarios when something
+            // like Hive TRANSFORM is used. The external data generation script used in TRANSFORM
+            // may result in malformed rows, causing ArrayIndexOutOfBoundsException, which is
+            // somewhat hard to decipher.
+            assert(
+              row.numFields == columnBuilders.size,
+              s"Row column number mismatch, expected ${output.size} columns, " +
+              s"but got ${row.numFields}." +
+              s"\nRow content: $row")
+
+            var i = 0
+            // `totalSize` is recomputed from scratch after every row as the sum of each column's
+            // `columnStats.sizeInBytes` (presumably a running total per column -- TODO confirm).
+            totalSize = 0
+            while (i < row.numFields) {
+              columnBuilders(i).appendFrom(row, i)
+              totalSize += columnBuilders(i).columnStats.sizeInBytes
+              i += 1
+            }
+            rowCount += 1
+          }
+
+          val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
+                        .flatMap(_.values))
+
+          // Report this batch's statistics through the accumulator, then emit the batch.
+          batchStats += stats
+          CachedBatch(rowCount, columnBuilders.map(_.build().array()), stats)
+        }
+
+        def hasNext: Boolean = rowIterator.hasNext
+      }
+    }.persist(storageLevel)
+
+    cached.setName(tableName.map(n => s"In-memory table $n").getOrElse(child.toString))
+    _cachedColumnBuffers = cached
+  }
+
+  /** Returns a copy with `newOutput` that shares the buffers, statistics and accumulator. */
+  def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
+    InMemoryRelation(
+      newOutput, useCompression, batchSize, storageLevel, child, tableName)(
+      _cachedColumnBuffers, statisticsToBePropagated, batchStats)
+  }
+
+  override def children: Seq[LogicalPlan] = Seq.empty
+
+  // Same shared state as `withOutput`, but every output attribute is replaced by
+  // `newInstance()` of itself.
+  override def newInstance(): this.type = {
+    new InMemoryRelation(
+      output.map(_.newInstance()),
+      useCompression,
+      batchSize,
+      storageLevel,
+      child,
+      tableName)(
+      _cachedColumnBuffers,
+      statisticsToBePropagated,
+      batchStats).asInstanceOf[this.type]
+  }
+
+  def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers
+
+  // Values for the second (curried) parameter list, in declaration order.
+  override protected def otherCopyArgs: Seq[AnyRef] =
+    Seq(_cachedColumnBuffers, statisticsToBePropagated, batchStats)
+
+  /**
+   * Removes the batch statistics accumulator from `Accumulators`, unpersists the cached RDD
+   * (passing `blocking` through) and clears the reference to it.
+   */
+  private[sql] def uncache(blocking: Boolean): Unit = {
+    Accumulators.remove(batchStats.id)
+    cachedColumnBuffers.unpersist(blocking)
+    _cachedColumnBuffers = null
+  }
+}
+
+/**
+ * Physical leaf node that scans the cached batches of `relation`, producing the columns listed
+ * in `attributes`. `predicates` are translated into filters over the per-batch statistics so
+ * that batches which cannot contain matching rows are skipped.
+ */
+private[sql] case class InMemoryColumnarTableScan(
+    attributes: Seq[Attribute],
+    predicates: Seq[Expression],
+    @transient relation: InMemoryRelation)
+  extends LeafNode {
+
+  override def output: Seq[Attribute] = attributes
+
+  // The cached version does not change the outputPartitioning of the original SparkPlan.
+  override def outputPartitioning: Partitioning = relation.child.outputPartitioning
+
+  // The cached version does not change the outputOrdering of the original SparkPlan.
+  override def outputOrdering: Seq[SortOrder] = relation.child.outputOrdering
+
+  override def outputsUnsafeRows: Boolean = true
+
+  private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
+
+  // Returned filter predicate should return false iff it is impossible for the input expression
+  // to evaluate to `true` based on statistics collected about this partition batch.
+  @transient val buildFilter: PartialFunction[Expression, Expression] = {
+    // A conjunction is usable as soon as one side is: only the convertible sides are combined.
+    case And(lhs: Expression, rhs: Expression)
+      if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) =>
+      (buildFilter.lift(lhs) ++ buildFilter.lift(rhs)).reduce(_ && _)
+
+    // A disjunction is only usable when both sides can be converted.
+    case Or(lhs: Expression, rhs: Expression)
+      if buildFilter.isDefinedAt(lhs) && buildFilter.isDefinedAt(rhs) =>
+      buildFilter(lhs) || buildFilter(rhs)
+
+    // Equality: the literal has to lie within [lowerBound, upperBound] of the batch.
+    case EqualTo(a: AttributeReference, l: Literal) =>
+      statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound
+    case EqualTo(l: Literal, a: AttributeReference) =>
+      statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound
+
+    case LessThan(a: AttributeReference, l: Literal) => statsFor(a).lowerBound < l
+    case LessThan(l: Literal, a: AttributeReference) => l < statsFor(a).upperBound
+
+    case LessThanOrEqual(a: AttributeReference, l: Literal) => statsFor(a).lowerBound <= l
+    case LessThanOrEqual(l: Literal, a: AttributeReference) => l <= statsFor(a).upperBound
+
+    case GreaterThan(a: AttributeReference, l: Literal) => l < statsFor(a).upperBound
+    case GreaterThan(l: Literal, a: AttributeReference) => statsFor(a).lowerBound < l
+
+    case GreaterThanOrEqual(a: AttributeReference, l: Literal) => l <= statsFor(a).upperBound
+    case GreaterThanOrEqual(l: Literal, a: AttributeReference) => statsFor(a).lowerBound <= l
+
+    case IsNull(a: Attribute) => statsFor(a).nullCount > 0
+    case IsNotNull(a: Attribute) => statsFor(a).count - statsFor(a).nullCount > 0
+  }
+
+  // Statistics filters derived from `predicates`, bound against the statistics schema.
+  // Predicates `buildFilter` does not cover produce no filter.
+  val partitionFilters: Seq[Expression] = {
+    predicates.flatMap { p =>
+      val filter = buildFilter.lift(p)
+      val boundFilter =
+        filter.map(
+          BindReferences.bindReference(
+            _,
+            relation.partitionStatistics.schema,
+            allowFailures = true))
+
+      boundFilter.foreach(_ =>
+        filter.foreach(f => logInfo(s"Predicate $p generates partition filter: $f")))
+
+      // If the filter can't be resolved then we are missing required statistics.
+      boundFilter.filter(_.resolved)
+    }
+  }
+
+  // Read from the SQL conf key below; "false" is used when the key is not set.
+  lazy val enableAccumulators: Boolean =
+    sqlContext.getConf("spark.sql.inMemoryTableScanStatistics.enable", "false").toBoolean
+
+  // Accumulators used for testing purposes
+  lazy val readPartitions: Accumulator[Int] = sparkContext.accumulator(0)
+  lazy val readBatches: Accumulator[Int] = sparkContext.accumulator(0)
+
+  private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    if (enableAccumulators) {
+      readPartitions.setValue(0)
+      readBatches.setValue(0)
+    }
+
+    // Using these variables here to avoid serialization of entire objects (if referenced directly)
+    // within the map Partitions closure.
+    val schema = relation.partitionStatistics.schema
+    val schemaIndex = schema.zipWithIndex
+    val relOutput = relation.output
+    val buffers = relation.cachedColumnBuffers
+
+    buffers.mapPartitionsInternal { cachedBatchIterator =>
+      // All partition filters AND-ed together; `Literal(true)` when there are none.
+      val partitionFilter = newPredicate(
+        partitionFilters.reduceOption(And).getOrElse(Literal(true)),
+        schema)
+
+      // Find the ordinals and data types of the requested columns.
+      val (requestedColumnIndices, requestedColumnDataTypes) =
+        attributes.map { a =>
+          relOutput.indexWhere(_.exprId == a.exprId) -> a.dataType
+        }.unzip
+
+      // Do partition batch pruning if enabled
+      val cachedBatchesToScan =
+        if (inMemoryPartitionPruningEnabled) {
+          cachedBatchIterator.filter { cachedBatch =>
+            if (!partitionFilter(cachedBatch.stats)) {
+              // A `def`, so the string is only built if the log message is actually evaluated.
+              def statsString: String = schemaIndex.map {
+                case (a, i) =>
+                  val value = cachedBatch.stats.get(i, a.dataType)
+                  s"${a.name}: $value"
+              }.mkString(", ")
+              logInfo(s"Skipping partition based on stats $statsString")
+              false
+            } else {
+              if (enableAccumulators) {
+                readBatches += 1
+              }
+              true
+            }
+          }
+        } else {
+          cachedBatchIterator
+        }
+
+      // User-defined types are read using their underlying SQL type.
+      val columnTypes = requestedColumnDataTypes.map {
+        case udt: UserDefinedType[_] => udt.sqlType
+        case other => other
+      }.toArray
+      val columnarIterator = GenerateColumnAccessor.generate(columnTypes)
+      columnarIterator.initialize(cachedBatchesToScan, columnTypes, requestedColumnIndices.toArray)
+      if (enableAccumulators && columnarIterator.hasNext) {
+        readPartitions += 1
+      }
+      columnarIterator
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala new file mode 100644 index 0000000..8d99546 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import java.nio.{ByteOrder, ByteBuffer}
+
+import org.apache.spark.sql.catalyst.expressions.MutableRow
+
+/**
+ * A stackable accessor trait that handles null values of a column.
+ *
+ * On initialization it reads the null count and the first null position from the start of the
+ * underlying buffer, then moves the underlying buffer past the null count and all null positions
+ * before delegating to `super.initialize()`.
+ */
+private[columnar] trait NullableColumnAccessor extends ColumnAccessor {
+  // Independent view of the underlying buffer, used only to read the null positions.
+  private var nullsBuffer: ByteBuffer = _
+  private var nullCount: Int = _
+  private var seenNulls: Int = 0
+
+  // Row position of the next null value; -1 when the column contains no nulls.
+  private var nextNullIndex: Int = _
+  // Position of the row that the next `extractTo` call will produce.
+  private var pos: Int = 0
+
+  abstract override protected def initialize(): Unit = {
+    nullsBuffer = underlyingBuffer.duplicate().order(ByteOrder.nativeOrder())
+    nullCount = ByteBufferHelper.getInt(nullsBuffer)
+    nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1
+    // Reset both cursors together: `seenNulls` has to match the null header that was just read,
+    // otherwise `hasNext` and `extractTo` would work from a stale counter.
+    pos = 0
+    seenNulls = 0
+
+    // Skip the 4-byte null count and the 4-byte position of every null.
+    underlyingBuffer.position(underlyingBuffer.position + 4 + nullCount * 4)
+    super.initialize()
+  }
+
+  /**
+   * Writes the value of the current row into `row` at `ordinal`. If the current position is the
+   * next recorded null position the field is set to null and the following null position (if
+   * there is one) is loaded; otherwise extraction is delegated to the wrapped accessor.
+   */
+  abstract override def extractTo(row: MutableRow, ordinal: Int): Unit = {
+    if (pos == nextNullIndex) {
+      seenNulls += 1
+
+      if (seenNulls < nullCount) {
+        nextNullIndex = ByteBufferHelper.getInt(nullsBuffer)
+      }
+
+      row.setNullAt(ordinal)
+    } else {
+      super.extractTo(row, ordinal)
+    }
+
+    pos += 1
+  }
+
+  // Rows remain while there are nulls not yet emitted or the wrapped accessor has more values.
+  abstract override def hasNext: Boolean = seenNulls < nullCount || super.hasNext
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilder.scala
new file mode 100644
index 0000000..3a1931b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilder.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.spark.sql.catalyst.InternalRow
+
+/**
+ * A stackable trait used for building byte buffer for a column containing null values. Memory
+ * layout of the final byte buffer is:
+ * {{{
+ *    .------------------- Null count N (4 bytes)
+ *    |   .--------------- Null positions (4 x N bytes, empty if null count is zero)
+ *    |   |     .--------- Non-null elements
+ *    V   V     V
+ *   +---+-----+---------+
+ *   |   | ... | ... ... |
+ *   +---+-----+---------+
+ * }}}
+ */
+private[columnar] trait NullableColumnBuilder extends ColumnBuilder {
+  // Positions of the null rows seen so far, 4 bytes each, in native byte order.
+  protected var nulls: ByteBuffer = _
+  protected var nullCount: Int = _
+  // Position of the row the next `appendFrom` call will handle.
+  private var pos: Int = _
+
+  abstract override def initialize(
+      initialSize: Int,
+      columnName: String,
+      useCompression: Boolean): Unit = {
+
+    nulls = ByteBuffer.allocate(1024).order(ByteOrder.nativeOrder())
+    pos = 0
+    nullCount = 0
+    super.initialize(initialSize, columnName, useCompression)
+  }
+
+  /**
+   * Gathers statistics for the field, then either records the current position as a null or
+   * hands the value to the wrapped builder. The position advances in both cases.
+   */
+  abstract override def appendFrom(row: InternalRow, ordinal: Int): Unit = {
+    columnStats.gatherStats(row, ordinal)
+    val fieldIsNull = row.isNullAt(ordinal)
+    if (!fieldIsNull) {
+      super.appendFrom(row, ordinal)
+    } else {
+      // Make sure there is room for one more 4-byte position before writing it.
+      nulls = ColumnBuilder.ensureFreeSpace(nulls, 4)
+      nulls.putInt(pos)
+      nullCount += 1
+    }
+    pos += 1
+  }
+
+  /** Assembles null count, null positions and the non-null elements into one rewound buffer. */
+  abstract override def build(): ByteBuffer = {
+    val nonNullPart = super.build()
+    val nullBytes = nulls.position()
+
+    // Expose exactly the bytes written so far for the copy below.
+    nulls.limit(nullBytes)
+    nulls.rewind()
+
+    val result = ByteBuffer.allocate(4 + nullBytes + nonNullPart.remaining())
+    result.order(ByteOrder.nativeOrder())
+    result.putInt(nullCount)
+    result.put(nulls)
+    result.put(nonNullPart)
+
+    result.rewind()
+    result
+  }
+
+  /**
+   * Prepares `nulls` for reading (limit at the written length, position at zero) and returns
+   * only what the wrapped builder produces, without the null header.
+   */
+  protected def buildNonNulls(): ByteBuffer = {
+    val writtenNullBytes = nulls.position()
+    nulls.limit(writtenNullBytes)
+    nulls.rewind()
+    super.build()
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
new file mode 100644
index 0000000..6579b50
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under
one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar.compression
+
+import org.apache.spark.sql.catalyst.expressions.MutableRow
+import org.apache.spark.sql.execution.columnar.{ColumnAccessor, NativeColumnAccessor}
+import org.apache.spark.sql.types.AtomicType
+
+/**
+ * A stackable accessor trait for columns that may be compressed. After the wrapped accessor
+ * has initialized, the compression scheme ID is read from the underlying buffer and the
+ * matching decoder is created; single values are then extracted through that decoder.
+ */
+private[columnar] trait CompressibleColumnAccessor[T <: AtomicType] extends ColumnAccessor {
+  this: NativeColumnAccessor[T] =>
+
+  private var decoder: Decoder[T] = _
+
+  abstract override protected def initialize(): Unit = {
+    super.initialize()
+    // The scheme ID is the next int in the underlying buffer once `super` has initialized.
+    val schemeId = underlyingBuffer.getInt()
+    val scheme = CompressionScheme(schemeId)
+    decoder = scheme.decoder(buffer, columnType)
+  }
+
+  abstract override def hasNext: Boolean = {
+    if (super.hasNext) true else decoder.hasNext
+  }
+
+  override def extractSingle(row: MutableRow, ordinal: Int): Unit =
+    decoder.next(row, ordinal)
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala
new file mode 100644
index 0000000..b0e216f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar.compression
+
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.columnar.{ColumnBuilder, NativeColumnBuilder}
+import org.apache.spark.sql.types.AtomicType
+
+/**
+ * A stackable trait that builds optionally compressed byte buffer for a column. Memory layout of
+ * the final byte buffer is:
+ * {{{
+ *    .----------------------- Null count N (4 bytes)
+ *    |   .------------------- Null positions (4 x N bytes, empty if null count is zero)
+ *    |   |     .------------- Compression scheme ID (4 bytes)
+ *    |   |     |   .--------- Compressed non-null elements
+ *    V   V     V   V
+ *   +---+-----+---+---------+
+ *   |   | ... |   | ... ... |
+ *   +---+-----+---+---------+
+ *    \-------/ \-----------/
+ *     header         body
+ * }}}
+ */
+private[columnar] trait CompressibleColumnBuilder[T <: AtomicType]
+  extends ColumnBuilder with Logging {
+
+  this: NativeColumnBuilder[T] with WithCompressionSchemes =>
+
+  // Candidate encoders for this column; assigned in `initialize`.
+  var compressionEncoders: Seq[Encoder[T]] = _
+
+  /**
+   * Selects the candidate encoders before delegating to `super.initialize`: every mixed-in
+   * scheme that supports the column type when `useCompression` is set, otherwise only the
+   * `PassThrough` encoder.
+   */
+  abstract override def initialize(
+      initialSize: Int,
+      columnName: String,
+      useCompression: Boolean): Unit = {
+
+    compressionEncoders =
+      if (useCompression) {
+        schemes.filter(_.supports(columnType)).map(_.encoder[T](columnType))
+      } else {
+        Seq(PassThrough.encoder(columnType))
+      }
+    super.initialize(initialSize, columnName, useCompression)
+  }
+
+  /** An encoder is only used when its compression ratio is below 0.8. */
+  protected def isWorthCompressing(encoder: Encoder[T]): Boolean = {
+    encoder.compressionRatio < 0.8
+  }
+
+  // Feeds the field to every candidate encoder so each can track its own compressibility.
+  private def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
+    var i = 0
+    while (i < compressionEncoders.length) {
+      compressionEncoders(i).gatherCompressibilityStats(row, ordinal)
+      i += 1
+    }
+  }
+
+  abstract override def appendFrom(row: InternalRow, ordinal: Int): Unit = {
+    super.appendFrom(row, ordinal)
+    // Null fields are not passed to the encoders.
+    if (!row.isNullAt(ordinal)) {
+      gatherCompressibilityStats(row, ordinal)
+    }
+  }
+
+  /**
+   * Builds the final column buffer: the null header followed by the body written by the chosen
+   * encoder. The candidate with the smallest compression ratio is used if it is worth
+   * compressing; otherwise, or when there is no candidate at all, `PassThrough` is used.
+   */
+  override def build(): ByteBuffer = {
+    val nonNullBuffer = buildNonNulls()
+    val encoder: Encoder[T] =
+      if (compressionEncoders.isEmpty) {
+        // `minBy` throws on an empty sequence, which happens when compression is enabled but
+        // none of the mixed-in schemes supports this column type.
+        PassThrough.encoder(columnType)
+      } else {
+        val candidate = compressionEncoders.minBy(_.compressionRatio)
+        if (isWorthCompressing(candidate)) candidate else PassThrough.encoder(columnType)
+      }
+
+    // Header = null count + null positions
+    // NOTE(review): relies on `buildNonNulls()` above having set the limit of `nulls` to the
+    // number of bytes written -- confirm against the builder this trait is stacked on.
+    val headerSize = 4 + nulls.limit()
+    // An encoder reporting a compressed size of 0 gets room for the raw non-null bytes.
+    val compressedSize = if (encoder.compressedSize == 0) {
+      nonNullBuffer.remaining()
+    } else {
+      encoder.compressedSize
+    }
+
+    val compressedBuffer = ByteBuffer
+      // Reserves 4 bytes for compression scheme ID
+      .allocate(headerSize + 4 + compressedSize)
+      .order(ByteOrder.nativeOrder)
+      // Write the header
+      .putInt(nullCount)
+      .put(nulls)
+
+    logDebug(s"Compressor for [$columnName]: $encoder, ratio: ${encoder.compressionRatio}")
+    encoder.compress(nonNullBuffer, compressedBuffer)
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
new file mode 100644
index 0000000..920381f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar.compression
+
+import java.nio.{ByteBuffer, ByteOrder}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.MutableRow
+import org.apache.spark.sql.execution.columnar.{ColumnType, NativeColumnType}
+import org.apache.spark.sql.types.AtomicType
+
+/**
+ * Compresses the values of one column. Statistics may be gathered value by value through
+ * `gatherCompressibilityStats` (a no-op unless overridden); `compress` performs the encoding.
+ */
+private[columnar] trait Encoder[T <: AtomicType] {
+  def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {}
+
+  def compressedSize: Int
+
+  def uncompressedSize: Int
+
+  /** Compressed size divided by uncompressed size, or 1.0 when the latter is not positive. */
+  def compressionRatio: Double = {
+    if (uncompressedSize <= 0) 1.0 else compressedSize.toDouble / uncompressedSize
+  }
+
+  def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer
+}
+
+/** Reads values one at a time, writing each into field `ordinal` of the given row. */
+private[columnar] trait Decoder[T <: AtomicType] {
+  def next(row: MutableRow, ordinal: Int): Unit
+
+  def hasNext: Boolean
+}
+
+/** A compression scheme, identified by `typeId`, that creates encoders and decoders. */
+private[columnar] trait CompressionScheme {
+  def typeId: Int
+
+  def supports(columnType: ColumnType[_]): Boolean
+
+  def encoder[T <: AtomicType](columnType: NativeColumnType[T]): Encoder[T]
+
+  def decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T]
+}
+
+/** Supplies the compression schemes a column builder may choose from. */
+private[columnar] trait WithCompressionSchemes {
+  def schemes: Seq[CompressionScheme]
+}
+
+/** Makes every scheme listed in `CompressionScheme.all` available. */
+private[columnar] trait AllCompressionSchemes extends WithCompressionSchemes {
+  override val schemes: Seq[CompressionScheme] = CompressionScheme.all
+}
+
+private[columnar] object CompressionScheme {
+  val all: Seq[CompressionScheme] =
+    Seq(PassThrough, RunLengthEncoding, DictionaryEncoding, BooleanBitSet, IntDelta, LongDelta)
+
+  // Lookup table from scheme type ID to scheme, built once from `all`.
+  private val typeIdToScheme = all.map(s => (s.typeId, s)).toMap
+
+  /**
+   * Returns the scheme registered under `typeId`.
+   *
+   * @throws UnsupportedOperationException if no scheme has that ID
+   */
+  def apply(typeId: Int): CompressionScheme = {
+    typeIdToScheme.get(typeId).getOrElse {
+      throw new UnsupportedOperationException(
+        s"Unrecognized compression scheme type ID: $typeId")
+    }
+  }
+
+  /**
+   * Size in bytes of the null header of `columnBuffer`. The null count is read through a
+   * duplicate of the buffer, so the position of `columnBuffer` itself does not move.
+   */
+  def columnHeaderSize(columnBuffer: ByteBuffer): Int = {
+    val nullCount = columnBuffer.duplicate().order(ByteOrder.nativeOrder).getInt()
+    // null count + null positions
+    4 + 4 * nullCount
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
new file mode 100644
index 0000000..941f03b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.sql.execution.columnar.compression + +import java.nio.ByteBuffer + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{MutableRow, SpecificMutableRow} +import org.apache.spark.sql.execution.columnar._ +import org.apache.spark.sql.types._ + + +private[columnar] case object PassThrough extends CompressionScheme { + override val typeId = 0 + + override def supports(columnType: ColumnType[_]): Boolean = true + + override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): Encoder[T] = { + new this.Encoder[T](columnType) + } + + override def decoder[T <: AtomicType]( + buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T] = { + new this.Decoder(buffer, columnType) + } + + class Encoder[T <: AtomicType](columnType: NativeColumnType[T]) extends compression.Encoder[T] { + override def uncompressedSize: Int = 0 + + override def compressedSize: Int = 0 + + override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { + // Writes compression type ID and copies raw contents + to.putInt(PassThrough.typeId).put(from).rewind() + to + } + } + + class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) + extends compression.Decoder[T] { + + override def next(row: MutableRow, ordinal: Int): Unit = { + columnType.extract(buffer, row, ordinal) + } + + override def hasNext: Boolean = buffer.hasRemaining + } +} + +private[columnar] case object RunLengthEncoding extends CompressionScheme { + override val typeId = 1 + + override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): Encoder[T] = { + new this.Encoder[T](columnType) + } + + override def decoder[T <: AtomicType]( + buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T] = { + new this.Decoder(buffer, columnType) + } + + override def supports(columnType: ColumnType[_]): Boolean = columnType match { + case INT | LONG | SHORT | BYTE | 
STRING | BOOLEAN => true + case _ => false + } + + class Encoder[T <: AtomicType](columnType: NativeColumnType[T]) extends compression.Encoder[T] { + private var _uncompressedSize = 0 + private var _compressedSize = 0 + + // Using `MutableRow` to store the last value to avoid boxing/unboxing cost. + private val lastValue = new SpecificMutableRow(Seq(columnType.dataType)) + private var lastRun = 0 + + override def uncompressedSize: Int = _uncompressedSize + + override def compressedSize: Int = _compressedSize + + override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { + val value = columnType.getField(row, ordinal) + val actualSize = columnType.actualSize(row, ordinal) + _uncompressedSize += actualSize + + if (lastValue.isNullAt(0)) { + columnType.copyField(row, ordinal, lastValue, 0) + lastRun = 1 + _compressedSize += actualSize + 4 + } else { + if (columnType.getField(lastValue, 0) == value) { + lastRun += 1 + } else { + _compressedSize += actualSize + 4 + columnType.copyField(row, ordinal, lastValue, 0) + lastRun = 1 + } + } + } + + override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { + to.putInt(RunLengthEncoding.typeId) + + if (from.hasRemaining) { + val currentValue = new SpecificMutableRow(Seq(columnType.dataType)) + var currentRun = 1 + val value = new SpecificMutableRow(Seq(columnType.dataType)) + + columnType.extract(from, currentValue, 0) + + while (from.hasRemaining) { + columnType.extract(from, value, 0) + + if (value.get(0, columnType.dataType) == currentValue.get(0, columnType.dataType)) { + currentRun += 1 + } else { + // Writes current run + columnType.append(currentValue, 0, to) + to.putInt(currentRun) + + // Resets current run + columnType.copyField(value, 0, currentValue, 0) + currentRun = 1 + } + } + + // Writes the last run + columnType.append(currentValue, 0, to) + to.putInt(currentRun) + } + + to.rewind() + to + } + } + + class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: 
NativeColumnType[T]) + extends compression.Decoder[T] { + + private var run = 0 + private var valueCount = 0 + private var currentValue: T#InternalType = _ + + override def next(row: MutableRow, ordinal: Int): Unit = { + if (valueCount == run) { + currentValue = columnType.extract(buffer) + run = ByteBufferHelper.getInt(buffer) + valueCount = 1 + } else { + valueCount += 1 + } + + columnType.setField(row, ordinal, currentValue) + } + + override def hasNext: Boolean = valueCount < run || buffer.hasRemaining + } +} + +private[columnar] case object DictionaryEncoding extends CompressionScheme { + override val typeId = 2 + + // 32K unique values allowed + val MAX_DICT_SIZE = Short.MaxValue + + override def decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) + : Decoder[T] = { + new this.Decoder(buffer, columnType) + } + + override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): Encoder[T] = { + new this.Encoder[T](columnType) + } + + override def supports(columnType: ColumnType[_]): Boolean = columnType match { + case INT | LONG | STRING => true + case _ => false + } + + class Encoder[T <: AtomicType](columnType: NativeColumnType[T]) extends compression.Encoder[T] { + // Size of the input, uncompressed, in bytes. Note that we only count until the dictionary + // overflows. + private var _uncompressedSize = 0 + + // If the number of distinct elements is too large, we discard the use of dictionary encoding + // and set the overflow flag to true. + private var overflow = false + + // Total number of elements. + private var count = 0 + + // The reverse mapping of _dictionary, i.e. mapping encoded integer to the value itself. + private var values = new mutable.ArrayBuffer[T#InternalType](1024) + + // The dictionary that maps a value to the encoded short integer. + private val dictionary = mutable.HashMap.empty[Any, Short] + + // Size of the serialized dictionary in bytes. 
Initialized to 4 since we need at least an `Int` + // to store dictionary element count. + private var dictionarySize = 4 + + override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { + val value = columnType.getField(row, ordinal) + + if (!overflow) { + val actualSize = columnType.actualSize(row, ordinal) + count += 1 + _uncompressedSize += actualSize + + if (!dictionary.contains(value)) { + if (dictionary.size < MAX_DICT_SIZE) { + val clone = columnType.clone(value) + values += clone + dictionarySize += actualSize + dictionary(clone) = dictionary.size.toShort + } else { + overflow = true + values.clear() + dictionary.clear() + } + } + } + } + + override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { + if (overflow) { + throw new IllegalStateException( + "Dictionary encoding should not be used because of dictionary overflow.") + } + + to.putInt(DictionaryEncoding.typeId) + .putInt(dictionary.size) + + var i = 0 + while (i < values.length) { + columnType.append(values(i), to) + i += 1 + } + + while (from.hasRemaining) { + to.putShort(dictionary(columnType.extract(from))) + } + + to.rewind() + to + } + + override def uncompressedSize: Int = _uncompressedSize + + override def compressedSize: Int = if (overflow) Int.MaxValue else dictionarySize + count * 2 + } + + class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) + extends compression.Decoder[T] { + + private val dictionary: Array[Any] = { + val elementNum = ByteBufferHelper.getInt(buffer) + Array.fill[Any](elementNum)(columnType.extract(buffer).asInstanceOf[Any]) + } + + override def next(row: MutableRow, ordinal: Int): Unit = { + columnType.setField(row, ordinal, dictionary(buffer.getShort()).asInstanceOf[T#InternalType]) + } + + override def hasNext: Boolean = buffer.hasRemaining + } +} + +private[columnar] case object BooleanBitSet extends CompressionScheme { + override val typeId = 3 + + val BITS_PER_LONG = 64 + + override def decoder[T <: 
AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) + : compression.Decoder[T] = { + new this.Decoder(buffer).asInstanceOf[compression.Decoder[T]] + } + + override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): compression.Encoder[T] = { + (new this.Encoder).asInstanceOf[compression.Encoder[T]] + } + + override def supports(columnType: ColumnType[_]): Boolean = columnType == BOOLEAN + + class Encoder extends compression.Encoder[BooleanType.type] { + private var _uncompressedSize = 0 + + override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { + _uncompressedSize += BOOLEAN.defaultSize + } + + override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { + to.putInt(BooleanBitSet.typeId) + // Total element count (1 byte per Boolean value) + .putInt(from.remaining) + + while (from.remaining >= BITS_PER_LONG) { + var word = 0: Long + var i = 0 + + while (i < BITS_PER_LONG) { + if (BOOLEAN.extract(from)) { + word |= (1: Long) << i + } + i += 1 + } + + to.putLong(word) + } + + if (from.hasRemaining) { + var word = 0: Long + var i = 0 + + while (from.hasRemaining) { + if (BOOLEAN.extract(from)) { + word |= (1: Long) << i + } + i += 1 + } + + to.putLong(word) + } + + to.rewind() + to + } + + override def uncompressedSize: Int = _uncompressedSize + + override def compressedSize: Int = { + val extra = if (_uncompressedSize % BITS_PER_LONG == 0) 0 else 1 + (_uncompressedSize / BITS_PER_LONG + extra) * 8 + 4 + } + } + + class Decoder(buffer: ByteBuffer) extends compression.Decoder[BooleanType.type] { + private val count = ByteBufferHelper.getInt(buffer) + + private var currentWord = 0: Long + + private var visited: Int = 0 + + override def next(row: MutableRow, ordinal: Int): Unit = { + val bit = visited % BITS_PER_LONG + + visited += 1 + if (bit == 0) { + currentWord = ByteBufferHelper.getLong(buffer) + } + + row.setBoolean(ordinal, ((currentWord >> bit) & 1) != 0) + } + + override def hasNext: Boolean = visited 
< count + } +} + +private[columnar] case object IntDelta extends CompressionScheme { + override def typeId: Int = 4 + + override def decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) + : compression.Decoder[T] = { + new Decoder(buffer, INT).asInstanceOf[compression.Decoder[T]] + } + + override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): compression.Encoder[T] = { + (new Encoder).asInstanceOf[compression.Encoder[T]] + } + + override def supports(columnType: ColumnType[_]): Boolean = columnType == INT + + class Encoder extends compression.Encoder[IntegerType.type] { + protected var _compressedSize: Int = 0 + protected var _uncompressedSize: Int = 0 + + override def compressedSize: Int = _compressedSize + override def uncompressedSize: Int = _uncompressedSize + + private var prevValue: Int = _ + + override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { + val value = row.getInt(ordinal) + val delta = value - prevValue + + _compressedSize += 1 + + // If this is the first integer to be compressed, or the delta is out of byte range, then give + // up compressing this integer. 
+ if (_uncompressedSize == 0 || delta <= Byte.MinValue || delta > Byte.MaxValue) { + _compressedSize += INT.defaultSize + } + + _uncompressedSize += INT.defaultSize + prevValue = value + } + + override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { + to.putInt(typeId) + + if (from.hasRemaining) { + var prev = from.getInt() + to.put(Byte.MinValue) + to.putInt(prev) + + while (from.hasRemaining) { + val current = from.getInt() + val delta = current - prev + prev = current + + if (Byte.MinValue < delta && delta <= Byte.MaxValue) { + to.put(delta.toByte) + } else { + to.put(Byte.MinValue) + to.putInt(current) + } + } + } + + to.rewind().asInstanceOf[ByteBuffer] + } + } + + class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[IntegerType.type]) + extends compression.Decoder[IntegerType.type] { + + private var prev: Int = _ + + override def hasNext: Boolean = buffer.hasRemaining + + override def next(row: MutableRow, ordinal: Int): Unit = { + val delta = buffer.get() + prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getInt(buffer) + row.setInt(ordinal, prev) + } + } +} + +private[columnar] case object LongDelta extends CompressionScheme { + override def typeId: Int = 5 + + override def decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) + : compression.Decoder[T] = { + new Decoder(buffer, LONG).asInstanceOf[compression.Decoder[T]] + } + + override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): compression.Encoder[T] = { + (new Encoder).asInstanceOf[compression.Encoder[T]] + } + + override def supports(columnType: ColumnType[_]): Boolean = columnType == LONG + + class Encoder extends compression.Encoder[LongType.type] { + protected var _compressedSize: Int = 0 + protected var _uncompressedSize: Int = 0 + + override def compressedSize: Int = _compressedSize + override def uncompressedSize: Int = _uncompressedSize + + private var prevValue: Long = _ + + override def 
gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { + val value = row.getLong(ordinal) + val delta = value - prevValue + + _compressedSize += 1 + + // If this is the first long integer to be compressed, or the delta is out of byte range, then + // give up compressing this long integer. + if (_uncompressedSize == 0 || delta <= Byte.MinValue || delta > Byte.MaxValue) { + _compressedSize += LONG.defaultSize + } + + _uncompressedSize += LONG.defaultSize + prevValue = value + } + + override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { + to.putInt(typeId) + + if (from.hasRemaining) { + var prev = from.getLong() + to.put(Byte.MinValue) + to.putLong(prev) + + while (from.hasRemaining) { + val current = from.getLong() + val delta = current - prev + prev = current + + if (Byte.MinValue < delta && delta <= Byte.MaxValue) { + to.put(delta.toByte) + } else { + to.put(Byte.MinValue) + to.putLong(current) + } + } + } + + to.rewind().asInstanceOf[ByteBuffer] + } + } + + class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[LongType.type]) + extends compression.Decoder[LongType.type] { + + private var prev: Long = _ + + override def hasNext: Boolean = buffer.hasRemaining + + override def next(row: MutableRow, ordinal: Int): Unit = { + val delta = buffer.get() + prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getLong(buffer) + row.setLong(ordinal, prev) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala index 28fa231..c912734 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala @@ -19,5 +19,7 @@ package 
org.apache.spark.sql /** * The physical execution component of Spark SQL. Note that this is a private package. + * All classes in this package are considered an internal API to Spark SQL and are subject + * to change between minor releases. */ package object execution http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index bce94da..d86df4c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -27,7 +27,7 @@ import scala.language.postfixOps import org.scalatest.concurrent.Eventually._ import org.apache.spark.Accumulators -import org.apache.spark.sql.columnar._ +import org.apache.spark.sql.execution.columnar._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.{SQLTestUtils, SharedSQLContext} import org.apache.spark.storage.{StorageLevel, RDDBlockId} @@ -280,7 +280,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext sql("CACHE TABLE testData") sqlContext.table("testData").queryExecution.withCachedData.collect { case cached: InMemoryRelation => - val actualSizeInBytes = (1 to 100).map(i => INT.defaultSize + i.toString.length + 4).sum + val actualSizeInBytes = (1 to 100).map(i => 4 + i.toString.length + 4).sum assert(cached.statistics.sizeInBytes === actualSizeInBytes) } } http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index b5417b1..6ea1fe4 
100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.columnar.InMemoryRelation +import org.apache.spark.sql.execution.columnar.InMemoryRelation abstract class QueryTest extends PlanTest { http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala deleted file mode 100644 index 89a6640..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.columnar - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.spark.sql.types._ - -class ColumnStatsSuite extends SparkFunSuite { - testColumnStats(classOf[BooleanColumnStats], BOOLEAN, createRow(true, false, 0)) - testColumnStats(classOf[ByteColumnStats], BYTE, createRow(Byte.MaxValue, Byte.MinValue, 0)) - testColumnStats(classOf[ShortColumnStats], SHORT, createRow(Short.MaxValue, Short.MinValue, 0)) - testColumnStats(classOf[IntColumnStats], INT, createRow(Int.MaxValue, Int.MinValue, 0)) - testColumnStats(classOf[LongColumnStats], LONG, createRow(Long.MaxValue, Long.MinValue, 0)) - testColumnStats(classOf[FloatColumnStats], FLOAT, createRow(Float.MaxValue, Float.MinValue, 0)) - testColumnStats(classOf[DoubleColumnStats], DOUBLE, - createRow(Double.MaxValue, Double.MinValue, 0)) - testColumnStats(classOf[StringColumnStats], STRING, createRow(null, null, 0)) - testDecimalColumnStats(createRow(null, null, 0)) - - def createRow(values: Any*): GenericInternalRow = new GenericInternalRow(values.toArray) - - def testColumnStats[T <: AtomicType, U <: ColumnStats]( - columnStatsClass: Class[U], - columnType: NativeColumnType[T], - initialStatistics: GenericInternalRow): Unit = { - - val columnStatsName = columnStatsClass.getSimpleName - - test(s"$columnStatsName: empty") { - val columnStats = columnStatsClass.newInstance() - columnStats.collectedStatistics.values.zip(initialStatistics.values).foreach { - case (actual, expected) => assert(actual === expected) - } - } - - test(s"$columnStatsName: non-empty") { - import org.apache.spark.sql.columnar.ColumnarTestUtils._ - - val columnStats = columnStatsClass.newInstance() - val rows = Seq.fill(10)(makeRandomRow(columnType)) ++ Seq.fill(10)(makeNullRow(1)) - rows.foreach(columnStats.gatherStats(_, 0)) - - val values = rows.take(10).map(_.get(0, columnType.dataType).asInstanceOf[T#InternalType]) - val ordering = 
columnType.dataType.ordering.asInstanceOf[Ordering[T#InternalType]] - val stats = columnStats.collectedStatistics - - assertResult(values.min(ordering), "Wrong lower bound")(stats.values(0)) - assertResult(values.max(ordering), "Wrong upper bound")(stats.values(1)) - assertResult(10, "Wrong null count")(stats.values(2)) - assertResult(20, "Wrong row count")(stats.values(3)) - assertResult(stats.values(4), "Wrong size in bytes") { - rows.map { row => - if (row.isNullAt(0)) 4 else columnType.actualSize(row, 0) - }.sum - } - } - } - - def testDecimalColumnStats[T <: AtomicType, U <: ColumnStats]( - initialStatistics: GenericInternalRow): Unit = { - - val columnStatsName = classOf[DecimalColumnStats].getSimpleName - val columnType = COMPACT_DECIMAL(15, 10) - - test(s"$columnStatsName: empty") { - val columnStats = new DecimalColumnStats(15, 10) - columnStats.collectedStatistics.values.zip(initialStatistics.values).foreach { - case (actual, expected) => assert(actual === expected) - } - } - - test(s"$columnStatsName: non-empty") { - import org.apache.spark.sql.columnar.ColumnarTestUtils._ - - val columnStats = new DecimalColumnStats(15, 10) - val rows = Seq.fill(10)(makeRandomRow(columnType)) ++ Seq.fill(10)(makeNullRow(1)) - rows.foreach(columnStats.gatherStats(_, 0)) - - val values = rows.take(10).map(_.get(0, columnType.dataType).asInstanceOf[T#InternalType]) - val ordering = columnType.dataType.ordering.asInstanceOf[Ordering[T#InternalType]] - val stats = columnStats.collectedStatistics - - assertResult(values.min(ordering), "Wrong lower bound")(stats.values(0)) - assertResult(values.max(ordering), "Wrong upper bound")(stats.values(1)) - assertResult(10, "Wrong null count")(stats.values(2)) - assertResult(20, "Wrong row count")(stats.values(3)) - assertResult(stats.values(4), "Wrong size in bytes") { - rows.map { row => - if (row.isNullAt(0)) 4 else columnType.actualSize(row, 0) - }.sum - } - } - } -} 
http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala deleted file mode 100644 index 63bc39b..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.columnar - -import java.nio.{ByteOrder, ByteBuffer} - -import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.CatalystTypeConverters -import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, GenericMutableRow} -import org.apache.spark.sql.columnar.ColumnarTestUtils._ -import org.apache.spark.sql.types._ -import org.apache.spark.{Logging, SparkFunSuite} - - -class ColumnTypeSuite extends SparkFunSuite with Logging { - private val DEFAULT_BUFFER_SIZE = 512 - private val MAP_TYPE = MAP(MapType(IntegerType, StringType)) - private val ARRAY_TYPE = ARRAY(ArrayType(IntegerType)) - private val STRUCT_TYPE = STRUCT(StructType(StructField("a", StringType) :: Nil)) - - test("defaultSize") { - val checks = Map( - NULL-> 0, BOOLEAN -> 1, BYTE -> 1, SHORT -> 2, INT -> 4, LONG -> 8, - FLOAT -> 4, DOUBLE -> 8, COMPACT_DECIMAL(15, 10) -> 8, LARGE_DECIMAL(20, 10) -> 12, - STRING -> 8, BINARY -> 16, STRUCT_TYPE -> 20, ARRAY_TYPE -> 16, MAP_TYPE -> 32) - - checks.foreach { case (columnType, expectedSize) => - assertResult(expectedSize, s"Wrong defaultSize for $columnType") { - columnType.defaultSize - } - } - } - - test("actualSize") { - def checkActualSize( - columnType: ColumnType[_], - value: Any, - expected: Int): Unit = { - - assertResult(expected, s"Wrong actualSize for $columnType") { - val row = new GenericMutableRow(1) - row.update(0, CatalystTypeConverters.convertToCatalyst(value)) - val proj = UnsafeProjection.create(Array[DataType](columnType.dataType)) - columnType.actualSize(proj(row), 0) - } - } - - checkActualSize(NULL, null, 0) - checkActualSize(BOOLEAN, true, 1) - checkActualSize(BYTE, Byte.MaxValue, 1) - checkActualSize(SHORT, Short.MaxValue, 2) - checkActualSize(INT, Int.MaxValue, 4) - checkActualSize(LONG, Long.MaxValue, 8) - checkActualSize(FLOAT, Float.MaxValue, 4) - checkActualSize(DOUBLE, Double.MaxValue, 8) - checkActualSize(STRING, "hello", 4 + "hello".getBytes("utf-8").length) - 
checkActualSize(BINARY, Array.fill[Byte](4)(0.toByte), 4 + 4) - checkActualSize(COMPACT_DECIMAL(15, 10), Decimal(0, 15, 10), 8) - checkActualSize(LARGE_DECIMAL(20, 10), Decimal(0, 20, 10), 5) - checkActualSize(ARRAY_TYPE, Array[Any](1), 16) - checkActualSize(MAP_TYPE, Map(1 -> "a"), 29) - checkActualSize(STRUCT_TYPE, Row("hello"), 28) - } - - testNativeColumnType(BOOLEAN) - testNativeColumnType(BYTE) - testNativeColumnType(SHORT) - testNativeColumnType(INT) - testNativeColumnType(LONG) - testNativeColumnType(FLOAT) - testNativeColumnType(DOUBLE) - testNativeColumnType(COMPACT_DECIMAL(15, 10)) - testNativeColumnType(STRING) - - testColumnType(NULL) - testColumnType(BINARY) - testColumnType(LARGE_DECIMAL(20, 10)) - testColumnType(STRUCT_TYPE) - testColumnType(ARRAY_TYPE) - testColumnType(MAP_TYPE) - - def testNativeColumnType[T <: AtomicType](columnType: NativeColumnType[T]): Unit = { - testColumnType[T#InternalType](columnType) - } - - def testColumnType[JvmType](columnType: ColumnType[JvmType]): Unit = { - - val buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE).order(ByteOrder.nativeOrder()) - val proj = UnsafeProjection.create(Array[DataType](columnType.dataType)) - val converter = CatalystTypeConverters.createToScalaConverter(columnType.dataType) - val seq = (0 until 4).map(_ => proj(makeRandomRow(columnType)).copy()) - - test(s"$columnType append/extract") { - buffer.rewind() - seq.foreach(columnType.append(_, 0, buffer)) - - buffer.rewind() - seq.foreach { row => - logInfo("buffer = " + buffer + ", expected = " + row) - val expected = converter(row.get(0, columnType.dataType)) - val extracted = converter(columnType.extract(buffer)) - assert(expected === extracted, - s"Extracted value didn't equal to the original one. 
$expected != $extracted, buffer =" + - dumpBuffer(buffer.duplicate().rewind().asInstanceOf[ByteBuffer])) - } - } - } - - private def dumpBuffer(buff: ByteBuffer): Any = { - val sb = new StringBuilder() - while (buff.hasRemaining) { - val b = buff.get() - sb.append(Integer.toHexString(b & 0xff)).append(' ') - } - if (sb.nonEmpty) sb.setLength(sb.length - 1) - sb.toString() - } - - test("column type for decimal types with different precision") { - (1 to 18).foreach { i => - assertResult(COMPACT_DECIMAL(i, 0)) { - ColumnType(DecimalType(i, 0)) - } - } - - assertResult(LARGE_DECIMAL(19, 0)) { - ColumnType(DecimalType(19, 0)) - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala deleted file mode 100644 index a5882f7..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.columnar - -import scala.collection.immutable.HashSet -import scala.util.Random - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericMutableRow} -import org.apache.spark.sql.catalyst.util.{GenericArrayData, ArrayBasedMapData} -import org.apache.spark.sql.types.{AtomicType, Decimal} -import org.apache.spark.unsafe.types.UTF8String - -object ColumnarTestUtils { - def makeNullRow(length: Int): GenericMutableRow = { - val row = new GenericMutableRow(length) - (0 until length).foreach(row.setNullAt) - row - } - - def makeRandomValue[JvmType](columnType: ColumnType[JvmType]): JvmType = { - def randomBytes(length: Int) = { - val bytes = new Array[Byte](length) - Random.nextBytes(bytes) - bytes - } - - (columnType match { - case NULL => null - case BOOLEAN => Random.nextBoolean() - case BYTE => (Random.nextInt(Byte.MaxValue * 2) - Byte.MaxValue).toByte - case SHORT => (Random.nextInt(Short.MaxValue * 2) - Short.MaxValue).toShort - case INT => Random.nextInt() - case LONG => Random.nextLong() - case FLOAT => Random.nextFloat() - case DOUBLE => Random.nextDouble() - case STRING => UTF8String.fromString(Random.nextString(Random.nextInt(32))) - case BINARY => randomBytes(Random.nextInt(32)) - case COMPACT_DECIMAL(precision, scale) => Decimal(Random.nextLong() % 100, precision, scale) - case LARGE_DECIMAL(precision, scale) => Decimal(Random.nextLong(), precision, scale) - case STRUCT(_) => - new GenericInternalRow(Array[Any](UTF8String.fromString(Random.nextString(10)))) - case ARRAY(_) => - new GenericArrayData(Array[Any](Random.nextInt(), Random.nextInt())) - case MAP(_) => - ArrayBasedMapData( - Map(Random.nextInt() -> UTF8String.fromString(Random.nextString(Random.nextInt(32))))) - }).asInstanceOf[JvmType] - } - - def makeRandomValues( - head: ColumnType[_], - tail: ColumnType[_]*): Seq[Any] = makeRandomValues(Seq(head) ++ tail) - - def 
makeRandomValues(columnTypes: Seq[ColumnType[_]]): Seq[Any] = { - columnTypes.map(makeRandomValue(_)) - } - - def makeUniqueRandomValues[JvmType]( - columnType: ColumnType[JvmType], - count: Int): Seq[JvmType] = { - - Iterator.iterate(HashSet.empty[JvmType]) { set => - set + Iterator.continually(makeRandomValue(columnType)).filterNot(set.contains).next() - }.drop(count).next().toSeq - } - - def makeRandomRow( - head: ColumnType[_], - tail: ColumnType[_]*): InternalRow = makeRandomRow(Seq(head) ++ tail) - - def makeRandomRow(columnTypes: Seq[ColumnType[_]]): InternalRow = { - val row = new GenericMutableRow(columnTypes.length) - makeRandomValues(columnTypes).zipWithIndex.foreach { case (value, index) => - row(index) = value - } - row - } - - def makeUniqueValuesAndSingleValueRows[T <: AtomicType]( - columnType: NativeColumnType[T], - count: Int): (Seq[T#InternalType], Seq[GenericMutableRow]) = { - - val values = makeUniqueRandomValues(columnType, count) - val rows = values.map { value => - val row = new GenericMutableRow(1) - row(0) = value - row - } - - (values, rows) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala deleted file mode 100644 index 6265e40..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar - -import java.sql.{Date, Timestamp} - -import org.apache.spark.sql.{QueryTest, Row} -import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.test.SQLTestData._ -import org.apache.spark.sql.types._ -import org.apache.spark.storage.StorageLevel.MEMORY_ONLY - -class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { - import testImplicits._ - - setupTestData() - - test("simple columnar query") { - val plan = sqlContext.executePlan(testData.logicalPlan).executedPlan - val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None) - - checkAnswer(scan, testData.collect().toSeq) - } - - test("default size avoids broadcast") { - // TODO: Improve this test when we have better statistics - sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)) - .toDF().registerTempTable("sizeTst") - sqlContext.cacheTable("sizeTst") - assert( - sqlContext.table("sizeTst").queryExecution.analyzed.statistics.sizeInBytes > - sqlContext.conf.autoBroadcastJoinThreshold) - } - - test("projection") { - val plan = sqlContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan - val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None) - - checkAnswer(scan, testData.collect().map { - case Row(key: Int, value: String) => value -> key - }.map(Row.fromTuple)) - } - - 
test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") { - val plan = sqlContext.executePlan(testData.logicalPlan).executedPlan - val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None) - - checkAnswer(scan, testData.collect().toSeq) - checkAnswer(scan, testData.collect().toSeq) - } - - test("SPARK-1678 regression: compression must not lose repeated values") { - checkAnswer( - sql("SELECT * FROM repeatedData"), - repeatedData.collect().toSeq.map(Row.fromTuple)) - - sqlContext.cacheTable("repeatedData") - - checkAnswer( - sql("SELECT * FROM repeatedData"), - repeatedData.collect().toSeq.map(Row.fromTuple)) - } - - test("with null values") { - checkAnswer( - sql("SELECT * FROM nullableRepeatedData"), - nullableRepeatedData.collect().toSeq.map(Row.fromTuple)) - - sqlContext.cacheTable("nullableRepeatedData") - - checkAnswer( - sql("SELECT * FROM nullableRepeatedData"), - nullableRepeatedData.collect().toSeq.map(Row.fromTuple)) - } - - test("SPARK-2729 regression: timestamp data type") { - val timestamps = (0 to 3).map(i => Tuple1(new Timestamp(i))).toDF("time") - timestamps.registerTempTable("timestamps") - - checkAnswer( - sql("SELECT time FROM timestamps"), - timestamps.collect().toSeq) - - sqlContext.cacheTable("timestamps") - - checkAnswer( - sql("SELECT time FROM timestamps"), - timestamps.collect().toSeq) - } - - test("SPARK-3320 regression: batched column buffer building should work with empty partitions") { - checkAnswer( - sql("SELECT * FROM withEmptyParts"), - withEmptyParts.collect().toSeq.map(Row.fromTuple)) - - sqlContext.cacheTable("withEmptyParts") - - checkAnswer( - sql("SELECT * FROM withEmptyParts"), - withEmptyParts.collect().toSeq.map(Row.fromTuple)) - } - - test("SPARK-4182 Caching complex types") { - complexData.cache().count() - // Shouldn't throw - complexData.count() - complexData.unpersist() - } - - test("decimal type") { - // Casting is required here because ScalaReflection can't 
capture decimal precision information. - val df = (1 to 10) - .map(i => Tuple1(Decimal(i, 15, 10))) - .toDF("dec") - .select($"dec" cast DecimalType(15, 10)) - - assert(df.schema.head.dataType === DecimalType(15, 10)) - - df.cache().registerTempTable("test_fixed_decimal") - checkAnswer( - sql("SELECT * FROM test_fixed_decimal"), - (1 to 10).map(i => Row(Decimal(i, 15, 10).toJavaBigDecimal))) - } - - test("test different data types") { - // Create the schema. - val struct = - StructType( - StructField("f1", FloatType, true) :: - StructField("f2", ArrayType(BooleanType), true) :: Nil) - val dataTypes = - Seq(StringType, BinaryType, NullType, BooleanType, - ByteType, ShortType, IntegerType, LongType, - FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5), - DateType, TimestampType, - ArrayType(IntegerType), MapType(StringType, LongType), struct) - val fields = dataTypes.zipWithIndex.map { case (dataType, index) => - StructField(s"col$index", dataType, true) - } - val allColumns = fields.map(_.name).mkString(",") - val schema = StructType(fields) - - // Create a RDD for the schema - val rdd = - sparkContext.parallelize((1 to 10000), 10).map { i => - Row( - s"str${i}: test cache.", - s"binary${i}: test cache.".getBytes("UTF-8"), - null, - i % 2 == 0, - i.toByte, - i.toShort, - i, - Long.MaxValue - i.toLong, - (i + 0.25).toFloat, - (i + 0.75), - BigDecimal(Long.MaxValue.toString + ".12345"), - new java.math.BigDecimal(s"${i % 9 + 1}" + ".23456"), - new Date(i), - new Timestamp(i * 1000000L), - (i to i + 10).toSeq, - (i to i + 10).map(j => s"map_key_$j" -> (Long.MaxValue - j)).toMap, - Row((i - 0.25).toFloat, Seq(true, false, null))) - } - sqlContext.createDataFrame(rdd, schema).registerTempTable("InMemoryCache_different_data_types") - // Cache the table. - sql("cache table InMemoryCache_different_data_types") - // Make sure the table is indeed cached. 
- sqlContext.table("InMemoryCache_different_data_types").queryExecution.executedPlan - assert( - sqlContext.isCached("InMemoryCache_different_data_types"), - "InMemoryCache_different_data_types should be cached.") - // Issue a query and check the results. - checkAnswer( - sql(s"SELECT DISTINCT ${allColumns} FROM InMemoryCache_different_data_types"), - sqlContext.table("InMemoryCache_different_data_types").collect()) - sqlContext.dropTempTable("InMemoryCache_different_data_types") - } - - test("SPARK-10422: String column in InMemoryColumnarCache needs to override clone method") { - val df = sqlContext.range(1, 100).selectExpr("id % 10 as id") - .rdd.map(id => Tuple1(s"str_$id")).toDF("i") - val cached = df.cache() - // count triggers the caching action. It should not throw. - cached.count() - - // Make sure, the DataFrame is indeed cached. - assert(sqlContext.cacheManager.lookupCachedData(cached).nonEmpty) - - // Check result. - checkAnswer( - cached, - sqlContext.range(1, 100).selectExpr("id % 10 as id") - .rdd.map(id => Tuple1(s"str_$id")).toDF("i") - ) - - // Drop the cache. 
- cached.unpersist() - } - - test("SPARK-10859: Predicates pushed to InMemoryColumnarTableScan are not evaluated correctly") { - val data = sqlContext.range(10).selectExpr("id", "cast(id as string) as s") - data.cache() - assert(data.count() === 10) - assert(data.filter($"s" === "3").count() === 1) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala deleted file mode 100644 index aa1605f..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.columnar - -import java.nio.ByteBuffer - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.CatalystTypeConverters -import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, GenericMutableRow} -import org.apache.spark.sql.types._ - -class TestNullableColumnAccessor[JvmType]( - buffer: ByteBuffer, - columnType: ColumnType[JvmType]) - extends BasicColumnAccessor(buffer, columnType) - with NullableColumnAccessor - -object TestNullableColumnAccessor { - def apply[JvmType](buffer: ByteBuffer, columnType: ColumnType[JvmType]) - : TestNullableColumnAccessor[JvmType] = { - new TestNullableColumnAccessor(buffer, columnType) - } -} - -class NullableColumnAccessorSuite extends SparkFunSuite { - import org.apache.spark.sql.columnar.ColumnarTestUtils._ - - Seq( - NULL, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, - STRING, BINARY, COMPACT_DECIMAL(15, 10), LARGE_DECIMAL(20, 10), - STRUCT(StructType(StructField("a", StringType) :: Nil)), - ARRAY(ArrayType(IntegerType)), MAP(MapType(IntegerType, StringType))) - .foreach { - testNullableColumnAccessor(_) - } - - def testNullableColumnAccessor[JvmType]( - columnType: ColumnType[JvmType]): Unit = { - - val typeName = columnType.getClass.getSimpleName.stripSuffix("$") - val nullRow = makeNullRow(1) - - test(s"Nullable $typeName column accessor: empty column") { - val builder = TestNullableColumnBuilder(columnType) - val accessor = TestNullableColumnAccessor(builder.build(), columnType) - assert(!accessor.hasNext) - } - - test(s"Nullable $typeName column accessor: access null values") { - val builder = TestNullableColumnBuilder(columnType) - val randomRow = makeRandomRow(columnType) - val proj = UnsafeProjection.create(Array[DataType](columnType.dataType)) - - (0 until 4).foreach { _ => - builder.appendFrom(proj(randomRow), 0) - builder.appendFrom(proj(nullRow), 0) - } - - val accessor = TestNullableColumnAccessor(builder.build(), columnType) - val row = new 
GenericMutableRow(1) - val converter = CatalystTypeConverters.createToScalaConverter(columnType.dataType) - - (0 until 4).foreach { _ => - assert(accessor.hasNext) - accessor.extractTo(row, 0) - assert(converter(row.get(0, columnType.dataType)) - === converter(randomRow.get(0, columnType.dataType))) - - assert(accessor.hasNext) - accessor.extractTo(row, 0) - assert(row.isNullAt(0)) - } - - assert(!accessor.hasNext) - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/ea1a51fc/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala deleted file mode 100644 index 9140457..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.columnar - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.CatalystTypeConverters -import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, GenericMutableRow} -import org.apache.spark.sql.types._ - -class TestNullableColumnBuilder[JvmType](columnType: ColumnType[JvmType]) - extends BasicColumnBuilder[JvmType](new NoopColumnStats, columnType) - with NullableColumnBuilder - -object TestNullableColumnBuilder { - def apply[JvmType](columnType: ColumnType[JvmType], initialSize: Int = 0) - : TestNullableColumnBuilder[JvmType] = { - val builder = new TestNullableColumnBuilder(columnType) - builder.initialize(initialSize) - builder - } -} - -class NullableColumnBuilderSuite extends SparkFunSuite { - import org.apache.spark.sql.columnar.ColumnarTestUtils._ - - Seq( - BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, - STRING, BINARY, COMPACT_DECIMAL(15, 10), LARGE_DECIMAL(20, 10), - STRUCT(StructType(StructField("a", StringType) :: Nil)), - ARRAY(ArrayType(IntegerType)), MAP(MapType(IntegerType, StringType))) - .foreach { - testNullableColumnBuilder(_) - } - - def testNullableColumnBuilder[JvmType]( - columnType: ColumnType[JvmType]): Unit = { - - val typeName = columnType.getClass.getSimpleName.stripSuffix("$") - val dataType = columnType.dataType - val proj = UnsafeProjection.create(Array[DataType](dataType)) - val converter = CatalystTypeConverters.createToScalaConverter(dataType) - - test(s"$typeName column builder: empty column") { - val columnBuilder = TestNullableColumnBuilder(columnType) - val buffer = columnBuilder.build() - - assertResult(0, "Wrong null count")(buffer.getInt()) - assert(!buffer.hasRemaining) - } - - test(s"$typeName column builder: buffer size auto growth") { - val columnBuilder = TestNullableColumnBuilder(columnType) - val randomRow = makeRandomRow(columnType) - - (0 until 4).foreach { _ => - columnBuilder.appendFrom(proj(randomRow), 0) - } - - val buffer = 
columnBuilder.build() - - assertResult(0, "Wrong null count")(buffer.getInt()) - } - - test(s"$typeName column builder: null values") { - val columnBuilder = TestNullableColumnBuilder(columnType) - val randomRow = makeRandomRow(columnType) - val nullRow = makeNullRow(1) - - (0 until 4).foreach { _ => - columnBuilder.appendFrom(proj(randomRow), 0) - columnBuilder.appendFrom(proj(nullRow), 0) - } - - val buffer = columnBuilder.build() - - assertResult(4, "Wrong null count")(buffer.getInt()) - - // For null positions - (1 to 7 by 2).foreach(assertResult(_, "Wrong null position")(buffer.getInt())) - - // For non-null values - val actual = new GenericMutableRow(new Array[Any](1)) - (0 until 4).foreach { _ => - columnType.extract(buffer, actual, 0) - assert(converter(actual.get(0, dataType)) === converter(randomRow.get(0, dataType)), - "Extracted value didn't equal to the original one") - } - - assert(!buffer.hasRemaining) - } - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
