This is an automated email from the ASF dual-hosted git repository.
indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 8de65a2
[CARBONDATA-4119][CARBONDATA-4238][CARBONDATA-4237][CARBONDATA-4236] Support
geo insert without geoId and document changes
8de65a2 is described below
commit 8de65a2767a1d20ff3cad947303c686a0017df43
Author: ShreelekhyaG <[email protected]>
AuthorDate: Mon Aug 16 23:10:56 2021 +0530
[CARBONDATA-4119][CARBONDATA-4238][CARBONDATA-4237][CARBONDATA-4236]
Support geo insert without geoId and document changes
Why is this PR needed?
1. To support insert without geoId (like load) on a geo table.
2. [CARBONDATA-4119] : User Input for GeoID column not validated.
3. [CARBONDATA-4238] : Documentation Issue in
ddl-of-carbondata.md#add-columns
4. [CARBONDATA-4237] : Documentation issues in streaming-guide.md,
file-structure-of-carbondata.md and sdk-guide.md.
5. [CARBONDATA-4236] : Documentation issues in configuration-parameters.md.
6. The processing-class import in streaming-guide.md is wrong (corrected imports are sketched just below).
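For reference, the corrected spark-shell imports for the streaming-guide.md example (mirroring the fix in the diff below) are:
```scala
// ProcessingTime was moved under Trigger; the old top-level
// org.apache.spark.sql.streaming.ProcessingTime import is deprecated and
// removed in recent Spark versions.
import org.apache.spark.sql.streaming.Trigger.ProcessingTime
import org.apache.spark.sql.streaming.StreamingQuery
```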
What changes were proposed in this PR?
1. Made changes to support insert on a geo table with an auto-generated geoId (see the sketch after this list).
2. [CARBONDATA-4119] : Added documentation about insert with custom geoId.
Changes in docs/spatial-index-guide.md
3. Other documentation changes added.
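A minimal sketch of the new insert behavior, assuming a spark-shell with the CarbonData extensions enabled and the source_index geo table from docs/spatial-index-guide.md (column values follow the new GeoTest case):
```scala
// Insert without geoId: the geoId value for the row is generated internally.
spark.sql("insert into source_index select 1575428400000, 116285807, 40084087")

// Insert with a custom geoId: supply it explicitly as the first column.
spark.sql("insert into source_index select 0, 1575428400000, 116285807, 40084087")

// A plain select returns both rows: the first carries the auto-generated
// geoId, the second keeps the user-supplied value 0.
spark.sql("select * from source_index").show()
```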
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #4205
---
.../core/constants/CarbonCommonConstants.java | 4 +--
docs/configuration-parameters.md | 4 +--
docs/ddl-of-carbondata.md | 2 +-
docs/file-structure-of-carbondata.md | 15 +++++-----
docs/images/2-1_1_latest.PNG | Bin 0 -> 31294 bytes
docs/sdk-guide.md | 4 +--
docs/spatial-index-guide.md | 9 ++++++
docs/streaming-guide.md | 5 +++-
.../spark/sql/hive/CarbonAnalysisRules.scala | 31 +++++++++++++++------
.../scala/org/apache/carbondata/geo/GeoTest.scala | 11 ++++++++
10 files changed, 60 insertions(+), 25 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 7b2fddf..d72d6c1 100644
---
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2527,9 +2527,9 @@ public final class CarbonCommonConstants {
/**
* Default value for SI segment Compaction / merge small files
- * Making this true degrade the LOAD performance
+ * Making this true degrades the LOAD performance
* When the number of small files increase for SI segments(it can happen as
number of columns will
- * be less and we store position id and reference columns), user an either
set to true which will
+ * be less and we store position id and reference columns), user can either
set to true which will
* merge the data files for upcoming loads or run SI rebuild command which
does this job for all
* segments. (REBUILD INDEX <index_table>)
*/
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 7a1b610..73bf2ce 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -116,8 +116,8 @@ This section provides the details of all the configurations
required for the Car
| carbon.enable.page.level.reader.in.compaction|false|Enabling page level
reader for compaction reduces the memory usage while compacting more number of
segments. It allows reading only page by page instead of reading whole blocklet
to memory. **NOTE:** Please refer to
[file-structure-of-carbondata](./file-structure-of-carbondata.md#carbondata-file-format)
to understand the storage format of CarbonData and concepts of pages.|
| carbon.concurrent.compaction | true | Compaction of different tables can be
executed concurrently. This configuration determines whether to compact all
qualifying tables in parallel or not. **NOTE:** Compacting concurrently is a
resource demanding operation and needs more resources there by affecting the
query performance also. This configuration is **deprecated** and might be
removed in future releases. |
| carbon.compaction.prefetch.enable | false | Compaction operation is similar
to Query + data load where in data from qualifying segments are queried and
data loading performed to generate a new single segment. This configuration
determines whether to query ahead data from segments and feed it for data
loading. **NOTE:** This configuration is disabled by default as it needs extra
resources for querying extra data. Based on the memory availability on the
cluster, user can enable it to imp [...]
-| carbon.enable.range.compaction | true | To configure Ranges-based Compaction
to be used or not for RANGE_COLUMN. If true after compaction also the data
would be present in ranges. |
-| carbon.si.segment.merge | false | Making this true degrade the LOAD
performance. When the number of small files increase for SI segments(it can
happen as number of columns will be less and we store position id and reference
columns), user an either set to true which will merge the data files for
upcoming loads or run SI refresh command which does this job for all segments.
(REFRESH INDEX <index_table>) |
+| carbon.enable.range.compaction | true | To configure Range-based Compaction
to be used or not for RANGE_COLUMN. If true after compaction also the data
would be present in ranges. |
+| carbon.si.segment.merge | false | Making this true degrades the LOAD
performance. When the number of small files increase for SI segments(it can
happen as number of columns will be less and we store position id and reference
columns), user can either set to true which will merge the data files for
upcoming loads or run SI refresh command which does this job for all segments.
(REFRESH INDEX <index_table>) |
| carbon.partition.data.on.tasklevel | false | When enabled, tasks launched
for Local sort partition load will be based on one node one task. Compaction
will be performed based on task level for a partition. Load performance might
be degraded, because, the number of tasks launched is equal to number of nodes
in case of local sort. For compaction, memory consumption will be less, as more
number of tasks will be launched for a partition |
## Query Configuration
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index b05a54a..dd09b91 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -778,7 +778,7 @@ CarbonData DDL statements are documented here,which
includes:
**NOTE:** Adding of only single-level Complex datatype columns(only
array and struct) is supported.
Example -
```
- ALTER TABLE <table-name> ADD COLUMNS(arrField array<array<int>>,
structField struct<id1:string,name1:string>)
+ ALTER TABLE <table-name> ADD COLUMNS(arrField array<int>, structField
struct<id1:string,name1:string>)
```
Users can specify which columns to include and exclude for local dictionary
generation after adding new columns. These will be appended with the already
existing local dictionary include and exclude columns of main table
respectively.
diff --git a/docs/file-structure-of-carbondata.md
b/docs/file-structure-of-carbondata.md
index f82ccf3..449753e 100644
--- a/docs/file-structure-of-carbondata.md
+++ b/docs/file-structure-of-carbondata.md
@@ -46,14 +46,13 @@ The CarbonData files are stored in the location specified
by the ***spark.sql.wa
The file directory structure is as below:
-
-
-1. ModifiedTime.mdt records the timestamp of the metadata with the
modification time attribute of the file. When the drop table and create table
are used, the modification time of the file is updated. This is common to all
databases and hence is kept in parallel to databases
-2. The **default** is the database name and contains the user tables.default
is used when user doesn't specify any database name;else user configured
database name will be the directory name. user_table is the table name.
-3. Metadata directory stores schema files, tablestatus and dictionary files
(including .dict, .dictmeta and .sortindex). There are three types of metadata
data information files.
-4. data and index files are stored under directory named **Fact**. The Fact
directory has a Part0 partition directory, where 0 is the partition number.
-5. There is a Segment_0 directory under the Part0 directory, where 0 is the
segment number.
-6. There are two types of files, carbondata and carbonindex, in the Segment_0
directory.
+
+
+1. The **default** is the database name and contains the user tables.default
is used when user doesn't specify any database name;else user configured
database name will be the directory name. user_table is the table name.
+2. Metadata directory stores schema files, tablestatus and segment details
(includes .segment file for each segment). There are three types of metadata
data information files.
+3. data and index files are stored under directory named **Fact**. The Fact
directory has a Part0 partition directory, where 0 is the partition number.
+4. There is a Segment_0 directory under the Part0 directory, where 0 is the
segment number.
+5. There are two types of files, carbondata and carbonmergeindex, in the
Segment_0 directory.
diff --git a/docs/images/2-1_1_latest.PNG b/docs/images/2-1_1_latest.PNG
new file mode 100644
index 0000000..c15581d
Binary files /dev/null and b/docs/images/2-1_1_latest.PNG differ
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index 4da7893..fdd689c 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -742,9 +742,7 @@ int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
System.out.println(String.format("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t",
- i, row[0], row[1], row[2], row[3], row[4], row[5],
- new Date((day * ((int) row[6]))), new Timestamp((long) row[7] / 1000),
row[8]
- ));
+ i, row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7],
row[8]));
i++;
}
diff --git a/docs/spatial-index-guide.md b/docs/spatial-index-guide.md
index db7392c..8484d41 100644
--- a/docs/spatial-index-guide.md
+++ b/docs/spatial-index-guide.md
@@ -91,6 +91,15 @@ Note:
| SPATIAL_INDEX.xxx.conversionRatio | Conversion factor. It allows user to
translate longitude and latitude to long. For example, if the data to load is
longitude = 13.123456, latitude = 101.12356. User can configure conversion
ratio sub-property value as 1000000, and change data to load as longitude =
13123456 and latitude = 10112356. Operations on long is much faster compared to
floating-point numbers.|
| SPATIAL_INDEX.xxx.class | Optional user custom implementation class. Value
is fully qualified class name.|
+### Load/Insert
+If Load/Insert is done without the geoId column, geoId will be generated internally.
+```
+insert into source_index select 1,116.285807,40.084087;
+```
+Load/Insert with custom geoId
+```
+insert into source_index select 0, 1,116.285807,40.084087;
+```
### Select Query
diff --git a/docs/streaming-guide.md b/docs/streaming-guide.md
index aec9b3c..3d5da85 100644
--- a/docs/streaming-guide.md
+++ b/docs/streaming-guide.md
@@ -62,7 +62,8 @@ Start spark-shell in new terminal, type :paste, then copy and
run the following
import java.io.File
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.CarbonSession._
- import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+ import org.apache.spark.sql.streaming.Trigger.ProcessingTime
+ import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.streaming.parser.CarbonStreamParser
@@ -148,6 +149,8 @@ TBLPROPERTIES('streaming'='true')
DESC FORMATTED streaming_table
```
+NOTE: Streaming table doesn't support alter table schema operations such as
alter add column, drop column, rename column, change datatype and rename table
name.
+
## Alter streaming property
For an old table, use ALTER TABLE command to set the streaming property.
```sql
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 3689ec1..74bdca8 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -22,7 +22,7 @@ import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias,
UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast,
NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast,
Expression, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.command.LoadDataCommand
import
org.apache.spark.sql.execution.command.mutation.CarbonProjectForDeleteCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.strategy.{CarbonPlanHelper, DMLHelper}
+import org.apache.spark.sql.types.NullType
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.CarbonReflectionUtils
@@ -267,17 +268,31 @@ case class CarbonPreInsertionCasts(sparkSession:
SparkSession) extends Rule[Logi
child: LogicalPlan,
containsMultipleInserts: Boolean): LogicalPlan = {
val carbonDSRelation =
relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
- if (carbonDSRelation.carbonRelation.output.size > CarbonCommonConstants
+ val carbonTable = carbonDSRelation.carbonRelation.carbonTable
+ val tableProperties =
carbonTable.getTableInfo.getFactTable.getTableProperties
+ val spatialProperty =
tableProperties.get(CarbonCommonConstants.SPATIAL_INDEX)
+ val expectedOutput = carbonDSRelation.carbonRelation.output
+ if (expectedOutput.size > CarbonCommonConstants
.DEFAULT_MAX_NUMBER_OF_COLUMNS) {
CarbonException.analysisException(
s"Maximum number of columns supported: " +
s"${CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS}")
}
+ var newLogicalPlan = child
+ if (spatialProperty != null && !spatialProperty.isEmpty &&
+ child.output.size + 1 == expectedOutput.size) {
+ newLogicalPlan = child.transform {
+ // To support insert sql to automatically generate GeoId if customized
input is not given.
+ case p: Project =>
+ val geoId = Alias(Literal(null, NullType).asInstanceOf[Expression],
"NULL")()
+ val list = Seq(geoId) ++ p.projectList
+ Project(list, p.child)
+ }
+ }
// In spark, PreprocessTableInsertion rule has below cast logic.
// It was missed in carbon when implemented insert into rules.
- val actualOutput = child.output
- val expectedOutput = carbonDSRelation.carbonRelation.output
- var newChildOutput = child.output.zip(expectedOutput)
+ val actualOutput = newLogicalPlan.output
+ var newChildOutput = newLogicalPlan.output.zip(expectedOutput)
.map {
case (actual, expected) =>
if (expected.dataType.sameType(actual.dataType) &&
@@ -292,7 +307,7 @@ case class CarbonPreInsertionCasts(sparkSession:
SparkSession) extends Rule[Logi
explicitMetadata = Option(expected.metadata))
}
} ++ actualOutput.takeRight(actualOutput.size - expectedOutput.size)
- if (newChildOutput.size >= carbonDSRelation.carbonRelation.output.size ||
+ if (newChildOutput.size >= expectedOutput.size ||
carbonDSRelation.carbonTable.isHivePartitionTable) {
newChildOutput = newChildOutput.zipWithIndex.map { columnWithIndex =>
columnWithIndex._1 match {
@@ -301,10 +316,10 @@ case class CarbonPreInsertionCasts(sparkSession:
SparkSession) extends Rule[Logi
case attr => attr
}
}
- val newChild: LogicalPlan = if (newChildOutput == child.output) {
+ val newChild: LogicalPlan = if (newChildOutput == newLogicalPlan.output)
{
throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION
is not supported")
} else {
- Project(newChildOutput, child)
+ Project(newChildOutput, newLogicalPlan)
}
val overwrite = CarbonReflectionUtils.getOverWriteOption("overwrite", p)
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
index c7b2e5a..65e5622 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
@@ -868,6 +868,17 @@ class GeoTest extends QueryTest with BeforeAndAfterAll
with BeforeAndAfterEach {
)
}
+ test("test insert with autogenerated geoid") {
+ createTable()
+ // insert without geoid
+ sql(s"insert into $table1 select 1575428400000,116285807,40084087")
+ // insert with customized geoid
+ sql(s"insert into $table1 select 0,1575428400000,116285807,40084087")
+ checkAnswer(sql(s"select *from $table1"),
+ Seq(Row(855280799612L, 1575428400000L, 116285807, 40084087),
+ Row(0, 1575428400000L, 116285807, 40084087)))
+ }
+
override def afterEach(): Unit = {
drop()
}