This is an automated email from the ASF dual-hosted git repository.
indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
     new e5b1dd0  [CARBONDATA-4167][CARBONDATA-4168] Fix case sensitive issues and input validation for Geo values.
e5b1dd0 is described below
commit e5b1dd06e77d89c1c0e77e471f12058f4fdca0a3
Author: ShreelekhyaG <[email protected]>
AuthorDate: Tue Apr 6 14:12:40 2021 +0530
[CARBONDATA-4167][CARBONDATA-4168] Fix case sensitive issues and input validation for Geo values.
Why is this PR needed?
1. The SPATIAL_INDEX property and the POLYGON, LINESTRING, and RANGELIST UDFs are case-sensitive.
2. SPATIAL_INDEX.xx.gridSize and SPATIAL_INDEX.xxx.conversionRatio accept negative values.
3. Geo UDFs accept invalid input values.
What changes were proposed in this PR?
1. Converted properties to lower case and made the UDFs case-insensitive.
2. Added input validation.
3. Refactored readAllIIndexOfSegment.
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
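For illustration, a sketch of the behavior change (table name and values taken from the GeoTest cases below; this snippet is not part of the patch):

    // Lowercase geometry keywords are now accepted (previously case-sensitive):
    sql(s"select mygeohash from $table1 where IN_POLYGON_RANGE_LIST(" +
        s"'rangelist (855279368850 855279368852)', 'OR')")
    // Negative grid size is now rejected at plan time:
    sql("select LatLngToGeoId(39930753, 116302895, 39.832277, -50) as geoId")
    // => MalformedCarbonCommandException: Expect grid size to be a positive integer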
This closes #4118
---
.../blockletindex/SegmentIndexFileStore.java | 28 ++--------
.../core/writer/CarbonIndexFileMergeWriter.java | 3 +-
.../org/apache/carbondata/geo/GeoConstants.java | 5 ++
.../org/apache/carbondata/geo/GeoHashIndex.java | 12 +----
.../org/apache/carbondata/geo/GeoHashUtils.java | 38 ++++++++++---
.../geo/scan/expression/PolygonListExpression.java | 10 +++-
.../expression/PolygonRangeListExpression.java | 10 +++-
.../scan/expression/PolylineListExpression.java | 5 +-
.../org/apache/carbondata/geo/GeoUtilUDFs.scala | 29 +++++-----
.../org/apache/carbondata/geo/InPolygonUDF.scala | 4 +-
.../table/CarbonDescribeFormattedCommand.scala | 12 +++++
.../execution/strategy/CarbonSourceStrategy.scala | 41 +++++++++++++-
.../apache/spark/sql/optimizer/CarbonFilters.scala | 7 +++
.../spark/sql/parser/CarbonSparkSqlParser.scala | 9 +++-
.../sql/parser/CarbonSparkSqlParserUtil.scala | 2 +-
.../scala/org/apache/carbondata/geo/GeoTest.scala | 62 ++++++++++++++++++++--
16 files changed, 208 insertions(+), 69 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 60c8981..3c41ea9 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -197,12 +197,11 @@ public class SegmentIndexFileStore {
    * @throws IOException
    */
   public void readAllIIndexOfSegment(CarbonFile[] carbonFiles) throws IOException {
-    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(carbonFiles);
-    for (CarbonFile carbonIndexFile : carbonIndexFiles) {
-      if (carbonIndexFile.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-        readMergeFile(carbonIndexFile.getCanonicalPath());
-      } else if (carbonIndexFile.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
-        readIndexFile(carbonIndexFile);
+    for (CarbonFile carbonFile : carbonFiles) {
+      if (carbonFile.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        readMergeFile(carbonFile.getCanonicalPath());
+      } else if (carbonFile.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+        readIndexFile(carbonFile);
       }
     }
   }
@@ -391,23 +390,6 @@ public class SegmentIndexFileStore {
   }
 
   /**
-   * List all the index files of the segment.
-   *
-   * @param carbonFiles
-   * @return
-   */
-  public static CarbonFile[] getCarbonIndexFiles(CarbonFile[] carbonFiles) {
-    List<CarbonFile> indexFiles = new ArrayList<>();
-    for (CarbonFile file: carbonFiles) {
-      if (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
-          file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-        indexFiles.add(file);
-      }
-    }
-    return indexFiles.toArray(new CarbonFile[indexFiles.size()]);
-  }
-
-  /**
    * Return the map that contain index file name and content of the file.
    *
    * @return
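After this refactor readAllIIndexOfSegment no longer filters its input, so callers list the index files first and pass them in. A minimal sketch of the assumed call pattern (mirroring the updated CarbonIndexFileMergeWriter below; the no-arg constructor is an assumption):

    val indexFiles = SegmentIndexFileStore.getCarbonIndexFiles(
      segmentPath, FileFactory.getConfiguration())  // only .carbonindex/.carbonindexmerge files
    val fileStore = new SegmentIndexFileStore()
    fileStore.readAllIIndexOfSegment(indexFiles)    // dispatches on each file's extension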
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index a619f15..2c455d6 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -200,13 +200,12 @@ public class CarbonIndexFileMergeWriter {
       if (readBasedOnUUID) {
         indexFiles = SegmentIndexFileStore
             .getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration(), uuid);
-        fileStore.readAllIIndexOfSegment(segmentPath, uuid);
       } else {
         // The uuid can be different, when we add load from external path.
         indexFiles =
             SegmentIndexFileStore.getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
-        fileStore.readAllIIndexOfSegment(segmentPath);
       }
+      fileStore.readAllIIndexOfSegment(indexFiles);
     }
     Map<String, byte[]> indexMap = fileStore.getCarbonIndexMap();
     Map<String, List<String>> mergeToIndexFileMap = fileStore.getCarbonMergeFileToIndexFilesMap();
diff --git a/geo/src/main/java/org/apache/carbondata/geo/GeoConstants.java b/geo/src/main/java/org/apache/carbondata/geo/GeoConstants.java
index 2b1b6b7..0520f1e 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/GeoConstants.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/GeoConstants.java
@@ -27,6 +27,9 @@ public class GeoConstants {
   // GeoHash type Spatial Index
   public static final String GEOHASH = "geohash";
 
+  // Regular expression to validate whether the input value is positive integer
+  public static final String POSITIVE_INTEGER_REGEX = "^[+]?\\d*[1-9]\\d*$";
+
   // Regular expression to parse input polygons for IN_POLYGON_LIST
   public static final String POLYGON_REG_EXPRESSION = "(?<=POLYGON \\(\\()(.*?)(?=(\\)\\)))";
 
@@ -36,6 +39,8 @@ public class GeoConstants {
   // Regular expression to parse input rangelists for IN_POLYGON_RANGE_LIST
   public static final String RANGELIST_REG_EXPRESSION = "(?<=RANGELIST \\()(.*?)(?=\\))";
 
+  public static final String GRID_SIZE = "gridSize";
+
   // delimiter of input points or ranges
   public static final String DEFAULT_DELIMITER = ",";
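A quick sketch of what POSITIVE_INTEGER_REGEX accepts and rejects (sample values chosen for illustration):

    import java.util.regex.Pattern
    val p = Pattern.compile("^[+]?\\d*[1-9]\\d*$")  // GeoConstants.POSITIVE_INTEGER_REGEX
    Seq("50", "+7", "0050").forall(v => p.matcher(v).find())    // true: positive integers
    Seq("0", "-50", "1.5", "").exists(v => p.matcher(v).find()) // false: zero, negative, non-integer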
diff --git a/geo/src/main/java/org/apache/carbondata/geo/GeoHashIndex.java b/geo/src/main/java/org/apache/carbondata/geo/GeoHashIndex.java
index 4a5e892..b022cba 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/GeoHashIndex.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/GeoHashIndex.java
@@ -136,18 +136,10 @@ public class GeoHashIndex extends CustomIndex<List<Long[]>> {
     }
     String GRID_SIZE = commonKey + "gridsize";
     String gridSize = properties.get(GRID_SIZE);
-    if (StringUtils.isEmpty(gridSize)) {
-      throw new MalformedCarbonCommandException(
-          String.format("%s property is invalid. %s property must be specified.",
-              CarbonCommonConstants.SPATIAL_INDEX, GRID_SIZE));
-    }
+    GeoHashUtils.validateGeoProperty(gridSize, GRID_SIZE);
     String CONVERSION_RATIO = commonKey + "conversionratio";
     String conversionRatio = properties.get(CONVERSION_RATIO);
-    if (StringUtils.isEmpty(conversionRatio)) {
-      throw new MalformedCarbonCommandException(
-          String.format("%s property is invalid. %s property must be specified.",
-              CarbonCommonConstants.SPATIAL_INDEX, CONVERSION_RATIO));
-    }
+    GeoHashUtils.validateGeoProperty(conversionRatio, CONVERSION_RATIO);
 
     // Fill the values to the instance fields
     this.oriLatitude = Double.valueOf(originLatitude);
diff --git a/geo/src/main/java/org/apache/carbondata/geo/GeoHashUtils.java b/geo/src/main/java/org/apache/carbondata/geo/GeoHashUtils.java
index e5e7a78..09baf5c 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/GeoHashUtils.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/GeoHashUtils.java
@@ -23,6 +23,14 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
+import java.util.regex.Pattern;
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+import static org.apache.carbondata.geo.GeoConstants.POSITIVE_INTEGER_REGEX;
+
+import org.apache.commons.lang3.StringUtils;
 
 public class GeoHashUtils {
 
@@ -83,6 +91,28 @@ public class GeoHashUtils {
     return colRow2GeoID(ij[0], ij[1]);
   }
 
+  public static void validateUDFInputValue(Object input, String inputName, String datatype)
+      throws MalformedCarbonCommandException {
+    if (inputName.equalsIgnoreCase(GeoConstants.GRID_SIZE) && (input == null
+        || !Pattern.compile(POSITIVE_INTEGER_REGEX).matcher(input.toString()).find())) {
+      throw new MalformedCarbonCommandException("Expect grid size to be a positive integer");
+    } else if (input == null || input.toString().equals("null")) {
+      throw new MalformedCarbonCommandException(
+          "Expect " + inputName + " to be of " + datatype + " type");
+    }
+  }
+
+  public static void validateGeoProperty(String propertyValue, String propertyName)
+      throws MalformedCarbonCommandException {
+    if (StringUtils.isEmpty(propertyValue) ||
+        !Pattern.compile(POSITIVE_INTEGER_REGEX).matcher(propertyValue).find()) {
+      throw new MalformedCarbonCommandException(
+          String.format("%s property is invalid. %s property must be specified, "
+              + "and the value must be positive integer.",
+              CarbonCommonConstants.SPATIAL_INDEX, propertyName));
+    }
+  }
+
   /**
    * Calculate geo id through grid index coordinates, the row and column of grid coordinates
    * can be transformed by latitude and longitude
@@ -264,12 +294,8 @@ public class GeoHashUtils {
    * @return geoId range list of processed set
    */
   public static List<Long[]> processRangeList(List<Long[]> rangeListA, List<Long[]> rangeListB,
-      String opType) {
+      GeoOperationType operationType) {
     List<Long[]> processedRangeList;
-    GeoOperationType operationType = GeoOperationType.getEnum(opType);
-    if (operationType == null) {
-      throw new RuntimeException("Unsupported operation type " + opType);
-    }
     switch (operationType) {
       case OR:
         processedRangeList = getPolygonUnion(rangeListA, rangeListB);
@@ -278,7 +304,7 @@ public class GeoHashUtils {
         processedRangeList = getPolygonIntersection(rangeListA, rangeListB);
         break;
       default:
-        throw new RuntimeException("Unsupported operation type " + opType);
+        throw new RuntimeException("Unsupported operation type " + operationType.toString());
     }
     return processedRangeList;
   }
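A sketch of the two new validators as seen from a caller (property name and exception message abbreviated for illustration):

    GeoHashUtils.validateGeoProperty("50", "spatial_index.mygeohash.gridsize")   // passes
    GeoHashUtils.validateGeoProperty("-50", "spatial_index.mygeohash.gridsize")
    // => MalformedCarbonCommandException: ... the value must be positive integer.
    GeoHashUtils.validateUDFInputValue(null, "geoId", "long")
    // => MalformedCarbonCommandException: Expect geoId to be of long type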
diff --git a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonListExpression.java b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonListExpression.java
index 903b694..3250b38 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonListExpression.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonListExpression.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.core.util.CustomIndex;
 import org.apache.carbondata.geo.GeoConstants;
 import org.apache.carbondata.geo.GeoHashUtils;
+import org.apache.carbondata.geo.GeoOperationType;
 
 /**
  * InPolygonList expression processor. It inputs the InPolygonList string to the Geo
@@ -49,7 +50,8 @@ public class PolygonListExpression extends PolygonExpression {
     try {
       // 1. parse the polygon list string
       List<String> polygons = new ArrayList<>();
-      Pattern pattern = Pattern.compile(GeoConstants.POLYGON_REG_EXPRESSION);
+      Pattern pattern =
+          Pattern.compile(GeoConstants.POLYGON_REG_EXPRESSION, Pattern.CASE_INSENSITIVE);
       Matcher matcher = pattern.matcher(polygon);
       while (matcher.find()) {
         String matchedStr = matcher.group();
@@ -62,11 +64,15 @@ public class PolygonListExpression extends PolygonExpression {
       // 2. get the range list of each polygon
       List<Long[]> processedRangeList = instance.query(polygons.get(0));
       GeoHashUtils.validateRangeList(processedRangeList);
+      GeoOperationType operationType = GeoOperationType.getEnum(opType);
+      if (operationType == null) {
+        throw new RuntimeException("Unsupported operation type " + opType);
+      }
       for (int i = 1; i < polygons.size(); i++) {
         List<Long[]> tempRangeList = instance.query(polygons.get(i));
         GeoHashUtils.validateRangeList(tempRangeList);
         processedRangeList = GeoHashUtils.processRangeList(
-            processedRangeList, tempRangeList, opType);
+            processedRangeList, tempRangeList, operationType);
       }
       ranges = processedRangeList;
     } catch (Exception e) {
diff --git a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonRangeListExpression.java b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonRangeListExpression.java
index 81efe05..704128d 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonRangeListExpression.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonRangeListExpression.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.core.util.CustomIndex;
 import org.apache.carbondata.geo.GeoConstants;
 import org.apache.carbondata.geo.GeoHashUtils;
+import org.apache.carbondata.geo.GeoOperationType;
 
 /**
  * InPolygonRangeList expression processor. It inputs the InPolygonRangeList string to
@@ -49,7 +50,8 @@ public class PolygonRangeListExpression extends PolygonExpression {
   public void processExpression() {
     // 1. parse the range list string
     List<String> rangeLists = new ArrayList<>();
-    Pattern pattern = Pattern.compile(GeoConstants.RANGELIST_REG_EXPRESSION);
+    Pattern pattern =
+        Pattern.compile(GeoConstants.RANGELIST_REG_EXPRESSION, Pattern.CASE_INSENSITIVE);
     Matcher matcher = pattern.matcher(polygon);
     while (matcher.find()) {
       String matchedStr = matcher.group();
@@ -57,11 +59,15 @@ public class PolygonRangeListExpression extends PolygonExpression {
     }
     // 2. process the range lists
     if (rangeLists.size() > 0) {
+      GeoOperationType operationType = GeoOperationType.getEnum(opType);
+      if (operationType == null) {
+        throw new RuntimeException("Unsupported operation type " + opType);
+      }
       List<Long[]> processedRangeList = getRangeListFromString(rangeLists.get(0));
       for (int i = 1; i < rangeLists.size(); i++) {
         List<Long[]> tempRangeList = getRangeListFromString(rangeLists.get(i));
         processedRangeList = GeoHashUtils.processRangeList(
-            processedRangeList, tempRangeList, opType);
+            processedRangeList, tempRangeList, operationType);
       }
       ranges = processedRangeList;
     }
diff --git a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolylineListExpression.java b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolylineListExpression.java
index 70ea7f3..61fcb3d 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolylineListExpression.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolylineListExpression.java
@@ -61,7 +61,8 @@ public class PolylineListExpression extends PolygonExpression {
     // 1. parse the polyline list string and get polygon from each polyline
     List<Geometry> polygonList = new ArrayList<>();
     WKTReader wktReader = new WKTReader();
-    Pattern pattern = Pattern.compile(GeoConstants.POLYLINE_REG_EXPRESSION);
+    Pattern pattern =
+        Pattern.compile(GeoConstants.POLYLINE_REG_EXPRESSION, Pattern.CASE_INSENSITIVE);
     Matcher matcher = pattern.matcher(polygon);
     while (matcher.find()) {
       String matchedStr = matcher.group();
@@ -80,7 +81,7 @@ public class PolylineListExpression extends PolygonExpression {
         List<Long[]> tempRangeList = instance.query(tempPointList);
         GeoHashUtils.validateRangeList(tempRangeList);
         processedRangeList = GeoHashUtils.processRangeList(
-            processedRangeList, tempRangeList, GeoOperationType.OR.toString());
+            processedRangeList, tempRangeList, GeoOperationType.OR);
       }
       ranges = processedRangeList;
     }
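The effect of Pattern.CASE_INSENSITIVE in these three expressions, sketched with a toy geometry string:

    import java.util.regex.Pattern
    import org.apache.carbondata.geo.GeoConstants
    val p = Pattern.compile(GeoConstants.POLYGON_REG_EXPRESSION, Pattern.CASE_INSENSITIVE)
    p.matcher("polygon ((1.0 2.0, 3.0 4.0, 1.0 2.0))").find()  // true, same as "POLYGON ((...))"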
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUtilUDFs.scala b/integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUtilUDFs.scala
index a2e7a42..e25768a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUtilUDFs.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUtilUDFs.scala
@@ -33,34 +33,39 @@ object GeoUtilUDFs {
   }
 }
 
-class GeoIdToGridXyUDF extends (Long => Array[Int]) with Serializable {
-  override def apply(geoId: Long): Array[Int] = {
+class GeoIdToGridXyUDF extends (java.lang.Long => Array[Int]) with Serializable {
+  override def apply(geoId: java.lang.Long): Array[Int] = {
     GeoHashUtils.geoID2ColRow(geoId)
   }
 }
 
-class GeoIdToLatLngUDF extends ((Long, Double, Int) => Array[Double]) with Serializable {
-  override def apply(geoId: Long, oriLatitude: Double, gridSize: Int): Array[Double] = {
+class GeoIdToLatLngUDF
+  extends ((java.lang.Long, java.lang.Double, java.lang.Integer) => Array[Double]) with
+    Serializable {
+  override def apply(geoId: java.lang.Long, oriLatitude: java.lang.Double,
+      gridSize: java.lang.Integer): Array[Double] = {
     GeoHashUtils.geoID2LatLng(geoId, oriLatitude, gridSize)
   }
 }
 
-class LatLngToGeoIdUDF extends ((Long, Long, Double, Int) => Long) with Serializable {
-  override def apply(latitude: Long, longitude: Long, oriLatitude: Double, gridSize: Int): Long = {
+class LatLngToGeoIdUDF extends ((java.lang.Long, java.lang.Long,
+  java.lang.Double, java.lang.Integer) => Long) with Serializable {
+  override def apply(latitude: java.lang.Long, longitude: java.lang.Long,
+      oriLatitude: java.lang.Double, gridSize: java.lang.Integer): Long = {
     GeoHashUtils.lonLat2GeoID(longitude, latitude, oriLatitude, gridSize)
   }
 }
 
-class ToUpperLayerGeoIdUDF extends (Long => Long) with Serializable {
-  override def apply(geoId: Long): Long = {
+class ToUpperLayerGeoIdUDF extends (java.lang.Long => Long) with Serializable {
+  override def apply(geoId: java.lang.Long): Long = {
     GeoHashUtils.convertToUpperLayerGeoId(geoId)
   }
 }
 
-class ToRangeListUDF extends ((String, Double, Int) => mutable.Buffer[Array[Long]])
-  with Serializable {
-  override def apply(polygon: String, oriLatitude: Double,
-      gridSize: Int): mutable.Buffer[Array[Long]] = {
+class ToRangeListUDF extends ((java.lang.String, java.lang.Double, java.lang.Integer) =>
+  mutable.Buffer[Array[Long]]) with Serializable {
+  override def apply(polygon: java.lang.String, oriLatitude: java.lang.Double,
+      gridSize: java.lang.Integer): mutable.Buffer[Array[Long]] = {
     GeoHashUtils.getRangeList(polygon, oriLatitude, gridSize).asScala.map(_.map(Long2long))
   }
 }
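The move from Scala primitives to boxed java.lang types is what lets bad inputs reach the validation added in CarbonSourceStrategy below: a primitive parameter would silently receive a zero default for null input, while a boxed one keeps the null. A toy sketch of that difference (not CarbonData API):

    val f: java.lang.Long => String =
      geoId => if (geoId == null) "invalid geoId" else geoId.toString
    f(null)           // "invalid geoId" -- observable, can be validated
    f(855279270226L)  // "855279270226"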
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala b/integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala
index ff618b5..8bed2fb 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala
@@ -46,8 +46,8 @@ class InPolygonListUDF extends ((String, String) => Boolean) with Serializable {
 }
 
 @InterfaceAudience.Internal
-class InPolylineListUDF extends ((String, Float) => Boolean) with Serializable {
-  override def apply(v1: String, v2: Float): Boolean = {
+class InPolylineListUDF extends ((String, java.lang.Float) => Boolean) with Serializable {
+  override def apply(v1: String, v2: java.lang.Float): Boolean = {
     true // Carbon applies the filter. So, Spark do not have to apply filter.
   }
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 64ac71e..505c481 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -78,6 +78,18 @@ private[sql] case class CarbonDescribeFormattedCommand(
           tblProps(s"${ CarbonCommonConstants.SPATIAL_INDEX }.$index.datatype"), ""),
           ("Sources Columns",
           tblProps(s"${ CarbonCommonConstants.SPATIAL_INDEX }.$index.sourcecolumns"), ""))
+        if (tblProps.contains(s"${CarbonCommonConstants.SPATIAL_INDEX}.$index.originlatitude")) {
+          results ++= Seq(("Origin Latitude",
+            tblProps(s"${CarbonCommonConstants.SPATIAL_INDEX}.$index.originlatitude"), ""))
+        }
+        if (tblProps.contains(s"${CarbonCommonConstants.SPATIAL_INDEX}.$index.gridsize")) {
+          results ++= Seq(("Grid Size",
+            tblProps(s"${CarbonCommonConstants.SPATIAL_INDEX}.$index.gridsize"), ""))
+        }
+        if (tblProps.contains(s"${CarbonCommonConstants.SPATIAL_INDEX}.$index.conversionratio")) {
+          results ++= Seq(("Conversion Ratio",
+            tblProps(s"${CarbonCommonConstants.SPATIAL_INDEX}.$index.conversionratio"), ""))
+        }
         if (indexList.length != count) {
           results ++= Seq(("", "", ""))
         }
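With these additions, DESCRIBE FORMATTED surfaces the optional geo properties when they are set. A sketch of the expected extra rows (values from the test table definition at the end of GeoTest below):

    sql(s"describe formatted $table1").collect()
    //   Origin Latitude    39.832277
    //   Grid Size          50
    //   Conversion Ratio   1000000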
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
index be8b0e3..fdc3eaa 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
@@ -38,7 +38,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
 import org.apache.carbondata.core.scan.expression.{Expression => CarbonFilter}
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.geo.{InPolygonListUDF, InPolygonRangeListUDF, InPolygonUDF, InPolylineListUDF}
+import org.apache.carbondata.geo.{GeoHashUtils, GeoIdToGridXyUDF, GeoIdToLatLngUDF, InPolygonListUDF, InPolygonRangeListUDF, InPolygonUDF, InPolylineListUDF, LatLngToGeoIdUDF, ToRangeListUDF, ToUpperLayerGeoIdUDF}
 import org.apache.carbondata.hadoop.CarbonProjection
 import org.apache.carbondata.index.{TextMatchMaxDocUDF, TextMatchUDF}
 
@@ -62,10 +62,49 @@ private[sql] object CarbonSourceStrategy extends SparkStrategy {
       } catch {
         case _: CarbonPhysicalPlanException => Nil
       }
+      case Project(projectList, _: OneRowRelation) if validateUdf(projectList) => Nil
       case _ => Nil
     }
   }
 
+  private def validateUdf(projects: Seq[NamedExpression]): Boolean = {
+    projects foreach {
+      case alias: Alias if alias.child.isInstanceOf[Expression] =>
+        alias.child match {
+          case Cast(s: ScalaUDF, _, _) => validateGeoUtilUDFs(s)
+          case s: ScalaUDF => validateGeoUtilUDFs(s)
+          case _ =>
+        }
+    }
+    true
+  }
+
+  def validateGeoUtilUDFs(s: ScalaUDF): Boolean = {
+    s.function match {
+      case _: ToRangeListUDF =>
+        val inputNames = List("polygon", "oriLatitude", "gridSize")
+        for (i <- s.children.indices) {
+          GeoHashUtils.validateUDFInputValue(s.children(i), inputNames(i), s.inputTypes(i).typeName)
+        }
+      case _: LatLngToGeoIdUDF =>
+        val inputNames = List("latitude", "longitude", "oriLatitude", "gridSize")
+        for (i <- s.children.indices) {
+          GeoHashUtils.validateUDFInputValue(s.children(i), inputNames(i), s.inputTypes(i).typeName)
+        }
+      case _: GeoIdToGridXyUDF =>
+        GeoHashUtils.validateUDFInputValue(s.children.head, "geoId", s.inputTypes.head.typeName)
+      case _: GeoIdToLatLngUDF =>
+        val inputNames = List("geoId", "oriLatitude", "gridSize")
+        for (i <- s.children.indices) {
+          GeoHashUtils.validateUDFInputValue(s.children(i), inputNames(i), s.inputTypes(i).typeName)
+        }
+      case _: ToUpperLayerGeoIdUDF =>
+        GeoHashUtils.validateUDFInputValue(s.children.head, "geoId", s.inputTypes.head.typeName)
+      case _ =>
+    }
+    true
+  }
+
   private def isCarbonRelation(logicalRelation: LogicalRelation): Boolean = {
     logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
   }
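The new Project-over-OneRowRelation case means a geo UDF invoked without any table scan still passes through validateGeoUtilUDFs, e.g. (queries as in the tests below):

    sql("select LatLngToGeoId(39930753, 116302895, 39.832277, -50) as geoId").collect()
    // => MalformedCarbonCommandException: Expect grid size to be a positive integer
    sql("select GeoIdToGridXy('X') as GridXY").collect()
    // => MalformedCarbonCommandException: Expect geoId to be of long type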
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 8bd680e..3e30a97 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -243,6 +243,13 @@ object CarbonFilters {
           throw new MalformedCarbonCommandException("Expect two string in polyline list")
         }
         val (columnName, instance) = getGeoHashHandler(relation.carbonTable)
+        if (scala.util.Try(children.last.toString().toFloat).isFailure) {
+          throw new MalformedCarbonCommandException("Expect buffer size to be of float type")
+        }
+        val bufferSize = children.last.toString().toFloat
+        if (bufferSize <= 0) {
+          throw new MalformedCarbonCommandException("Expect buffer size to be a positive value")
+        }
         Some(new PolylineListExpression(children.head.toString(),
           children.last.toString().toFloat, columnName, instance))
       case _: InPolygonRangeListUDF =>
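The buffer-size checks reject non-float and non-positive values before the PolylineListExpression is built (queries shortened from the tests below):

    sql(s"select longitude, latitude from $table1 where IN_POLYLINE_LIST(" +
        s"'linestring (120.184179 30.327465, 120.190359 30.315388)', 'x')")
    // => Expect buffer size to be of float type
    sql(s"select longitude, latitude from $table1 where IN_POLYLINE_LIST(" +
        s"'linestring (120.184179 30.327465, 120.190359 30.315388)', -1)")
    // => Expect buffer size to be a positive value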
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 82d24f9..50ad29e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkSqlAstBuilder, SparkSqlParser}
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
+import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil.needToConvertToLowerCase
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
@@ -128,8 +129,14 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
     val tableProperties = mutable.Map[String, String]()
     val properties: Map[String, String] = getPropertyKeyValues(tablePropertyList)
-    properties.foreach{property => tableProperties.put(property._1, property._2)}
+    properties.foreach { property =>
+      if (needToConvertToLowerCase(property._1)) {
+        tableProperties.put(property._1.toLowerCase, property._2.toLowerCase)
+      } else {
+        tableProperties.put(property._1.toLowerCase, property._2)
+      }
+    }
 
     // validate partition clause
     val partitionByStructFields = Option(partitionColumns).toSeq.flatMap(visitColTypeList)
     val partitionFields = CarbonSparkSqlParserUtil.
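Sketch of the resulting normalization, using property keys from the test table below: keys are always lowercased, values only when the key is not exempted:

    // 'SPATIAL_INDEX.mygeohash.gridSize'='50'
    //     -> spatial_index.mygeohash.gridsize = 50
    // 'SPATIAL_INDEX.mygeohash.class'='org.apache.carbondata.geo.GeoHashIndex'
    //     -> key lowercased, value case preserved (see needToConvertToLowerCase below)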
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
index 35ef69c..54749c9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
@@ -564,7 +564,7 @@ object CarbonSparkSqlParserUtil {
   def needToConvertToLowerCase(key: String): Boolean = {
     var noConvertList = Array(CarbonCommonConstants.COMPRESSOR, "PATH", "bad_record_path",
       "timestampformat", "dateformat")
-    if (key.startsWith(CarbonCommonConstants.SPATIAL_INDEX) && key.endsWith(".class")) {
+    if (key.toLowerCase.startsWith(CarbonCommonConstants.SPATIAL_INDEX) && key.endsWith(".class")) {
       noConvertList = noConvertList ++ Array(key)
     }
     !noConvertList.exists(x => x.equalsIgnoreCase(key))
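Lowercasing the key before the startsWith check makes the .class exemption work for any casing of the property key, e.g.:

    CarbonSparkSqlParserUtil.needToConvertToLowerCase("SPATIAL_INDEX.mygeohash.class")
    // false: the fully-qualified class name value keeps its case
    CarbonSparkSqlParserUtil.needToConvertToLowerCase("SPATIAL_INDEX.mygeohash.gridSize")
    // true: the value is lowercased along with the key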
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
index 2ea56db..628d690 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
@@ -19,6 +19,7 @@ package org.apache.carbondata.geo
 
 import scala.collection.mutable
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
@@ -149,6 +150,56 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
       s"the option(s): column_meta_cache"))
   }
 
+  test("test UDF's with invalid values") {
+    createTable()
+    val exception1 = intercept[RuntimeException](sql(
+      s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
+      s"'RANGELIST (855279368848 855279368850, 855279368849 855279368852)', 45)").collect())
+    assert(exception1.getMessage.contains("Unsupported operation type 45"))
+
+    var exception2 = intercept[MalformedCarbonCommandException](
+      sql(s"select longitude, latitude from $table1 where IN_POLYLINE_LIST(" +
+          s"'linestring (120.184179 30.327465, 120.191603 30.328946, 120.199242 30.324464, " +
+          s"120.190359 30.315388)', 'x')").collect())
+    assert(exception2.getMessage.contains("Expect buffer size to be of float type"))
+
+    exception2 = intercept[MalformedCarbonCommandException](
+      sql(s"select longitude, latitude from $table1 where IN_POLYLINE_LIST(" +
+          s"'linestring (120.184179 30.327465, 120.191603 30.328946, 120.199242 30.324464, " +
+          s"120.190359 30.315388)', -1)").collect())
+    assert(exception2.getMessage.contains("Expect buffer size to be a positive value"))
+
+    exception2 = intercept[MalformedCarbonCommandException](
+      sql(s"select LatLngToGeoId(39930753, 116302895, 39.832277, -50) as geoId").collect())
+    assert(exception2.getMessage.contains("Expect grid size to be a positive integer"))
+
+    exception2 = intercept[MalformedCarbonCommandException](
+      sql(s"select GeoIdToLatLng(855279270226, 39.832277, -50) as LatitudeAndLongitude").collect())
+    assert(exception2.getMessage.contains("Expect grid size to be a positive integer"))
+
+    exception2 = intercept[MalformedCarbonCommandException](
+      sql(s"select ToRangeList('116.321011 40.123503, 116.320311 40.122503,116.321111 40.121503, " +
+          s"116.321011 40.123503', 39.832277, 0) as rangeList")
+        .collect())
+    assert(exception2.getMessage.contains("Expect grid size to be a positive integer"))
+
+    exception2 = intercept[MalformedCarbonCommandException](
+      sql(s"select GeoIdToGridXy('X') as GridXY").collect())
+    assert(exception2.getMessage.contains("Expect geoId to be of long type"))
+
+    exception2 = intercept[MalformedCarbonCommandException](
+      sql(s"select LatLngToGeoId('X', 'X', 'X', 'X') as geoId").collect())
+    assert(exception2.getMessage.contains("Expect latitude to be of long type"))
+
+    exception2 = intercept[MalformedCarbonCommandException](
+      sql(s"select GeoIdToLatLng('X', 'X', 'X') as LatitudeAndLongitude").collect())
+    assert(exception2.getMessage.contains("Expect geoId to be of long type"))
+
+    exception2 = intercept[MalformedCarbonCommandException](
+      sql(s"select ToUpperLayerGeoId('X') as upperLayerGeoId").collect())
+    assert(exception2.getMessage.contains("Expect geoId to be of long type"))
+  }
+
   test("test materialized view with spatial column") {
     createTable()
     val exception = intercept[MalformedCarbonCommandException](sql(
@@ -502,7 +553,7 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
         Row(120196020, 30321651)))
     checkAnswer(
       sql(s"select longitude, latitude from $table1 where IN_POLYGON_LIST(" +
-          s"'POLYGON ((120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
+          s"'polygon ((120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
          s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431)), " +
          s"POLYGON ((120.191603 30.328946,120.184179 30.327465,120.181819 30.321464," +
          s"120.190359 30.315388,120.199242 30.324464,120.191603 30.328946))', " +
@@ -605,8 +656,8 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
         Row(120198638, 30323540)))
     checkAnswer(
       sql(s"select longitude, latitude from $table1 where IN_POLYLINE_LIST(" +
-          s"'LINESTRING (120.184179 30.327465, 120.191603 30.328946, 120.199242 30.324464), " +
-          s"LINESTRING (120.199242 30.324464, 120.190359 30.315388)', 65)"),
+          s"'linestring (120.184179 30.327465, 120.191603 30.328946, 120.199242 30.324464), " +
+          s"linestring (120.199242 30.324464, 120.190359 30.315388)', 65)"),
      Seq(Row(120184976, 30327105),
        Row(120197093, 30325985),
        Row(120196020, 30321651),
@@ -660,7 +711,7 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
         Row(855282157702L, 116325378, 39963129)))
     checkAnswer(
       sql(s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
-          s"'RANGELIST (855279368850 855279368852, 855280799610 855280799612, " +
+          s"'rangelist (855279368850 855279368852, 855280799610 855280799612, " +
          s"855282156300 855282157400), " +
          s"RANGELIST (855279368848 855279368850, 855280799613 855280799615, " +
          s"855282156301 855282157800)', " +
@@ -838,7 +889,8 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
         | 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
         | 'SPATIAL_INDEX.mygeohash.originLatitude'='39.832277',
         | 'SPATIAL_INDEX.mygeohash.gridSize'='50',
-        | 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000')
+        | 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000',
+        | 'SPATIAL_INDEX.mygeohash.class'='org.apache.carbondata.geo.GeoHashIndex')
       """.stripMargin)
   }