This is an automated email from the ASF dual-hosted git repository. jiayu pushed a commit to branch viz-to-spark in repository https://gitbox.apache.org/repos/asf/sedona.git
commit df7ce237f2ea7e90a8bd176ecd8ba5e1a2507879 Author: Jia Yu <[email protected]> AuthorDate: Wed Sep 6 17:40:57 2023 -0700 Finalize --- .../org/apache/sedona/sql/TestBaseScala.scala | 26 ++- .../org/apache/sedona/viz/rdd/scalaTest.scala | 181 --------------------- .../sql/{TestBaseScala.scala => VizTestBase.scala} | 24 +-- .../apache/sedona/viz/sql/optVizOperatorTest.scala | 10 +- .../sedona/viz/sql/standardVizOperatorTest.scala | 40 ++--- 5 files changed, 43 insertions(+), 238 deletions(-) diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala b/spark/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala index 662de6d9e..5113f6ab9 100644 --- a/spark/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala +++ b/spark/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala @@ -20,11 +20,10 @@ package org.apache.sedona.sql import com.google.common.math.DoubleMath import org.apache.log4j.{Level, Logger} -import org.apache.sedona.common.sphere.{Haversine, Spheroid} import org.apache.sedona.common.Functions.{frechetDistance, hausdorffDistance} +import org.apache.sedona.common.sphere.{Haversine, Spheroid} import org.apache.sedona.spark.SedonaContext -import org.apache.spark.SparkContext -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.DataFrame import org.locationtech.jts.geom.{CoordinateSequence, CoordinateSequenceComparator} import org.scalatest.{BeforeAndAfterAll, FunSpec} @@ -36,9 +35,15 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll { Logger.getLogger("org.apache.sedona.core").setLevel(Level.WARN) val warehouseLocation = System.getProperty("user.dir") + "/target/" - var sparkSession:SparkSession = null + val sparkSession = SedonaContext.builder(). + master("local[*]").appName("sedonasqlScalaTest") + .config("spark.sql.warehouse.dir", warehouseLocation) + // We need to be explicit about broadcasting in tests. + .config("sedona.join.autoBroadcastJoinThreshold", "-1") + .config("spark.kryoserializer.buffer.max", "64m") + .getOrCreate() - var sc: SparkContext = null + val sc = sparkSession.sparkContext val resourceFolder = System.getProperty("user.dir") + "/src/test/resources/" val mixedWkbGeometryInputLocation = resourceFolder + "county_small_wkb.tsv" @@ -69,21 +74,12 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll { val smallRasterDataLocation: String = resourceFolder + "raster/test1.tiff" override def beforeAll(): Unit = { - sparkSession = SedonaContext.builder(). - master("local[*]").appName("sedonasqlScalaTest") - .config("spark.sql.warehouse.dir", warehouseLocation) - // We need to be explicit about broadcasting in tests. - .config("sedona.join.autoBroadcastJoinThreshold", "-1") - .config("spark.kryoserializer.buffer.max", "64m") - .getOrCreate() - - sc = sparkSession.sparkContext SedonaContext.create(sparkSession) } override def afterAll(): Unit = { //SedonaSQLRegistrator.dropAll(spark) - sparkSession.stop +// sparkSession.stop } def loadCsv(path: String): DataFrame = { diff --git a/spark/common/src/test/scala/org/apache/sedona/viz/rdd/scalaTest.scala b/spark/common/src/test/scala/org/apache/sedona/viz/rdd/scalaTest.scala deleted file mode 100644 index 068e97ea3..000000000 --- a/spark/common/src/test/scala/org/apache/sedona/viz/rdd/scalaTest.scala +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.sedona.viz.rdd - -import org.apache.log4j.{Level, Logger} -import org.apache.sedona.common.enums.FileDataSplitter -import org.apache.sedona.core.enums.{GridType, IndexType} -import org.apache.sedona.core.formatMapper.EarthdataHDFPointMapper -import org.apache.sedona.core.spatialOperator.JoinQuery -import org.apache.sedona.core.spatialRDD.{PointRDD, PolygonRDD, RectangleRDD} -import org.apache.sedona.viz.`extension`.visualizationEffect.{ChoroplethMap, HeatMap, ScatterPlot} -import org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator -import org.apache.sedona.viz.core.{ImageGenerator, RasterOverlayOperator} -import org.apache.sedona.viz.utils.{ColorizeOption, ImageType} -import org.apache.spark.serializer.KryoSerializer -import org.apache.spark.{SparkConf, SparkContext} -import org.locationtech.jts.geom.Envelope -import org.scalatest.{BeforeAndAfterAll, FunSpec} - -import java.awt.Color -import java.io.FileInputStream -import java.util.Properties - -class scalaTest extends FunSpec with BeforeAndAfterAll{ - val sparkConf = new SparkConf().setAppName("scalaTest").setMaster("local[*]") - sparkConf.set("spark.serializer", classOf[KryoSerializer].getName) - sparkConf.set("spark.kryo.registrator", classOf[SedonaVizKryoRegistrator].getName) - var sparkContext:SparkContext = _ - Logger.getLogger("org").setLevel(Level.WARN) - Logger.getLogger("akka").setLevel(Level.WARN) - val prop = new Properties() - val resourceFolder = System.getProperty("user.dir") + "/src/test/resources/" - val demoOutputPath = "target/scala/demo" - var ConfFile = new FileInputStream(resourceFolder + "babylon.point.properties") - prop.load(ConfFile) - val scatterPlotOutputPath = System.getProperty("user.dir") + "/" + demoOutputPath + "/scatterplot" - val heatMapOutputPath = System.getProperty("user.dir") + "/" + demoOutputPath + "/heatmap" - val choroplethMapOutputPath = System.getProperty("user.dir") + "/" + demoOutputPath + "/choroplethmap" - val parallelFilterRenderStitchOutputPath = System.getProperty("user.dir") + "/" + demoOutputPath + "/parallelfilterrenderstitchheatmap" - val earthdataScatterPlotOutputPath = System.getProperty("user.dir") + "/" + demoOutputPath + "/earthdatascatterplot" - val PointInputLocation = resourceFolder + prop.getProperty("inputLocation") - val PointOffset = prop.getProperty("offset").toInt - val PointSplitter = FileDataSplitter.getFileDataSplitter(prop.getProperty("splitter")) - val PointNumPartitions = prop.getProperty("numPartitions").toInt - ConfFile = new FileInputStream(resourceFolder + "babylon.rectangle.properties") - prop.load(ConfFile) - val RectangleInputLocation = resourceFolder + prop.getProperty("inputLocation") - val RectangleOffset = prop.getProperty("offset").toInt - val RectangleSplitter = FileDataSplitter.getFileDataSplitter(prop.getProperty("splitter")) - val RectangleNumPartitions = prop.getProperty("numPartitions").toInt - ConfFile = new FileInputStream(resourceFolder + "babylon.polygon.properties") - prop.load(ConfFile) - val PolygonInputLocation = resourceFolder + prop.getProperty("inputLocation") - val PolygonOffset = prop.getProperty("offset").toInt - val PolygonSplitter = FileDataSplitter.getFileDataSplitter(prop.getProperty("splitter")) - val PolygonNumPartitions = prop.getProperty("numPartitions").toInt - ConfFile = new FileInputStream(resourceFolder + "babylon.linestring.properties") - prop.load(ConfFile) - val LineStringInputLocation = resourceFolder + prop.getProperty("inputLocation") - val LineStringOffset = prop.getProperty("offset").toInt - val LineStringSplitter = FileDataSplitter.getFileDataSplitter(prop.getProperty("splitter")) - val LineStringNumPartitions = prop.getProperty("numPartitions").toInt - val USMainLandBoundary = new Envelope(-126.790180, -64.630926, 24.863836, 50.000) - val earthdataInputLocation = resourceFolder + "modis/modis.csv" - val earthdataNumPartitions = 5 - val HDFIncrement = 5 - val HDFOffset = 2 - val HDFRootGroupName = "MOD_Swath_LST" - val HDFDataVariableName = "LST" - val HDFDataVariableList = Array("LST", "QC", "Error_LST", "Emis_31", "Emis_32") - val HDFswitchXY = true - val urlPrefix = resourceFolder + "modis/" - - override def beforeAll(): Unit = { - sparkContext = new SparkContext(sparkConf) - } - override def afterAll(): Unit = { - sparkContext.stop() - } - describe("SedonaViz in Scala") { - - it("should pass scatter plot") { - val spatialRDD = new PolygonRDD(sparkContext, PolygonInputLocation, PolygonSplitter, false, PolygonNumPartitions) - var visualizationOperator = new ScatterPlot(1000, 600, USMainLandBoundary, false) - visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true) - visualizationOperator.Visualize(sparkContext, spatialRDD) - var imageGenerator = new ImageGenerator - imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, scatterPlotOutputPath, ImageType.PNG) - /* - visualizationOperator = new ScatterPlot(1000, 600, USMainLandBoundary, false, -1, -1, false, true) - visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true) - visualizationOperator.Visualize(sparkContext, spatialRDD) - imageGenerator = new ImageGenerator - imageGenerator.SaveVectorImageAsLocalFile(visualizationOperator.vectorImage, scatterPlotOutputPath, ImageType.SVG) - visualizationOperator = new ScatterPlot(1000, 600, USMainLandBoundary, false, -1, -1, true, true) - visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true) - visualizationOperator.Visualize(sparkContext, spatialRDD) - imageGenerator = new ImageGenerator - imageGenerator.SaveVectorImageAsLocalFile(visualizationOperator.distributedVectorImage, scatterPlotOutputPath + "-distributed", ImageType.SVG) - */ - true - } - - it("should pass heat map") { - val spatialRDD = new RectangleRDD(sparkContext, RectangleInputLocation, RectangleSplitter, false, RectangleNumPartitions) - val visualizationOperator = new HeatMap(1000, 600, USMainLandBoundary, false, 2) - visualizationOperator.Visualize(sparkContext, spatialRDD) - val imageGenerator = new ImageGenerator - imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, heatMapOutputPath, ImageType.PNG) - true - } - - it("should pass choropleth map") { - val spatialRDD = new PointRDD(sparkContext, PointInputLocation, PointOffset, PointSplitter, false, PointNumPartitions) - val queryRDD = new PolygonRDD(sparkContext, PolygonInputLocation, PolygonSplitter, false, PolygonNumPartitions) - spatialRDD.analyze(); - spatialRDD.spatialPartitioning(GridType.KDBTREE) - queryRDD.spatialPartitioning(spatialRDD.getPartitioner) - spatialRDD.buildIndex(IndexType.RTREE, true) - val joinResult = JoinQuery.SpatialJoinQueryCountByKey(spatialRDD, queryRDD, true, false) - val visualizationOperator = new ChoroplethMap(1000, 600, USMainLandBoundary, false) - visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.RED, true) - visualizationOperator.Visualize(sparkContext, joinResult) - val frontImage = new ScatterPlot(1000, 600, USMainLandBoundary, false) - frontImage.CustomizeColor(0, 0, 0, 255, Color.GREEN, true) - frontImage.Visualize(sparkContext, queryRDD) - val overlayOperator = new RasterOverlayOperator(visualizationOperator.rasterImage) - overlayOperator.JoinImage(frontImage.rasterImage) - val imageGenerator = new ImageGenerator - imageGenerator.SaveRasterImageAsLocalFile(overlayOperator.backRasterImage, choroplethMapOutputPath, ImageType.PNG) - true - } - - it("should pass parallel filtering and rendering without stitching image tiles") { - val spatialRDD = new RectangleRDD(sparkContext, RectangleInputLocation, RectangleSplitter, false, RectangleNumPartitions) - val visualizationOperator = new HeatMap(1000, 600, USMainLandBoundary, false, 2, 4, 4, true, true) - visualizationOperator.Visualize(sparkContext, spatialRDD) - val imageGenerator = new ImageGenerator - imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage, parallelFilterRenderStitchOutputPath, ImageType.PNG, 0, 4, 4) - true - } - - it("should pass parallel filtering and rendering with stitching image tiles") { - val spatialRDD = new RectangleRDD(sparkContext, RectangleInputLocation, RectangleSplitter, false, RectangleNumPartitions) - val visualizationOperator = new HeatMap(1000, 600, USMainLandBoundary, false, 2, 4, 4, true, true) - visualizationOperator.Visualize(sparkContext, spatialRDD) - val imageGenerator = new ImageGenerator - imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage, parallelFilterRenderStitchOutputPath, ImageType.PNG) - true - } - - // Tests here have been ignored. A new feature that reads HDF will be added. - ignore("should pass earth data hdf scatter plot") { - val earthdataHDFPoint = new EarthdataHDFPointMapper(HDFIncrement, HDFOffset, HDFRootGroupName, - HDFDataVariableList, HDFDataVariableName, HDFswitchXY, urlPrefix) - val spatialRDD = new PointRDD(sparkContext, earthdataInputLocation, earthdataNumPartitions, earthdataHDFPoint) - val visualizationOperator = new ScatterPlot(1000, 600, spatialRDD.boundaryEnvelope, ColorizeOption.EARTHOBSERVATION, false, false) - visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.BLUE, true) - visualizationOperator.Visualize(sparkContext, spatialRDD) - val imageGenerator = new ImageGenerator - imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, earthdataScatterPlotOutputPath, ImageType.PNG) - true - } - } -} diff --git a/spark/common/src/test/scala/org/apache/sedona/viz/sql/TestBaseScala.scala b/spark/common/src/test/scala/org/apache/sedona/viz/sql/VizTestBase.scala similarity index 68% rename from spark/common/src/test/scala/org/apache/sedona/viz/sql/TestBaseScala.scala rename to spark/common/src/test/scala/org/apache/sedona/viz/sql/VizTestBase.scala index 67bb98346..aeabace48 100644 --- a/spark/common/src/test/scala/org/apache/sedona/viz/sql/TestBaseScala.scala +++ b/spark/common/src/test/scala/org/apache/sedona/viz/sql/VizTestBase.scala @@ -19,46 +19,36 @@ package org.apache.sedona.viz.sql import org.apache.log4j.{Level, Logger} -import org.apache.sedona.spark.SedonaContext +import org.apache.sedona.sql.TestBaseScala import org.apache.sedona.viz.sql.utils.SedonaVizRegistrator -import org.apache.spark.sql.{DataFrame, SparkSession} -import org.scalatest.{BeforeAndAfterAll, FunSpec} +import org.apache.spark.sql.DataFrame -trait TestBaseScala extends FunSpec with BeforeAndAfterAll{ +trait VizTestBase extends TestBaseScala { Logger.getLogger("org.apache").setLevel(Level.WARN) Logger.getLogger("com").setLevel(Level.WARN) Logger.getLogger("akka").setLevel(Level.WARN) Logger.getLogger("org.datasyslab").setLevel(Level.WARN) - var spark:SparkSession = _ - val resourceFolder = System.getProperty("user.dir") + "/src/test/resources/" - val polygonInputLocationWkt = resourceFolder + "county_small.tsv" val polygonInputLocation = resourceFolder + "primaryroads-polygon.csv" - val csvPointInputLocation = resourceFolder + "arealm.csv" + val pointInputLocation = resourceFolder + "arealm.csv" override def beforeAll(): Unit = { - spark = SedonaContext.create(SedonaContext.builder(). - master("local[*]").appName("SedonaVizSQL").getOrCreate()) - SedonaVizRegistrator.registerAll(spark) + SedonaVizRegistrator.registerAll(sparkSession) getPoint().createOrReplaceTempView("pointtable") getPolygon().createOrReplaceTempView("usdata") } def getPoint(): DataFrame = { - val pointDf = spark.read.format("csv").option("delimiter", ",").option("header", "false").load(csvPointInputLocation).sample(false, 1) + val pointDf = sparkSession.read.format("csv").option("delimiter", ",").option("header", "false").load(pointInputLocation).sample(false, 1) pointDf.selectExpr("ST_Point(cast(_c0 as Decimal(24,20)),cast(_c1 as Decimal(24,20))) as shape") .filter("ST_Contains(ST_PolygonFromEnvelope(-126.790180,24.863836,-64.630926,50.000),shape)") } def getPolygon():DataFrame = { - val polygonDf = spark.read.format("csv").option("delimiter", "\t").option("header", "false").load(polygonInputLocationWkt) + val polygonDf = sparkSession.read.format("csv").option("delimiter", "\t").option("header", "false").load(polygonInputLocationWkt) polygonDf.selectExpr("ST_GeomFromWKT(_c0) as shape", "_c1 as rate", "_c2", "_c3") .filter("ST_Contains(ST_PolygonFromEnvelope(-126.790180,24.863836,-64.630926,50.000),shape)") } - override def afterAll(): Unit = { - spark.stop - } - } diff --git a/spark/common/src/test/scala/org/apache/sedona/viz/sql/optVizOperatorTest.scala b/spark/common/src/test/scala/org/apache/sedona/viz/sql/optVizOperatorTest.scala index 8969a1afa..8c207c70b 100644 --- a/spark/common/src/test/scala/org/apache/sedona/viz/sql/optVizOperatorTest.scala +++ b/spark/common/src/test/scala/org/apache/sedona/viz/sql/optVizOperatorTest.scala @@ -24,12 +24,12 @@ import org.apache.sedona.viz.sql.utils.{Conf, LineageDecoder} import org.apache.spark.sql.functions.lit import org.locationtech.jts.geom.Envelope -class optVizOperatorTest extends TestBaseScala { +class optVizOperatorTest extends VizTestBase { describe("SedonaViz SQL function Test") { it("Passed full pipeline using optimized operator") { - val table = spark.sql( + val table = sparkSession.sql( """ |SELECT pixel, shape FROM pointtable |LATERAL VIEW EXPLODE(ST_Pixelize(shape, 1000, 1000, ST_PolygonFromEnvelope(-126.790180,24.863836,-64.630926,50.000))) AS pixel @@ -49,7 +49,7 @@ class optVizOperatorTest extends TestBaseScala { // Test the colorize operator result.createOrReplaceTempView("pixelaggregates") - val colorTable = spark.sql( + val colorTable = sparkSession.sql( s""" |SELECT pixel, ${Conf.PrimaryPID}, ${Conf.SecondaryPID}, ST_Colorize(weight, (SELECT max(weight) FROM pixelaggregates)) |FROM pixelaggregates @@ -59,7 +59,7 @@ class optVizOperatorTest extends TestBaseScala { } it("Passed full pipeline - aggregate:avg - color:uniform") { - var table = spark.sql( + var table = sparkSession.sql( """ |SELECT pixel, shape FROM pointtable |LATERAL VIEW EXPLODE(ST_Pixelize(shape, 1000, 1000, ST_PolygonFromEnvelope(-126.790180,24.863836,-64.630926,50.000))) AS pixel @@ -79,7 +79,7 @@ class optVizOperatorTest extends TestBaseScala { // Test the colorize operator result.createOrReplaceTempView("pixelaggregates") - val colorTable = spark.sql( + val colorTable = sparkSession.sql( s""" |SELECT pixel, ${Conf.PrimaryPID}, ${Conf.SecondaryPID}, ST_Colorize(weight, 0, 'red') |FROM pixelaggregates diff --git a/spark/common/src/test/scala/org/apache/sedona/viz/sql/standardVizOperatorTest.scala b/spark/common/src/test/scala/org/apache/sedona/viz/sql/standardVizOperatorTest.scala index 9d020c89b..06c77fb52 100644 --- a/spark/common/src/test/scala/org/apache/sedona/viz/sql/standardVizOperatorTest.scala +++ b/spark/common/src/test/scala/org/apache/sedona/viz/sql/standardVizOperatorTest.scala @@ -22,31 +22,31 @@ package org.apache.sedona.viz.sql import org.apache.sedona.viz.core.{ImageGenerator, ImageSerializableWrapper} import org.apache.sedona.viz.utils.ImageType -class standardVizOperatorTest extends TestBaseScala { +class standardVizOperatorTest extends VizTestBase { describe("SedonaViz SQL function Test") { it("Generate a single image") { - spark.sql( + sparkSession.sql( """ |SELECT pixel, shape FROM pointtable |LATERAL VIEW EXPLODE(ST_Pixelize(shape, 256, 256, ST_PolygonFromEnvelope(-126.790180,24.863836,-64.630926,50.000)) ) AS pixel """.stripMargin).createOrReplaceTempView("pixels") - spark.sql( + sparkSession.sql( """ |SELECT pixel, count(*) as weight |FROM pixels |GROUP BY pixel """.stripMargin).createOrReplaceTempView("pixelaggregates") - spark.sql( + sparkSession.sql( """ |SELECT ST_Render(pixel, ST_Colorize(weight, (SELECT max(weight) FROM pixelaggregates), 'red')) AS image |FROM pixelaggregates """.stripMargin).createOrReplaceTempView("images") - var image = spark.table("images").take(1)(0)(0).asInstanceOf[ImageSerializableWrapper].getImage + var image = sparkSession.table("images").take(1)(0)(0).asInstanceOf[ImageSerializableWrapper].getImage var imageGenerator = new ImageGenerator imageGenerator.SaveRasterImageAsLocalFile(image, System.getProperty("user.dir") + "/target/points", ImageType.PNG) - val imageString = spark.sql( + val imageString = sparkSession.sql( """ |SELECT ST_EncodeImage(image) |FROM images @@ -55,11 +55,11 @@ class standardVizOperatorTest extends TestBaseScala { } it("Generate a single image using a fat query") { - spark.sql( + sparkSession.sql( """ |SELECT ST_Envelope_Aggr(shape) as bound FROM pointtable """.stripMargin).createOrReplaceTempView("boundtable") - spark.sql( + sparkSession.sql( """ |SELECT pixel, shape FROM pointtable |LATERAL VIEW @@ -71,14 +71,14 @@ class standardVizOperatorTest extends TestBaseScala { | (SELECT ST_Transform(bound, 'epsg:4326','epsg:3857') FROM boundtable) | )) AS pixel """.stripMargin).createOrReplaceTempView("pixels") - spark.sql( + sparkSession.sql( """ |CREATE OR REPLACE TEMP VIEW pixelaggregates AS |SELECT pixel, count(*) as weight |FROM pixels |GROUP BY pixel """.stripMargin) - val images = spark.sql( + val images = sparkSession.sql( """ |SELECT | ST_EncodeImage(ST_Render(pixel, ST_Colorize(weight, (SELECT max(weight) FROM pixelaggregates)))) AS image, @@ -89,36 +89,36 @@ class standardVizOperatorTest extends TestBaseScala { } it("Passed the pipeline on points") { - spark.sql( + sparkSession.sql( """ |SELECT pixel, shape FROM pointtable |LATERAL VIEW EXPLODE(ST_Pixelize(shape, 1000, 800, ST_PolygonFromEnvelope(-126.790180,24.863836,-64.630926,50.000))) AS pixel """.stripMargin).createOrReplaceTempView("pixels") - spark.sql( + sparkSession.sql( """ |CREATE OR REPLACE TEMP VIEW pixelaggregates AS |SELECT pixel, count(*) as weight |FROM pixels |GROUP BY pixel """.stripMargin) - val pixelaggregates = spark.table("pixelaggregates") + val pixelaggregates = sparkSession.table("pixelaggregates") pixelaggregates.show(1) } it("Passed the pipeline on polygons") { - spark.sql( + sparkSession.sql( """ |SELECT pixel, rate, shape FROM usdata |LATERAL VIEW EXPLODE(ST_Pixelize(shape, 1000, 1000, ST_PolygonFromEnvelope(-126.790180,24.863836,-64.630926,50.000))) AS pixel """.stripMargin).createOrReplaceTempView("pixels") - spark.sql( + sparkSession.sql( """ |CREATE OR REPLACE TEMP VIEW pixelaggregates AS |SELECT pixel, count(*) as weight |FROM pixels |GROUP BY pixel """.stripMargin) - val imageDf = spark.sql( + val imageDf = sparkSession.sql( """ |SELECT ST_Render(pixel, ST_Colorize(weight, (SELECT max(weight) FROM pixelaggregates))) AS image |FROM pixelaggregates @@ -130,24 +130,24 @@ class standardVizOperatorTest extends TestBaseScala { it("Passed ST_TileName") { var zoomLevel = 2 - spark.sql( + sparkSession.sql( """ |SELECT pixel, shape FROM pointtable |LATERAL VIEW EXPLODE(ST_Pixelize(shape, 1000, 1000, ST_PolygonFromEnvelope(-126.790180,24.863836,-64.630926,50.000))) AS pixel """.stripMargin).createOrReplaceTempView("pixels") - spark.sql( + sparkSession.sql( """ |CREATE OR REPLACE TEMP VIEW pixelaggregates AS |SELECT pixel, count(*) as weight |FROM pixels |GROUP BY pixel """.stripMargin) - spark.sql( + sparkSession.sql( s""" |SELECT pixel, weight, ST_TileName(pixel, $zoomLevel) AS pid |FROM pixelaggregates """.stripMargin).createOrReplaceTempView("pixel_weights") - val images = spark.sql( + val images = sparkSession.sql( s""" |SELECT ST_Render(pixel, ST_Colorize(weight, (SELECT max(weight) FROM pixelaggregates)), $zoomLevel) AS image |FROM pixel_weights
