GEODE-37 change package name from io.pivotal.geode (for ./geode-spark-connector/src/test/scala/io/pivotal) to org.apache.geode (for ./geode-spark-connector/src/test/scala/org/apache)
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/97658f04 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/97658f04 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/97658f04 Branch: refs/heads/develop Commit: 97658f046e0c2f7a9f0a7eb366c07a91acb7650f Parents: f530d3a Author: Hitesh Khamesra <[email protected]> Authored: Tue Sep 20 15:44:10 2016 -0700 Committer: Hitesh Khamesra <[email protected]> Committed: Tue Sep 20 16:01:02 2016 -0700 ---------------------------------------------------------------------- .../connector/GeodeFunctionDeployerTest.scala | 58 ----- .../DefaultGeodeConnectionManagerTest.scala | 82 ------ ...tStreamingResultSenderAndCollectorTest.scala | 254 ------------------- .../internal/oql/QueryParserTest.scala | 83 ------ .../connector/GeodeFunctionDeployerTest.scala | 58 +++++ .../DefaultGeodeConnectionManagerTest.scala | 82 ++++++ ...tStreamingResultSenderAndCollectorTest.scala | 254 +++++++++++++++++++ .../internal/oql/QueryParserTest.scala | 83 ++++++ 8 files changed, 477 insertions(+), 477 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala deleted file mode 100644 index 4e45dc2..0000000 --- a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/GeodeFunctionDeployerTest.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the 
Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.geode.spark.connector - -import org.mockito.Mockito._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{FunSuite, Matchers} -import org.apache.commons.httpclient.HttpClient -import java.io.File - - -class GeodeFunctionDeployerTest extends FunSuite with Matchers with MockitoSugar { - val mockHttpClient: HttpClient = mock[HttpClient] - - test("jmx url creation") { - val jmxHostAndPort = "localhost:7070" - val expectedUrlString = "http://" + jmxHostAndPort + "/gemfire/v1/deployed" - val gfd = new GeodeFunctionDeployer(mockHttpClient); - val urlString = gfd.constructURLString(jmxHostAndPort) - assert(urlString === expectedUrlString) - } - - test("missing jar file") { - val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist" - val gfd = new GeodeFunctionDeployer(mockHttpClient); - intercept[RuntimeException] { gfd.jarFileHandle(missingJarFileLocation)} - } - - test("deploy with missing jar") { - val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist" - val gfd = new GeodeFunctionDeployer(mockHttpClient); - intercept[RuntimeException] {(gfd.deploy("localhost:7070", missingJarFileLocation).contains("Deployed"))} - 
intercept[RuntimeException] {(gfd.deploy("localhost", 7070, missingJarFileLocation).contains("Deployed"))} - } - - test("successful mocked deploy") { - val gfd = new GeodeFunctionDeployer(mockHttpClient); - val jar = new File("README.md"); - assert(gfd.deploy("localhost:7070", jar).contains("Deployed")) - } - - - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala deleted file mode 100644 index 798912c..0000000 --- a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.pivotal.geode.spark.connector.internal - -import io.pivotal.geode.spark.connector.GeodeConnectionConf -import org.mockito.Mockito._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{FunSuite, Matchers} - -class DefaultGeodeConnectionManagerTest extends FunSuite with Matchers with MockitoSugar { - - test("DefaultGeodeConnectionFactory get/closeConnection") { - // note: connConf 1-4 share the same set of locators - val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234))) - val connConf2 = new GeodeConnectionConf(Seq(("host2", 5678))) - val connConf3 = new GeodeConnectionConf(Seq(("host1", 1234), ("host2", 5678))) - val connConf4 = new GeodeConnectionConf(Seq(("host2", 5678), ("host1", 1234))) - val connConf5 = new GeodeConnectionConf(Seq(("host5", 3333))) - - val props: Map[String, String] = Map.empty - val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory] - val mockConn1 = mock[DefaultGeodeConnection] - val mockConn2 = mock[DefaultGeodeConnection] - when(mockConnFactory.newConnection(connConf3.locators, props)).thenReturn(mockConn1) - when(mockConnFactory.newConnection(connConf5.locators, props)).thenReturn(mockConn2) - - assert(DefaultGeodeConnectionManager.getConnection(connConf3)(mockConnFactory) == mockConn1) - // note: following 3 lines do not trigger connFactory.newConnection(...) - assert(DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1) - assert(DefaultGeodeConnectionManager.getConnection(connConf2)(mockConnFactory) == mockConn1) - assert(DefaultGeodeConnectionManager.getConnection(connConf4)(mockConnFactory) == mockConn1) - assert(DefaultGeodeConnectionManager.getConnection(connConf5)(mockConnFactory) == mockConn2) - - // connFactory.newConnection(...) 
were invoked only twice - verify(mockConnFactory, times(1)).newConnection(connConf3.locators, props) - verify(mockConnFactory, times(1)).newConnection(connConf5.locators, props) - assert(DefaultGeodeConnectionManager.connections.size == 3) - - DefaultGeodeConnectionManager.closeConnection(connConf1) - assert(DefaultGeodeConnectionManager.connections.size == 1) - DefaultGeodeConnectionManager.closeConnection(connConf5) - assert(DefaultGeodeConnectionManager.connections.isEmpty) - } - - test("DefaultGeodeConnectionFactory newConnection(...) throws RuntimeException") { - val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234))) - val props: Map[String, String] = Map.empty - val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory] - when(mockConnFactory.newConnection(connConf1.locators, props)).thenThrow(new RuntimeException()) - intercept[RuntimeException] { DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) } - verify(mockConnFactory, times(1)).newConnection(connConf1.locators, props) - } - - test("DefaultGeodeConnectionFactory close() w/ non-exist connection") { - val props: Map[String, String] = Map.empty - val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory] - val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234))) - val connConf2 = new GeodeConnectionConf(Seq(("host2", 5678))) - val mockConn1 = mock[DefaultGeodeConnection] - when(mockConnFactory.newConnection(connConf1.locators, props)).thenReturn(mockConn1) - assert(DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1) - assert(DefaultGeodeConnectionManager.connections.size == 1) - // connection does not exists in the connection manager - DefaultGeodeConnectionManager.closeConnection(connConf2) - assert(DefaultGeodeConnectionManager.connections.size == 1) - } - -} 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala deleted file mode 100644 index c95f1dc..0000000 --- a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.pivotal.geode.spark.connector.internal.geodefunctions - -import org.apache.geode.DataSerializer -import org.apache.geode.cache.execute.{ResultCollector, ResultSender} -import org.apache.geode.cache.query.internal.types.{ObjectTypeImpl, StructTypeImpl} -import org.apache.geode.cache.query.types.ObjectType -import org.apache.geode.internal.{Version, ByteArrayDataInput, HeapDataOutputStream} -import org.apache.geode.internal.cache.{CachedDeserializable, CachedDeserializableFactory} -import org.scalatest.{BeforeAndAfter, FunSuite} -import scala.collection.JavaConversions._ -import scala.concurrent.{Await, ExecutionContext, Future} -import ExecutionContext.Implicits.global -import scala.concurrent.duration._ - -class StructStreamingResultSenderAndCollectorTest extends FunSuite with BeforeAndAfter { - - /** - * A test ResultSender that connects struct ResultSender and ResultCollector - * Note: this test ResultSender has to copy the data (byte array) since the - * StructStreamingResultSender will reuse the byte array. 
- */ - class LocalResultSender(collector: ResultCollector[Array[Byte], _], num: Int = 1) extends ResultSender[Object] { - - var finishedNum = 0 - - override def sendResult(result: Object): Unit = - collector.addResult(null, result.asInstanceOf[Array[Byte]].clone()) - - /** exception should be sent via lastResult() */ - override def sendException(throwable: Throwable): Unit = - throw new UnsupportedOperationException("sendException is not supported.") - - override def lastResult(result: Object): Unit = { - collector.addResult(null, result.asInstanceOf[Array[Byte]].clone()) - this.synchronized { - finishedNum += 1 - if (finishedNum == num) - collector.endResults() - } - } - } - - /** common variables */ - var collector: StructStreamingResultCollector = _ - var baseSender: LocalResultSender = _ - /** common types */ - val objType = new ObjectTypeImpl("java.lang.Object").asInstanceOf[ObjectType] - val TwoColType = new StructTypeImpl(Array("key", "value"), Array(objType, objType)) - val OneColType = new StructTypeImpl(Array("value"), Array(objType)) - - before { - collector = new StructStreamingResultCollector - baseSender = new LocalResultSender(collector, 1) - } - - test("transfer simple data") { - verifySimpleTransfer(sendDataType = true) - } - - test("transfer simple data with no type info") { - verifySimpleTransfer(sendDataType = false) - } - - def verifySimpleTransfer(sendDataType: Boolean): Unit = { - val iter = (0 to 9).map(i => Array(i.asInstanceOf[Object], (i.toString * 5).asInstanceOf[Object])).toIterator - val dataType = if (sendDataType) TwoColType else null - new StructStreamingResultSender(baseSender, dataType , iter).send() - // println("type: " + collector.getResultType.toString) - assert(TwoColType.equals(collector.getResultType)) - val iter2 = collector.getResult - (0 to 9).foreach { i => - assert(iter2.hasNext) - val o = iter2.next() - assert(o.size == 2) - assert(o(0).asInstanceOf[Int] == i) - assert(o(1).asInstanceOf[String] == i.toString * 5) - } 
- assert(! iter2.hasNext) - } - - - /** - * A test iterator that generate integer data - * @param start the 1st value - * @param n number of integers generated - * @param genExcp generate Exception if true. This is used to test exception handling. - */ - def intIterator(start: Int, n: Int, genExcp: Boolean): Iterator[Array[Object]] = { - new Iterator[Array[Object]] { - val max = if (genExcp) start + n else start + n - 1 - var index: Int = start - 1 - - override def hasNext: Boolean = if (index < max) true else false - - override def next(): Array[Object] = - if (index < (start + n - 1)) { - index += 1 - Array(index.asInstanceOf[Object]) - } else throw new RuntimeException("simulated error") - } - } - - test("transfer data with 0 row") { - new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 0, genExcp = false)).send() - // println("type: " + collector.getResultType.toString) - assert(collector.getResultType == null) - val iter = collector.getResult - assert(! iter.hasNext) - } - - test("transfer data with 10K rows") { - new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 10000, genExcp = false)).send() - // println("type: " + collector.getResultType.toString) - assert(OneColType.equals(collector.getResultType)) - val iter = collector.getResult - // println(iter.toList.map(list => list.mkString(",")).mkString("; ")) - (1 to 10000).foreach { i => - assert(iter.hasNext) - val o = iter.next() - assert(o.size == 1) - assert(o(0).asInstanceOf[Int] == i) - } - assert(! 
iter.hasNext) - } - - test("transfer data with 10K rows with 2 sender") { - baseSender = new LocalResultSender(collector, 2) - val total = 300 - val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()} - val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = false), "sender2").send()} - Await.result(sender1, 1.seconds) - Await.result(sender2, 1.seconds) - - // println("type: " + collector.getResultType.toString) - assert(OneColType.equals(collector.getResultType)) - val iter = collector.getResult - // println(iter.toList.map(list => list.mkString(",")).mkString("; ")) - val set = scala.collection.mutable.Set[Int]() - (1 to total).foreach { i => - assert(iter.hasNext) - val o = iter.next() - assert(o.size == 1) - assert(! set.contains(o(0).asInstanceOf[Int])) - set.add(o(0).asInstanceOf[Int]) - } - assert(! iter.hasNext) - } - - test("transfer data with 10K rows with 2 sender with error") { - baseSender = new LocalResultSender(collector, 2) - val total = 1000 - val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()} - val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = true), "sender2").send()} - Await.result(sender1, 1 seconds) - Await.result(sender2, 1 seconds) - - // println("type: " + collector.getResultType.toString) - assert(OneColType.equals(collector.getResultType)) - val iter = collector.getResult - // println(iter.toList.map(list => list.mkString(",")).mkString("; ")) - val set = scala.collection.mutable.Set[Int]() - intercept[RuntimeException] { - (1 to total).foreach { i => - assert(iter.hasNext) - val o = iter.next() - assert(o.size == 1) - assert(! 
set.contains(o(0).asInstanceOf[Int])) - set.add(o(0).asInstanceOf[Int]) - } - } - // println(s"rows received: ${set.size}") - } - - test("transfer data with Exception") { - new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 200, genExcp = true)).send() - // println("type: " + collector.getResultType.toString) - val iter = collector.getResult - intercept[RuntimeException] ( iter.foreach(_.mkString(",")) ) - } - - def stringPairIterator(n: Int, genExcp: Boolean): Iterator[Array[Object]] = - intIterator(1, n, genExcp).map(x => Array(s"key-${x(0)}", s"value-${x(0)}")) - - test("transfer string pair data with 200 rows") { - new StructStreamingResultSender(baseSender, TwoColType, stringPairIterator(1000, genExcp = false)).send() - // println("type: " + collector.getResultType.toString) - assert(TwoColType.equals(collector.getResultType)) - val iter = collector.getResult - // println(iter.toList.map(list => list.mkString(",")).mkString("; ")) - (1 to 1000).foreach { i => - assert(iter.hasNext) - val o = iter.next() - assert(o.size == 2) - assert(o(0) == s"key-$i") - assert(o(1) == s"value-$i") - } - assert(! iter.hasNext) - } - - /** - * Usage notes: There are 3 kinds of data to transfer: - * (1) object, (2) byte array of serialized object, and (3) byte array - * this test shows how to handle all of them. - */ - test("DataSerializer usage") { - val outBuf = new HeapDataOutputStream(1024, null) - val inBuf = new ByteArrayDataInput() - - // 1. a regular object - val hello = "Hello World!" * 30 - // serialize the data - DataSerializer.writeObject(hello, outBuf) - val bytesHello = outBuf.toByteArray.clone() - // de-serialize the data - inBuf.initialize(bytesHello, Version.CURRENT) - val hello2 = DataSerializer.readObject(inBuf).asInstanceOf[Object] - assert(hello == hello2) - - // 2. 
byte array of serialized object - // serialize: byte array from `CachedDeserializable` - val cd: CachedDeserializable = CachedDeserializableFactory.create(bytesHello) - outBuf.reset() - DataSerializer.writeByteArray(cd.getSerializedValue, outBuf) - // de-serialize the data in 2 steps - inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT) - val bytesHello2: Array[Byte] = DataSerializer.readByteArray(inBuf) - inBuf.initialize(bytesHello2, Version.CURRENT) - val hello3 = DataSerializer.readObject(inBuf).asInstanceOf[Object] - assert(hello == hello3) - - // 3. byte array - outBuf.reset() - DataSerializer.writeByteArray(bytesHello, outBuf) - inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT) - val bytesHello3: Array[Byte] = DataSerializer.readByteArray(inBuf) - assert(bytesHello sameElements bytesHello3) - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala deleted file mode 100644 index 54394e8..0000000 --- a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.geode.spark.connector.internal.oql - -import org.scalatest.FunSuite - -class QueryParserTest extends FunSuite { - - test("select * from /r1") { - val r = QueryParser.parseOQL("select * from /r1").get - assert(r == "List(/r1)") - } - - test("select c2 from /r1") { - val r = QueryParser.parseOQL("select c2 from /r1").get - assert(r == "List(/r1)") - } - - test("select key, value from /r1.entries") { - val r = QueryParser.parseOQL("select key, value from /r1.entries").get - assert(r == "List(/r1.entries)") - } - - test("select c1, c2 from /r1 where col1 > 100 and col2 <= 120 or c3 = 2") { - val r = QueryParser.parseOQL("select c1, c2 from /r1 where col1 > 100 and col2 <= 120 or c3 = 2").get - assert(r == "List(/r1)") - } - - test("select * from /r1/r2 where c1 >= 200") { - val r = QueryParser.parseOQL("select * from /r1/r2 where c1 >= 200").get - assert(r == "List(/r1/r2)") - } - - test("import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 and c2 = 100") { - val r = QueryParser.parseOQL("import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 and c2 = 100").get - assert(r == "List(/r1/r2, /r3/r4)") - } - - test("SELECT distinct f1, f2 FROM /r1/r2 WHere f = 100") { - val r = QueryParser.parseOQL("SELECT distinct f1, f2 FROM /r1/r2 WHere f = 100").get - assert(r == "List(/r1/r2)") - } - - test("IMPORT io.pivotal.geode IMPORT com.mypackage SELECT key,value FROM /root/sub.entries WHERE status = 'active' ORDER BY id desc") { - val r = QueryParser.parseOQL("IMPORT io.pivotal.geode IMPORT com.mypackage 
SELECT key,value FROM /root/sub.entries WHERE status = 'active' ORDER BY id desc").get - assert(r == "List(/root/sub.entries)") - } - - test("select distinct p.ID, p.status from /region p where p.ID > 5 order by p.status") { - val r = QueryParser.parseOQL("select distinct p.ID, p.status from /region p where p.ID > 5 order by p.status").get - assert(r == "List(/region)") - } - - test("SELECT DISTINCT * FROM /QueryRegion1 r1, /QueryRegion2 r2 WHERE r1.ID = r2.ID") { - val r = QueryParser.parseOQL("SELECT DISTINCT * FROM /QueryRegion1 r1, /QueryRegion2 r2 WHERE r1.ID = r2.ID").get - assert(r == "List(/QueryRegion1, /QueryRegion2)") - } - - test("SELECT id, \"type\", positions, status FROM /obj_obj_region WHERE status = 'active'") { - val r = QueryParser.parseOQL("SELECT id, \"type\", positions, status FROM /obj_obj_region WHERE status = 'active'").get - println("r.type=" + r.getClass.getName + " r=" + r) - assert(r == "List(/obj_obj_region)") - } - - test("SELECT r.id, r.\"type\", r.positions, r.status FROM /obj_obj_region r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'") { - val r = QueryParser.parseOQL("SELECT r.id, r.\"type\", r.positions, r.status FROM /obj_obj_region r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'").get - assert(r == "List(/obj_obj_region, r.positions.values)") - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/GeodeFunctionDeployerTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/GeodeFunctionDeployerTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/GeodeFunctionDeployerTest.scala new file mode 100644 index 0000000..4e45dc2 --- /dev/null +++ 
b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/GeodeFunctionDeployerTest.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector + +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{FunSuite, Matchers} +import org.apache.commons.httpclient.HttpClient +import java.io.File + + +class GeodeFunctionDeployerTest extends FunSuite with Matchers with MockitoSugar { + val mockHttpClient: HttpClient = mock[HttpClient] + + test("jmx url creation") { + val jmxHostAndPort = "localhost:7070" + val expectedUrlString = "http://" + jmxHostAndPort + "/gemfire/v1/deployed" + val gfd = new GeodeFunctionDeployer(mockHttpClient); + val urlString = gfd.constructURLString(jmxHostAndPort) + assert(urlString === expectedUrlString) + } + + test("missing jar file") { + val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist" + val gfd = new GeodeFunctionDeployer(mockHttpClient); + intercept[RuntimeException] { gfd.jarFileHandle(missingJarFileLocation)} + } + + test("deploy with missing jar") { + val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist" + val gfd = 
new GeodeFunctionDeployer(mockHttpClient); + intercept[RuntimeException] {(gfd.deploy("localhost:7070", missingJarFileLocation).contains("Deployed"))} + intercept[RuntimeException] {(gfd.deploy("localhost", 7070, missingJarFileLocation).contains("Deployed"))} + } + + test("successful mocked deploy") { + val gfd = new GeodeFunctionDeployer(mockHttpClient); + val jar = new File("README.md"); + assert(gfd.deploy("localhost:7070", jar).contains("Deployed")) + } + + + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala new file mode 100644 index 0000000..798912c --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal + +import io.pivotal.geode.spark.connector.GeodeConnectionConf +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{FunSuite, Matchers} + +class DefaultGeodeConnectionManagerTest extends FunSuite with Matchers with MockitoSugar { + + test("DefaultGeodeConnectionFactory get/closeConnection") { + // note: connConf 1-4 share the same set of locators + val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234))) + val connConf2 = new GeodeConnectionConf(Seq(("host2", 5678))) + val connConf3 = new GeodeConnectionConf(Seq(("host1", 1234), ("host2", 5678))) + val connConf4 = new GeodeConnectionConf(Seq(("host2", 5678), ("host1", 1234))) + val connConf5 = new GeodeConnectionConf(Seq(("host5", 3333))) + + val props: Map[String, String] = Map.empty + val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory] + val mockConn1 = mock[DefaultGeodeConnection] + val mockConn2 = mock[DefaultGeodeConnection] + when(mockConnFactory.newConnection(connConf3.locators, props)).thenReturn(mockConn1) + when(mockConnFactory.newConnection(connConf5.locators, props)).thenReturn(mockConn2) + + assert(DefaultGeodeConnectionManager.getConnection(connConf3)(mockConnFactory) == mockConn1) + // note: following 3 lines do not trigger connFactory.newConnection(...) + assert(DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1) + assert(DefaultGeodeConnectionManager.getConnection(connConf2)(mockConnFactory) == mockConn1) + assert(DefaultGeodeConnectionManager.getConnection(connConf4)(mockConnFactory) == mockConn1) + assert(DefaultGeodeConnectionManager.getConnection(connConf5)(mockConnFactory) == mockConn2) + + // connFactory.newConnection(...) 
were invoked only twice + verify(mockConnFactory, times(1)).newConnection(connConf3.locators, props) + verify(mockConnFactory, times(1)).newConnection(connConf5.locators, props) + assert(DefaultGeodeConnectionManager.connections.size == 3) + + DefaultGeodeConnectionManager.closeConnection(connConf1) + assert(DefaultGeodeConnectionManager.connections.size == 1) + DefaultGeodeConnectionManager.closeConnection(connConf5) + assert(DefaultGeodeConnectionManager.connections.isEmpty) + } + + test("DefaultGeodeConnectionFactory newConnection(...) throws RuntimeException") { + val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234))) + val props: Map[String, String] = Map.empty + val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory] + when(mockConnFactory.newConnection(connConf1.locators, props)).thenThrow(new RuntimeException()) + intercept[RuntimeException] { DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) } + verify(mockConnFactory, times(1)).newConnection(connConf1.locators, props) + } + + test("DefaultGeodeConnectionFactory close() w/ non-exist connection") { + val props: Map[String, String] = Map.empty + val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory] + val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234))) + val connConf2 = new GeodeConnectionConf(Seq(("host2", 5678))) + val mockConn1 = mock[DefaultGeodeConnection] + when(mockConnFactory.newConnection(connConf1.locators, props)).thenReturn(mockConn1) + assert(DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1) + assert(DefaultGeodeConnectionManager.connections.size == 1) + // connection does not exists in the connection manager + DefaultGeodeConnectionManager.closeConnection(connConf2) + assert(DefaultGeodeConnectionManager.connections.size == 1) + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala new file mode 100644 index 0000000..c95f1dc --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.pivotal.geode.spark.connector.internal.geodefunctions + +import org.apache.geode.DataSerializer +import org.apache.geode.cache.execute.{ResultCollector, ResultSender} +import org.apache.geode.cache.query.internal.types.{ObjectTypeImpl, StructTypeImpl} +import org.apache.geode.cache.query.types.ObjectType +import org.apache.geode.internal.{Version, ByteArrayDataInput, HeapDataOutputStream} +import org.apache.geode.internal.cache.{CachedDeserializable, CachedDeserializableFactory} +import org.scalatest.{BeforeAndAfter, FunSuite} +import scala.collection.JavaConversions._ +import scala.concurrent.{Await, ExecutionContext, Future} +import ExecutionContext.Implicits.global +import scala.concurrent.duration._ + +class StructStreamingResultSenderAndCollectorTest extends FunSuite with BeforeAndAfter { + + /** + * A test ResultSender that connects struct ResultSender and ResultCollector + * Note: this test ResultSender has to copy the data (byte array) since the + * StructStreamingResultSender will reuse the byte array. 
+ */ + class LocalResultSender(collector: ResultCollector[Array[Byte], _], num: Int = 1) extends ResultSender[Object] { + + var finishedNum = 0 + + override def sendResult(result: Object): Unit = + collector.addResult(null, result.asInstanceOf[Array[Byte]].clone()) + + /** exception should be sent via lastResult() */ + override def sendException(throwable: Throwable): Unit = + throw new UnsupportedOperationException("sendException is not supported.") + + override def lastResult(result: Object): Unit = { + collector.addResult(null, result.asInstanceOf[Array[Byte]].clone()) + this.synchronized { + finishedNum += 1 + if (finishedNum == num) + collector.endResults() + } + } + } + + /** common variables */ + var collector: StructStreamingResultCollector = _ + var baseSender: LocalResultSender = _ + /** common types */ + val objType = new ObjectTypeImpl("java.lang.Object").asInstanceOf[ObjectType] + val TwoColType = new StructTypeImpl(Array("key", "value"), Array(objType, objType)) + val OneColType = new StructTypeImpl(Array("value"), Array(objType)) + + before { + collector = new StructStreamingResultCollector + baseSender = new LocalResultSender(collector, 1) + } + + test("transfer simple data") { + verifySimpleTransfer(sendDataType = true) + } + + test("transfer simple data with no type info") { + verifySimpleTransfer(sendDataType = false) + } + + def verifySimpleTransfer(sendDataType: Boolean): Unit = { + val iter = (0 to 9).map(i => Array(i.asInstanceOf[Object], (i.toString * 5).asInstanceOf[Object])).toIterator + val dataType = if (sendDataType) TwoColType else null + new StructStreamingResultSender(baseSender, dataType , iter).send() + // println("type: " + collector.getResultType.toString) + assert(TwoColType.equals(collector.getResultType)) + val iter2 = collector.getResult + (0 to 9).foreach { i => + assert(iter2.hasNext) + val o = iter2.next() + assert(o.size == 2) + assert(o(0).asInstanceOf[Int] == i) + assert(o(1).asInstanceOf[String] == i.toString * 5) + } 
+ assert(! iter2.hasNext) + } + + + /** + * A test iterator that generate integer data + * @param start the 1st value + * @param n number of integers generated + * @param genExcp generate Exception if true. This is used to test exception handling. + */ + def intIterator(start: Int, n: Int, genExcp: Boolean): Iterator[Array[Object]] = { + new Iterator[Array[Object]] { + val max = if (genExcp) start + n else start + n - 1 + var index: Int = start - 1 + + override def hasNext: Boolean = if (index < max) true else false + + override def next(): Array[Object] = + if (index < (start + n - 1)) { + index += 1 + Array(index.asInstanceOf[Object]) + } else throw new RuntimeException("simulated error") + } + } + + test("transfer data with 0 row") { + new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 0, genExcp = false)).send() + // println("type: " + collector.getResultType.toString) + assert(collector.getResultType == null) + val iter = collector.getResult + assert(! iter.hasNext) + } + + test("transfer data with 10K rows") { + new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 10000, genExcp = false)).send() + // println("type: " + collector.getResultType.toString) + assert(OneColType.equals(collector.getResultType)) + val iter = collector.getResult + // println(iter.toList.map(list => list.mkString(",")).mkString("; ")) + (1 to 10000).foreach { i => + assert(iter.hasNext) + val o = iter.next() + assert(o.size == 1) + assert(o(0).asInstanceOf[Int] == i) + } + assert(! 
iter.hasNext) + } + + test("transfer data with 10K rows with 2 sender") { + baseSender = new LocalResultSender(collector, 2) + val total = 300 + val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()} + val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = false), "sender2").send()} + Await.result(sender1, 1.seconds) + Await.result(sender2, 1.seconds) + + // println("type: " + collector.getResultType.toString) + assert(OneColType.equals(collector.getResultType)) + val iter = collector.getResult + // println(iter.toList.map(list => list.mkString(",")).mkString("; ")) + val set = scala.collection.mutable.Set[Int]() + (1 to total).foreach { i => + assert(iter.hasNext) + val o = iter.next() + assert(o.size == 1) + assert(! set.contains(o(0).asInstanceOf[Int])) + set.add(o(0).asInstanceOf[Int]) + } + assert(! iter.hasNext) + } + + test("transfer data with 10K rows with 2 sender with error") { + baseSender = new LocalResultSender(collector, 2) + val total = 1000 + val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()} + val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = true), "sender2").send()} + Await.result(sender1, 1 seconds) + Await.result(sender2, 1 seconds) + + // println("type: " + collector.getResultType.toString) + assert(OneColType.equals(collector.getResultType)) + val iter = collector.getResult + // println(iter.toList.map(list => list.mkString(",")).mkString("; ")) + val set = scala.collection.mutable.Set[Int]() + intercept[RuntimeException] { + (1 to total).foreach { i => + assert(iter.hasNext) + val o = iter.next() + assert(o.size == 1) + assert(! 
set.contains(o(0).asInstanceOf[Int])) + set.add(o(0).asInstanceOf[Int]) + } + } + // println(s"rows received: ${set.size}") + } + + test("transfer data with Exception") { + new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 200, genExcp = true)).send() + // println("type: " + collector.getResultType.toString) + val iter = collector.getResult + intercept[RuntimeException] ( iter.foreach(_.mkString(",")) ) + } + + def stringPairIterator(n: Int, genExcp: Boolean): Iterator[Array[Object]] = + intIterator(1, n, genExcp).map(x => Array(s"key-${x(0)}", s"value-${x(0)}")) + + test("transfer string pair data with 200 rows") { + new StructStreamingResultSender(baseSender, TwoColType, stringPairIterator(1000, genExcp = false)).send() + // println("type: " + collector.getResultType.toString) + assert(TwoColType.equals(collector.getResultType)) + val iter = collector.getResult + // println(iter.toList.map(list => list.mkString(",")).mkString("; ")) + (1 to 1000).foreach { i => + assert(iter.hasNext) + val o = iter.next() + assert(o.size == 2) + assert(o(0) == s"key-$i") + assert(o(1) == s"value-$i") + } + assert(! iter.hasNext) + } + + /** + * Usage notes: There are 3 kinds of data to transfer: + * (1) object, (2) byte array of serialized object, and (3) byte array + * this test shows how to handle all of them. + */ + test("DataSerializer usage") { + val outBuf = new HeapDataOutputStream(1024, null) + val inBuf = new ByteArrayDataInput() + + // 1. a regular object + val hello = "Hello World!" * 30 + // serialize the data + DataSerializer.writeObject(hello, outBuf) + val bytesHello = outBuf.toByteArray.clone() + // de-serialize the data + inBuf.initialize(bytesHello, Version.CURRENT) + val hello2 = DataSerializer.readObject(inBuf).asInstanceOf[Object] + assert(hello == hello2) + + // 2. 
byte array of serialized object + // serialize: byte array from `CachedDeserializable` + val cd: CachedDeserializable = CachedDeserializableFactory.create(bytesHello) + outBuf.reset() + DataSerializer.writeByteArray(cd.getSerializedValue, outBuf) + // de-serialize the data in 2 steps + inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT) + val bytesHello2: Array[Byte] = DataSerializer.readByteArray(inBuf) + inBuf.initialize(bytesHello2, Version.CURRENT) + val hello3 = DataSerializer.readObject(inBuf).asInstanceOf[Object] + assert(hello == hello3) + + // 3. byte array + outBuf.reset() + DataSerializer.writeByteArray(bytesHello, outBuf) + inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT) + val bytesHello3: Array[Byte] = DataSerializer.readByteArray(inBuf) + assert(bytesHello sameElements bytesHello3) + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/97658f04/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/oql/QueryParserTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/oql/QueryParserTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/oql/QueryParserTest.scala new file mode 100644 index 0000000..54394e8 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/test/scala/org/apache/geode/spark/connector/internal/oql/QueryParserTest.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal.oql + +import org.scalatest.FunSuite + +class QueryParserTest extends FunSuite { + + test("select * from /r1") { + val r = QueryParser.parseOQL("select * from /r1").get + assert(r == "List(/r1)") + } + + test("select c2 from /r1") { + val r = QueryParser.parseOQL("select c2 from /r1").get + assert(r == "List(/r1)") + } + + test("select key, value from /r1.entries") { + val r = QueryParser.parseOQL("select key, value from /r1.entries").get + assert(r == "List(/r1.entries)") + } + + test("select c1, c2 from /r1 where col1 > 100 and col2 <= 120 or c3 = 2") { + val r = QueryParser.parseOQL("select c1, c2 from /r1 where col1 > 100 and col2 <= 120 or c3 = 2").get + assert(r == "List(/r1)") + } + + test("select * from /r1/r2 where c1 >= 200") { + val r = QueryParser.parseOQL("select * from /r1/r2 where c1 >= 200").get + assert(r == "List(/r1/r2)") + } + + test("import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 and c2 = 100") { + val r = QueryParser.parseOQL("import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 and c2 = 100").get + assert(r == "List(/r1/r2, /r3/r4)") + } + + test("SELECT distinct f1, f2 FROM /r1/r2 WHere f = 100") { + val r = QueryParser.parseOQL("SELECT distinct f1, f2 FROM /r1/r2 WHere f = 100").get + assert(r == "List(/r1/r2)") + } + + test("IMPORT io.pivotal.geode IMPORT com.mypackage SELECT key,value FROM /root/sub.entries WHERE status = 'active' ORDER BY id desc") { + val r = QueryParser.parseOQL("IMPORT io.pivotal.geode IMPORT com.mypackage 
SELECT key,value FROM /root/sub.entries WHERE status = 'active' ORDER BY id desc").get + assert(r == "List(/root/sub.entries)") + } + + test("select distinct p.ID, p.status from /region p where p.ID > 5 order by p.status") { + val r = QueryParser.parseOQL("select distinct p.ID, p.status from /region p where p.ID > 5 order by p.status").get + assert(r == "List(/region)") + } + + test("SELECT DISTINCT * FROM /QueryRegion1 r1, /QueryRegion2 r2 WHERE r1.ID = r2.ID") { + val r = QueryParser.parseOQL("SELECT DISTINCT * FROM /QueryRegion1 r1, /QueryRegion2 r2 WHERE r1.ID = r2.ID").get + assert(r == "List(/QueryRegion1, /QueryRegion2)") + } + + test("SELECT id, \"type\", positions, status FROM /obj_obj_region WHERE status = 'active'") { + val r = QueryParser.parseOQL("SELECT id, \"type\", positions, status FROM /obj_obj_region WHERE status = 'active'").get + println("r.type=" + r.getClass.getName + " r=" + r) + assert(r == "List(/obj_obj_region)") + } + + test("SELECT r.id, r.\"type\", r.positions, r.status FROM /obj_obj_region r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'") { + val r = QueryParser.parseOQL("SELECT r.id, r.\"type\", r.positions, r.status FROM /obj_obj_region r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'").get + assert(r == "List(/obj_obj_region, r.positions.values)") + } +}
