VenuReddy2103 commented on a change in pull request #3436:
[CARBONDATA-3548]Geospatial Support: Modified to create and load the table with
a nonschema dimension sort column. And added InPolygon UDF
URL: https://github.com/apache/carbondata/pull/3436#discussion_r373038067
##########
File path:
integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
##########
@@ -264,8 +308,142 @@ abstract class CarbonDDLSqlParser extends
AbstractCarbonSparkSQLParser {
s"Carbon Implicit column ${col.column} is not allowed in" +
s" column name while creating table")
}
+ }
+ }
+ /**
+ * The method parses, validates and processes the index_handler property.
+ * @param tableProperties Table properties
+ * @param tableFields Sequence of table fields
+ * @return <Seq[Field]> Sequence of index fields to add to table fields
+ */
+ private def processIndexProperty(tableProperties: mutable.Map[String,
String],
+ tableFields: Seq[Field]): Seq[Field] = {
+ val option = tableProperties.get(CarbonCommonConstants.INDEX_HANDLER)
+ val fields = ListBuffer[Field]()
+ if (option.isDefined) {
+ if (option.get.trim.isEmpty) {
+ throw new MalformedCarbonCommandException(
+ s"Carbon ${CarbonCommonConstants.INDEX_HANDLER} property is invalid.
" +
+ s"Option value is empty.")
+ }
+ option.get.split(",").map(_.trim).foreach { handler =>
+ /* Validate target column name */
+ if (tableFields.exists(_.column.equalsIgnoreCase(handler))) {
+ throw new MalformedCarbonCommandException(
+ s"Carbon ${CarbonCommonConstants.INDEX_HANDLER} property is
invalid. " +
+ s"handler: $handler must not match with any other column name in
the table")
+ }
+ val TYPE = s"${CarbonCommonConstants.INDEX_HANDLER}.$handler.type"
+ val SOURCE_COLUMNS =
s"${CarbonCommonConstants.INDEX_HANDLER}.$handler.sourcecolumns"
+ val SOURCE_COLUMN_TYPES
+ =
s"${CarbonCommonConstants.INDEX_HANDLER}.$handler.sourcecolumntypes"
+ val HANDLER_CLASS =
s"${CarbonCommonConstants.INDEX_HANDLER}.$handler.class"
+ val HANDLER_INSTANCE =
s"${CarbonCommonConstants.INDEX_HANDLER}.$handler.instance"
+
+ val handlerType = tableProperties.get(TYPE)
+ if (handlerType.isEmpty || handlerType.get.trim.isEmpty) {
+ throw new MalformedCarbonCommandException(
+ s"Carbon ${CarbonCommonConstants.INDEX_HANDLER} property is
invalid. " +
+ s"$TYPE properties must be specified.")
+ }
+ val sourceColumnsOption = tableProperties.get(SOURCE_COLUMNS)
+ if (sourceColumnsOption.isEmpty ||
sourceColumnsOption.get.trim.isEmpty) {
+ throw new MalformedCarbonCommandException(
+ s"Carbon ${CarbonCommonConstants.INDEX_HANDLER} property is
invalid. " +
+ s"$SOURCE_COLUMNS property must be specified.")
+ }
+ val sourcesWithoutSpaces = sourceColumnsOption.get.replaceAll("\\s",
"")
+ /* Validate source columns */
+ val sources = sourcesWithoutSpaces.split(",")
+ if (sources.distinct.length != sources.size) {
+ throw new MalformedCarbonCommandException(
+ s"Carbon ${CarbonCommonConstants.INDEX_HANDLER} property is
invalid. " +
+ s"$SOURCE_COLUMNS property cannot have duplicate columns.")
+ }
+ val sourceTypes = StringBuilder.newBuilder
+ sources.foreach { column =>
+ tableFields.find(_.column.equalsIgnoreCase(column)) match {
+ case Some(field) =>
sourceTypes.append(field.dataType.get).append(",")
+ case None =>
+ throw new MalformedCarbonCommandException(
+ s"Carbon ${CarbonCommonConstants.INDEX_HANDLER} property is
invalid. " +
+ s"Source column: $column in property " +
+ s"$SOURCE_COLUMNS must be a column in the table.")
+ }
+ }
+ tableProperties.put(SOURCE_COLUMNS, sourcesWithoutSpaces)
+ tableProperties.put(SOURCE_COLUMN_TYPES,
sourceTypes.dropRight(1).toString())
+ val handlerClass = tableProperties.get(HANDLER_CLASS)
+ val handlerClassName: String = handlerClass match {
+ case Some(className) => className.trim
+ case None =>
+ /* use handler type to find the default implementation */
+ if
(handlerType.get.trim.equalsIgnoreCase(CarbonCommonConstants.GEOHASH)) {
+ /* Use GeoHash default implementation */
+ val className = classOf[GeoHashImpl].getName
+ tableProperties.put(HANDLER_CLASS, className)
+ className
+ } else {
+ throw new MalformedCarbonCommandException(
+ s"Carbon ${CarbonCommonConstants.INDEX_HANDLER} property is
invalid. " +
+ s"Unsupported value: ${handlerType.get} specified for
property $TYPE.")
+ }
+ }
+
+ try {
+ val handlerClass: Class[_] =
java.lang.Class.forName(handlerClassName)
+ val instance = handlerClass.newInstance().asInstanceOf[
+ CustomIndex[Long, String, java.util.List[Array[Long]]]]
+ instance.init(handler, tableProperties.asJava)
+
+ // TODO Need to check to how to store the instance. This
serialization may be incorrect.
+ val bos = new ByteArrayOutputStream()
+ val oos = new ObjectOutputStream(bos)
+ oos.writeObject(instance)
+ oos.flush()
+ tableProperties.put(HANDLER_INSTANCE,
Base64.getEncoder().encodeToString(bos.toByteArray))
+ } catch {
+ case ex: ClassNotFoundException =>
+ throw new MalformedCarbonCommandException(
+ s"Carbon ${CarbonCommonConstants.INDEX_HANDLER} property process
failed. " +
+ s"$handlerClassName class in property $HANDLER_CLASS failed
with $ex")
+ case ex@(_: InstantiationError | _: IllegalAccessException) =>
+ throw new MalformedCarbonCommandException(
+ s"Carbon ${CarbonCommonConstants.INDEX_HANDLER} property process
failed. " +
+ s"Instantiation of class $handlerClassName failed with $ex")
+ case ex: ClassCastException =>
+ throw new MalformedCarbonCommandException(
+ s"Carbon ${CarbonCommonConstants.INDEX_HANDLER} property process
failed. " +
+ s"Failed due to $ex")
+ }
+
+ /* Add target column in sort column */
+ var sortKey =
tableProperties.getOrElse(CarbonCommonConstants.SORT_COLUMNS, "")
+ if (!sortKey.isEmpty) {
+ /* Source columns are not allowed to be specified in sort columns.
Instead target column
+ is implicitly treated as sort column */
+ sources.foreach { column =>
+ sortKey.split(",").map(_.trim).foreach { key =>
+ if (key.equalsIgnoreCase(column)) {
+ throw new MalformedCarbonCommandException(
+ s"Carbon ${CarbonCommonConstants.INDEX_HANDLER} property is
invalid. " +
+ s"Source column: $key is not allowed in
${CarbonCommonConstants.SORT_COLUMNS}" +
+ s" property. Instead, handler: $handler is implicitly
treated as sort column.")
+ }
+ }
+ }
+ sortKey += "," + handler
+ } else {
+ sortKey = handler
+ }
+ tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, sortKey)
+ // TODO Need to convert it to DataType object and pass it to
StructField dataType
+ val dataType = tableProperties.get(TYPE)
+ fields += getField(StructField(handler, LongType))
Review comment:
I have refactored it. Currently, only an index of long type is supported.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services