Repository: ignite Updated Branches: refs/heads/master b583fb3ee -> f2b390a38
IGNITE-3175 BigDecimal fields are not supported if query is executed from IgniteRDD Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f2b390a3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f2b390a3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f2b390a3 Branch: refs/heads/master Commit: f2b390a385821c4235bd20cc31de978b14601077 Parents: b583fb3 Author: sboikov <[email protected]> Authored: Mon May 23 07:28:55 2016 +0300 Committer: sboikov <[email protected]> Committed: Mon May 23 07:28:55 2016 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/spark/IgniteRDD.scala | 1 + .../spark/JavaStandaloneIgniteRDDSelfTest.java | 60 +++++++++++++++++++- .../ignite/spark/EntityTestAllTypeFields.scala | 57 +++++++++++++++++++ 3 files changed, 116 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f2b390a3/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala index 2146acb..9112040 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala @@ -220,6 +220,7 @@ class IgniteRDD[K, V] ( case "java.lang.Long" â LongType case "java.lang.Float" â FloatType case "java.lang.Double" â DoubleType + case "java.math.BigDecimal" â DecimalType.SYSTEM_DEFAULT case "java.lang.String" â StringType case "java.util.Date" â DateType case "java.sql.Timestamp" â TimestampType http://git-wip-us.apache.org/repos/asf/ignite/blob/f2b390a3/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java index faa8fda..e600c6c 100644 --- a/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java +++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.spark; -import java.util.List; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.Ignition; @@ -28,6 +27,7 @@ import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -39,6 +39,10 @@ import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import scala.Tuple2; +import java.lang.reflect.Field; +import java.math.BigDecimal; +import java.util.List; + /** * Tests for {@link JavaIgniteRDD} (standalone mode). */ @@ -85,6 +89,14 @@ public class JavaStandaloneIgniteRDDSelfTest extends GridCommonAbstractTest { } }; + /** */ + private static final PairFunction<Integer, String, EntityTestAllTypeFields> INT_TO_ENTITY_ALL_FIELDS_F = + new PairFunction<Integer, String, EntityTestAllTypeFields>() { + @Override public Tuple2<String, EntityTestAllTypeFields> call(Integer i) throws Exception { + return new Tuple2<>(String.valueOf(i), new EntityTestAllTypeFields(i)); + } + }; + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { Ignition.ignite("grid-0").cache(PARTITIONED_CACHE_NAME).removeAll(); @@ -234,7 +246,50 @@ public class JavaStandaloneIgniteRDDSelfTest extends GridCommonAbstractTest { finally { sc.stop(); } + } + /** + * @throws Exception If failed. + */ + public void testAllFieldsTypes() throws Exception { + JavaSparkContext sc = new JavaSparkContext("local[*]", "test"); + + final int cnt = 100; + + try { + JavaIgniteContext<String, EntityTestAllTypeFields> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider()); + + JavaIgniteRDD<String, EntityTestAllTypeFields> cache = ic.fromCache(PARTITIONED_CACHE_NAME); + + cache.savePairs(sc.parallelize(F.range(0, cnt), 2).mapToPair(INT_TO_ENTITY_ALL_FIELDS_F)); + + EntityTestAllTypeFields e = new EntityTestAllTypeFields(cnt / 2); + for(Field f : EntityTestAllTypeFields.class.getDeclaredFields()) { + String fieldName = f.getName(); + + Object val = GridTestUtils.getFieldValue(e, fieldName); + + DataFrame df = cache.sql( + String.format("select %s from EntityTestAllTypeFields where %s = ?", fieldName, fieldName), + val); + + if (val instanceof BigDecimal) { + Object res = df.collect()[0].get(0); + + assertTrue(String.format("+++ Fail on %s field", fieldName), + ((Comparable<BigDecimal>)val).compareTo((BigDecimal)res) == 0); + } + else if (val.getClass().isArray()) + assertTrue(String.format("+++ Fail on %s field", fieldName), 1 <= df.count()); + else + assertEquals(String.format("+++ Fail on %s field", fieldName), val, df.collect()[0].get(0)); + + info(String.format("+++ Query on the filed: %s : %s passed", fieldName, f.getType().getSimpleName())); + } + } + finally { + sc.stop(); + } } /** @@ -269,7 +324,8 @@ public class JavaStandaloneIgniteRDDSelfTest extends GridCommonAbstractTest { ccfg.setName(PARTITIONED_CACHE_NAME); - ccfg.setIndexedTypes(String.class, Entity.class); + ccfg.setIndexedTypes(String.class, Entity.class, + String.class, EntityTestAllTypeFields.class); return ccfg; } http://git-wip-us.apache.org/repos/asf/ignite/blob/f2b390a3/modules/spark/src/test/scala/org/apache/ignite/spark/EntityTestAllTypeFields.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/EntityTestAllTypeFields.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/EntityTestAllTypeFields.scala new file mode 100644 index 0000000..a936091 --- /dev/null +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/EntityTestAllTypeFields.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark + +import java.lang.Boolean +import java.sql.Timestamp +import java.util.Date + +import org.apache.ignite.spark.IgniteRDDSpec.ScalarCacheQuerySqlField + +class EntityTestAllTypeFields( + @ScalarCacheQuerySqlField(index = true) val boolVal: Boolean, + @ScalarCacheQuerySqlField(index = true) val byteVal: Byte, + @ScalarCacheQuerySqlField(index = true) val shortVal: Short, + @ScalarCacheQuerySqlField(index = true) val intVal: Int, + @ScalarCacheQuerySqlField(index = true) val longVal: Long, + @ScalarCacheQuerySqlField(index = true) val floatVal: Float, + @ScalarCacheQuerySqlField(index = true) val doubleVal: Double, + @ScalarCacheQuerySqlField(index = true) val strVal: String, + @ScalarCacheQuerySqlField(index = true) val dateVal: Date, + @ScalarCacheQuerySqlField(index = true) val timestampVal: Timestamp, + @ScalarCacheQuerySqlField(index = true) val byteArrVal: Array[Byte], + @ScalarCacheQuerySqlField(index = true) val bigDecVal: java.math.BigDecimal +) extends Serializable { + def this( + i: Int + ) { + this( + i % 2 == 0, // Boolean + i.toByte, // Byte + i.toShort, // Short + i, // Int + i.toLong, // Long + i, // Float + i, // Double + "name" + i, // String + new Date(i), + new Timestamp(i), + Array(i.toByte, i.toByte), + new java.math.BigDecimal(i.toString)) + } +}
