Github user ajantha-bhat commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2273#discussion_r187136017
--- Diff:
hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
---
@@ -151,6 +154,33 @@ public CarbonTable
getOrCreateCarbonTable(Configuration configuration) throws IO
SegmentStatusManager segmentStatusManager = new
SegmentStatusManager(identifier);
SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
segmentStatusManager
.getValidAndInvalidSegments(loadMetadataDetails,
this.readCommittedScope);
+
+ // For NonTransactional table, compare the schema of all index files
with inferred schema.
+ // If there is a mismatch throw exception. As all files must be of
same schema.
+ if (!carbonTable.getTableInfo().isTransactionalTable()) {
+ SchemaConverter schemaConverter = new
ThriftWrapperSchemaConverterImpl();
+ for (Segment segment : segments.getValidSegments()) {
+ Map<String, String> indexFiles = segment.getCommittedIndexFile();
+ for (Map.Entry<String, String> indexFileEntry :
indexFiles.entrySet()) {
+ Path indexFile = new Path(indexFileEntry.getKey());
+ org.apache.carbondata.format.TableInfo tableInfo =
CarbonUtil.inferSchemaFromIndexFile(
+ indexFile.toString(), carbonTable.getTableName());
+ TableInfo wrapperTableInfo =
schemaConverter.fromExternalToWrapperTableInfo(
+ tableInfo, identifier.getDatabaseName(),
+ identifier.getTableName(),
+ identifier.getTablePath());
+ List<ColumnSchema> indexFileColumnList =
+ wrapperTableInfo.getFactTable().getListOfColumns();
+ List<ColumnSchema> tableColumnList =
+ carbonTable.getTableInfo().getFactTable().getListOfColumns();
+ if (!compareColumnSchemaList(indexFileColumnList,
tableColumnList)) {
+ throw new IOException("All the files schema doesn't match. "
--- End diff --
@kumarvishal09: Tested with Parquet using 2 files that have the same column name
but different data types. Parquet throws java.lang.UnsupportedOperationException
during read:
Caused by: java.lang.UnsupportedOperationException: Unimplemented type:
StringType
at
org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readIntBatch(VectorizedColumnReader.java:369)
at
org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:188)
at
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:230)
at
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
at
org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
---