Repository: incubator-hivemall Updated Branches: refs/heads/master 79f92f4f2 -> e86c8a0bf
Close #28: [HIVEMALL-30] Temporarily ignore a streaming test in Spark Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/e86c8a0b Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/e86c8a0b Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/e86c8a0b Branch: refs/heads/master Commit: e86c8a0bf3c448fa1642b4ad3f938e57d32f524b Parents: 79f92f4 Author: Takeshi YAMAMURO <[email protected]> Authored: Thu Jan 26 18:39:41 2017 +0900 Committer: Takeshi YAMAMURO <[email protected]> Committed: Thu Jan 26 18:39:41 2017 +0900 ---------------------------------------------------------------------- .../streaming/HivemallFeatureOpsSuite.scala | 123 --------------- .../streaming/HivemallOpsWithFeatureSuite.scala | 154 +++++++++++++++++++ 2 files changed, 154 insertions(+), 123 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/e86c8a0b/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallFeatureOpsSuite.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallFeatureOpsSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallFeatureOpsSuite.scala deleted file mode 100644 index 6d46292..0000000 --- a/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallFeatureOpsSuite.scala +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.spark.streaming - -import scala.reflect.ClassTag - -import org.apache.spark.ml.feature.HivemallLabeledPoint -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.hive.HivemallOps._ -import org.apache.spark.sql.hive.test.HivemallFeatureQueryTest -import org.apache.spark.streaming.HivemallStreamingOps._ -import org.apache.spark.streaming.dstream.InputDStream -import org.apache.spark.streaming.scheduler.StreamInputInfo - -/** - * This is an input stream just for tests. - */ -private[this] class TestInputStream[T: ClassTag]( - ssc: StreamingContext, - input: Seq[Seq[T]], - numPartitions: Int) extends InputDStream[T](ssc) { - - override def start() {} - - override def stop() {} - - override def compute(validTime: Time): Option[RDD[T]] = { - logInfo("Computing RDD for time " + validTime) - val index = ((validTime - zeroTime) / slideDuration - 1).toInt - val selectedInput = if (index < input.size) input(index) else Seq[T]() - - // lets us test cases where RDDs are not created - if (selectedInput == null) { - return None - } - - // Report the input data's information to InputInfoTracker for testing - val inputInfo = StreamInputInfo(id, selectedInput.length.toLong) - ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) - - val rdd = ssc.sc.makeRDD(selectedInput, numPartitions) - logInfo("Created RDD " + rdd.id + " with " + selectedInput) - Some(rdd) - } -} - -final class HivemallFeatureOpsSuite extends HivemallFeatureQueryTest { - - // This implicit value used in `HivemallStreamingOps` - implicit val sqlCtx = hiveContext - - /** - * Run a block of code with the given StreamingContext. - * This method do not stop a given SparkContext because other tests share the context. - */ - private def withStreamingContext[R](ssc: StreamingContext)(block: StreamingContext => R): Unit = { - try { - block(ssc) - ssc.start() - ssc.awaitTerminationOrTimeout(10 * 1000) // 10s wait - } finally { - try { - ssc.stop(stopSparkContext = false) - } catch { - case e: Exception => logError("Error stopping StreamingContext", e) - } - } - } - - test("streaming") { - import sqlCtx.implicits._ - - // We assume we build a model in advance - val testModel = Seq( - ("0", 0.3f), ("1", 0.1f), ("2", 0.6f), ("3", 0.2f) - ).toDF("feature", "weight") - - withStreamingContext(new StreamingContext(sqlCtx.sparkContext, Milliseconds(100))) { ssc => - val inputData = Seq( - Seq(HivemallLabeledPoint(features = "1:0.6" :: "2:0.1" :: Nil)), - Seq(HivemallLabeledPoint(features = "2:0.9" :: Nil)), - Seq(HivemallLabeledPoint(features = "1:0.2" :: Nil)), - Seq(HivemallLabeledPoint(features = "2:0.1" :: Nil)), - Seq(HivemallLabeledPoint(features = "0:0.6" :: "2:0.4" :: Nil)) - ) - - val inputStream = new TestInputStream[HivemallLabeledPoint](ssc, inputData, 1) - - // Apply predictions on input streams - val prediction = inputStream.predict { streamDf => - val df = streamDf.select(rowid(), $"features").explode_array($"features") - val testDf = df.select( - // TODO: `$"feature"` throws AnalysisException, why? - $"rowid", extract_feature(df("feature")), extract_weight(df("feature")) - ) - testDf.join(testModel, testDf("feature") === testModel("feature"), "LEFT_OUTER") - .select($"rowid", ($"weight" * $"value").as("value")) - .groupBy("rowid").sum("value") - .as("rowid", "value") - .select($"rowid", sigmoid($"value")) - } - - // Dummy output stream - prediction.foreachRDD(_ => {}) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/e86c8a0b/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallOpsWithFeatureSuite.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallOpsWithFeatureSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallOpsWithFeatureSuite.scala new file mode 100644 index 0000000..598479d --- /dev/null +++ b/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallOpsWithFeatureSuite.scala @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.spark.streaming + +import scala.reflect.ClassTag + +import org.apache.spark.ml.feature.HivemallLabeledPoint +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.hive.HivemallOps._ +import org.apache.spark.sql.hive.test.HivemallFeatureQueryTest +import org.apache.spark.streaming.HivemallStreamingOps._ +import org.apache.spark.streaming.dstream.InputDStream +import org.apache.spark.streaming.scheduler.StreamInputInfo + +/** + * This is an input stream just for tests. + */ +private[this] class TestInputStream[T: ClassTag]( + ssc: StreamingContext, + input: Seq[Seq[T]], + numPartitions: Int) extends InputDStream[T](ssc) { + + override def start() {} + + override def stop() {} + + override def compute(validTime: Time): Option[RDD[T]] = { + logInfo("Computing RDD for time " + validTime) + val index = ((validTime - zeroTime) / slideDuration - 1).toInt + val selectedInput = if (index < input.size) input(index) else Seq[T]() + + // lets us test cases where RDDs are not created + if (selectedInput == null) { + return None + } + + // Report the input data's information to InputInfoTracker for testing + val inputInfo = StreamInputInfo(id, selectedInput.length.toLong) + ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) + + val rdd = ssc.sc.makeRDD(selectedInput, numPartitions) + logInfo("Created RDD " + rdd.id + " with " + selectedInput) + Some(rdd) + } +} + +final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { + + // This implicit value used in `HivemallStreamingOps` + implicit val sqlCtx = hiveContext + + /** + * Run a block of code with the given StreamingContext. + * This method do not stop a given SparkContext because other tests share the context. + */ + private def withStreamingContext[R](ssc: StreamingContext)(block: StreamingContext => R): Unit = { + try { + block(ssc) + ssc.start() + ssc.awaitTerminationOrTimeout(10 * 1000) // 10s wait + } finally { + try { + ssc.stop(stopSparkContext = false) + } catch { + case e: Exception => logError("Error stopping StreamingContext", e) + } + } + } + + // scalastyle:off line.size.limit + + /** + * This test below fails sometimes (too flaky), so we temporarily ignore it. + * The stacktrace of this failure is: + * + * HivemallOpsWithFeatureSuite: + * Exception in thread "broadcast-exchange-60" java.lang.OutOfMemoryError: Java heap space + * at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) + * at java.nio.ByteBuffer.allocate(ByteBuffer.java:331) + * at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:231) + * at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:231) + * at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:78) + * at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:65) + * at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205) + * at net.jpountz.lz4.LZ4BlockOutputStream.finish(LZ4BlockOutputStream.java:235) + * at net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:175) + * at java.io.ObjectOutputStream$BlockDataOutputStream.close(ObjectOutputStream.java:1827) + * at java.io.ObjectOutputStream.close(ObjectOutputStream.java:741) + * at org.apache.spark.serializer.JavaSerializationStream.close(JavaSerializer.scala:57) + * at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$1.apply$mcV$sp(TorrentBroadcast.scala:238) + * at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1296) + * at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:237) + * at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:107) + * at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:86) + * at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) + * ... + */ + + // scalastyle:on line.size.limit + + ignore("streaming") { + import sqlCtx.implicits._ + + // We assume we build a model in advance + val testModel = Seq( + ("0", 0.3f), ("1", 0.1f), ("2", 0.6f), ("3", 0.2f) + ).toDF("feature", "weight") + + withStreamingContext(new StreamingContext(sqlCtx.sparkContext, Milliseconds(100))) { ssc => + val inputData = Seq( + Seq(HivemallLabeledPoint(features = "1:0.6" :: "2:0.1" :: Nil)), + Seq(HivemallLabeledPoint(features = "2:0.9" :: Nil)), + Seq(HivemallLabeledPoint(features = "1:0.2" :: Nil)), + Seq(HivemallLabeledPoint(features = "2:0.1" :: Nil)), + Seq(HivemallLabeledPoint(features = "0:0.6" :: "2:0.4" :: Nil)) + ) + + val inputStream = new TestInputStream[HivemallLabeledPoint](ssc, inputData, 1) + + // Apply predictions on input streams + val prediction = inputStream.predict { streamDf => + val df = streamDf.select(rowid(), $"features").explode_array($"features") + val testDf = df.select( + // TODO: `$"feature"` throws AnalysisException, why? + $"rowid", extract_feature(df("feature")), extract_weight(df("feature")) + ) + testDf.join(testModel, testDf("feature") === testModel("feature"), "LEFT_OUTER") + .select($"rowid", ($"weight" * $"value").as("value")) + .groupBy("rowid").sum("value") + .as("rowid", "value") + .select($"rowid", sigmoid($"value")) + } + + // Dummy output stream + prediction.foreachRDD(_ => {}) + } + } +}
