This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
     new 0ce77c5  [CARBONDATA-3947] Fixed Hive read/write operation for Insert into Select operation.
0ce77c5 is described below
commit 0ce77c56ce07b657ddd4f58f5f36692e3c31490d
Author: Nihal ojha <[email protected]>
AuthorDate: Wed Aug 5 15:37:05 2020 +0530
[CARBONDATA-3947] Fixed Hive read/write operation for Insert into Select operation.
Why is this PR needed?
A FileNotFoundException is thrown during Hive read/write operations when the carbon schema file is not present at the table location.
What changes were proposed in this PR?
When the carbon schema file is not present at the given location, build the table from the configuration instead of throwing an exception.
This closes #3878
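
In outline, the fix guards the schema-inference path. A condensed sketch of the new logic in populateCarbonTable (names taken from the diff below; not a drop-in replacement for the patch):

    // If no carbondata file exists yet at the table path (for example, the
    // first INSERT INTO ... SELECT into an empty table), fall back to building
    // the CarbonTable from the Hive table properties in the configuration.
    String carbonDataFile = CarbonUtil
        .getFilePathExternalFilePath(absoluteTableIdentifier.getTablePath(), configuration);
    if (carbonDataFile == null) {
      carbonTable = HiveCarbonUtil.getCarbonTable(configuration);
    } else {
      // A data file exists, so the schema can still be inferred from it.
      carbonTable = CarbonTable.buildFromTableInfo(
          SchemaReader.inferSchema(absoluteTableIdentifier, false));
      carbonTable.setTransactionalTable(false);
    }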
---
.../carbondata/hive/MapredCarbonInputFormat.java | 26 +++++++++++++++-------
.../carbondata/hive/util/HiveCarbonUtil.java | 26 ++++++++++++++++++++++
.../org/apache/carbondata/hive/HiveCarbonTest.java | 15 +++++++++++++
3 files changed, 59 insertions(+), 8 deletions(-)
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
index d99a56b..66f7c80 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.hive;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -35,6 +36,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.model.QueryModelBuilder;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeConverterImpl;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -44,6 +46,8 @@ import org.apache.carbondata.hadoop.api.CarbonInputFormat;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 
+import org.apache.carbondata.hive.util.HiveCarbonUtil;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.Path;
@@ -73,7 +77,7 @@ public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritabl
    * This method will read the schema from the physical file and populate into CARBON_TABLE
    */
   private static void populateCarbonTable(Configuration configuration, String paths)
-      throws IOException, InvalidConfigurationException {
+      throws IOException, InvalidConfigurationException, SQLException {
     if (null != paths) {
       // read the schema file to get the absoluteTableIdentifier having the correct table id
       // persisted in the schema
@@ -88,10 +92,16 @@ public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritabl
       // persisted in the schema
       carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
     } else {
-      // InferSchema from data file
-      carbonTable = CarbonTable.buildFromTableInfo(SchemaReader
-          .inferSchema(absoluteTableIdentifier, false));
-      carbonTable.setTransactionalTable(false);
+      String carbonDataFile = CarbonUtil
+          .getFilePathExternalFilePath(absoluteTableIdentifier.getTablePath(), configuration);
+      if (carbonDataFile == null) {
+        carbonTable = HiveCarbonUtil.getCarbonTable(configuration);
+      } else {
+        // InferSchema from data file
+        carbonTable = CarbonTable.buildFromTableInfo(SchemaReader
+            .inferSchema(absoluteTableIdentifier, false));
+        carbonTable.setTransactionalTable(false);
+      }
     }
     configuration.set(CARBON_TABLE, ObjectSerializationUtil.convertObjectToString(carbonTable));
     setTableInfo(configuration, carbonTable.getTableInfo());
@@ -101,7 +111,7 @@ public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritabl
   }
 
   private static CarbonTable getCarbonTable(Configuration configuration, String path)
-      throws IOException, InvalidConfigurationException {
+      throws IOException, InvalidConfigurationException, SQLException {
     populateCarbonTable(configuration, path);
     // read it from schema file in the store
     String carbonTableStr = configuration.get(CARBON_TABLE);
@@ -193,7 +203,7 @@ public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritabl
       jobConf.set(DATABASE_NAME, "_dummyDb_" + UUID.randomUUID().toString());
       jobConf.set(TABLE_NAME, "_dummyTable_" + UUID.randomUUID().toString());
       queryModel = getQueryModel(jobConf, path);
-    } catch (InvalidConfigurationException e) {
+    } catch (InvalidConfigurationException | SQLException e) {
       LOGGER.error("Failed to create record reader: " + e.getMessage(), e);
       return null;
     }
@@ -202,7 +212,7 @@ public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritabl
   }
 
   private QueryModel getQueryModel(Configuration configuration, String path)
-      throws IOException, InvalidConfigurationException {
+      throws IOException, InvalidConfigurationException, SQLException {
     CarbonTable carbonTable = getCarbonTable(configuration, path);
     String projectionString = getProjection(configuration, carbonTable);
     String[] projectionColumns = projectionString.split(",");
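
The guard above relies on CarbonUtil.getFilePathExternalFilePath returning null when no carbondata file exists under the table path. A hypothetical stand-in illustrating just that contract (this is not the real implementation; the listing logic and the .carbondata suffix are assumptions for illustration):

    import java.io.File;

    // Hypothetical illustration of the null contract used in populateCarbonTable:
    // return a data file path if one exists, otherwise null so the caller can
    // fall back to building the table from the Hive configuration.
    static String findCarbonDataFile(String tablePath) {
      File[] files = new File(tablePath).listFiles();
      if (files != null) {
        for (File file : files) {
          if (file.getName().endsWith(".carbondata")) {
            return file.getAbsolutePath();
          }
        }
      }
      return null;
    }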
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java b/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
index 29b2e4d..ae2bceb 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
@@ -155,6 +155,31 @@ public class HiveCarbonUtil {
     return loadModel;
   }
 
+  public static CarbonTable getCarbonTable(Configuration tableProperties) throws SQLException {
+    String[] tableUniqueName = tableProperties.get("name").split("\\.");
+    String databaseName = tableUniqueName[0];
+    String tableName = tableUniqueName[1];
+    String tablePath = tableProperties.get(hive_metastoreConstants.META_TABLE_LOCATION);
+    String columns = tableProperties.get(hive_metastoreConstants.META_TABLE_COLUMNS);
+    String sortColumns = tableProperties.get("sort_columns");
+    String columnTypes = tableProperties.get(hive_metastoreConstants.META_TABLE_COLUMN_TYPES);
+    String partitionColumns =
+        tableProperties.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
+    String partitionColumnTypes =
+        tableProperties.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
+    if (partitionColumns != null) {
+      columns = columns + "," + partitionColumns;
+      columnTypes = columnTypes + ":" + partitionColumnTypes;
+    }
+    String[] columnTypeArray = HiveCarbonUtil.splitSchemaStringToArray(columnTypes);
+
+    CarbonTable carbonTable = CarbonTable.buildFromTableInfo(
+        HiveCarbonUtil.getTableInfo(tableName, databaseName, tablePath,
+            sortColumns, columns.split(","), columnTypeArray, new ArrayList<>()));
+    carbonTable.setTransactionalTable(false);
+    return carbonTable;
+  }
+
   private static TableInfo getTableInfo(String tableName, String databaseName, String location,
       String sortColumnsString, String[] columns, String[] columnTypes,
       List<String> partitionColumns) throws SQLException {
@@ -197,6 +222,7 @@ public class HiveCarbonUtil {
     tableInfo.setDatabaseName(databaseName);
     tableInfo.setTablePath(location);
     tableInfo.setFactTable(tableSchema);
+    tableInfo.setTableUniqueName(databaseName + "_" + tableName);
     return tableInfo;
   }
 
diff --git a/integration/hive/src/test/java/org/apache/carbondata/hive/HiveCarbonTest.java b/integration/hive/src/test/java/org/apache/carbondata/hive/HiveCarbonTest.java
index d58acac..35a364a 100644
--- a/integration/hive/src/test/java/org/apache/carbondata/hive/HiveCarbonTest.java
+++ b/integration/hive/src/test/java/org/apache/carbondata/hive/HiveCarbonTest.java
@@ -71,6 +71,21 @@ public class HiveCarbonTest extends HiveTestUtils {
   }
 
   @Test
+  public void verifyInsertIntoSelectOperation() throws Exception {
+    statement.execute("drop table if exists hive_carbon_table1");
+    statement.execute("CREATE TABLE hive_carbon_table1(id INT, name STRING, scale DECIMAL, country STRING, salary DOUBLE) stored by 'org.apache.carbondata.hive.CarbonStorageHandler'");
+    statement.execute("INSERT into hive_carbon_table1 SELECT 1, 'RAM', '2.3', 'INDIA', 3500");
+    statement.execute("INSERT into hive_carbon_table1 SELECT 2, 'RAJU', '2.4', 'RUSSIA', 3600");
+    statement.execute("drop table if exists hive_carbon_table2");
+    statement.execute("CREATE TABLE hive_carbon_table2(id INT, name STRING, scale DECIMAL, country STRING, salary DOUBLE) stored by 'org.apache.carbondata.hive.CarbonStorageHandler'");
+    statement.execute("INSERT into hive_carbon_table2 SELECT * FROM hive_carbon_table1");
+    checkAnswer(statement.executeQuery("SELECT * FROM hive_carbon_table2"),
+        connection.createStatement().executeQuery("select * from hive_carbon_table1"));
+    statement.execute("drop table if exists hive_carbon_table1");
+    statement.execute("drop table if exists hive_carbon_table2");
+  }
+
+  @Test
   public void verifyDataAfterLoadUsingSortColumns() throws Exception {
     statement.execute("drop table if exists hive_carbon_table5");
     statement.execute(