This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 6b6232b [CARBONDATA-3934] Support write transactional table with presto.
6b6232b is described below
commit 6b6232bd28b6458b662abe9ed29e1bdcd99e56db
Author: akashrn5 <[email protected]>
AuthorDate: Mon Mar 16 22:40:25 2020 +0530
[CARBONDATA-3934] Support write transactional table with presto.
Why is this PR needed?
Currently, Presto integration supports only reading carbondata tables, covering all primitive
and complex types as well as partitioned tables. The main bottleneck is that the user has to
create the table and load the data in Spark, so it is better if the user can load the
transactional data from Presto itself.
What changes were proposed in this PR?
This PR adds support for writing transactional data to a carbon table from Presto. Only the
insert command is supported, since a load command is not available here. Partition and
complex-type write support are tracked in separate JIRAs, and separate PRs will be raised for them.
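For illustration only (the table and column names below are hypothetical, and the exact Spark DDL
is described in the quick start guide), the intended end-to-end flow is to create the table with
Spark and then insert into it and query it from the Presto CLI:

    -- Spark SQL: create the transactional carbon table
    CREATE TABLE sales_carbon (id INT, name STRING) STORED AS carbondata;

    -- Presto CLI: insert into and query the same table
    INSERT INTO sales_carbon VALUES (1, 'abc');
    INSERT INTO sales_carbon SELECT * FROM sales_source;
    SELECT * FROM sales_carbon;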
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #3875
---
docs/prestosql-guide.md | 15 +-
.../hadoop/api/CarbonOutputCommitter.java | 2 +-
.../hadoop/api/CarbonTableOutputFormat.java | 22 +-
.../hive/MapredCarbonOutputCommitter.java | 39 ++-
.../carbondata/hive/MapredCarbonOutputFormat.java | 17 +-
.../carbondata/hive/util/HiveCarbonUtil.java | 3 +-
integration/presto/pom.xml | 19 ++
.../carbondata/presto/impl/CarbonTableConfig.java | 6 +
.../carbondata/presto/CarbonDataFileWriter.java | 201 ++++++++++++++
.../presto/CarbonDataFileWriterFactory.java | 72 +++++
.../presto/CarbonDataHandleResolver.java | 29 ++
.../presto/CarbonDataInsertTableHandle.java | 60 ++++
.../presto/CarbonDataLocationService.java | 62 +++++
.../carbondata/presto/CarbonDataMetaData.java | 151 +++++++++++
.../presto/CarbonDataPageSinkProvider.java | 182 +++++++++++++
.../carbondata/presto/CarbonDataWriterFactory.java | 99 +++++++
.../carbondata/presto/CarbonMetadataFactory.java | 134 +++++++++
.../presto/CarbondataConnectorFactory.java | 32 ++-
.../apache/carbondata/presto/CarbondataModule.java | 6 +-
.../PrestoInsertIntoTableTestCase.scala | 302 +++++++++++++++++++++
.../presto/util/CarbonDataStoreCreator.scala | 165 +++++++----
21 files changed, 1520 insertions(+), 98 deletions(-)
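At a high level, the new write path serializes the CarbonLoadModel on the Presto coordinator and
hands it to the workers through the insert table handle and job configuration (see
CarbonDataMetaData, CarbonDataFileWriter and MapredCarbonOutputCommitter below). A minimal sketch
of that handoff, assuming the ObjectSerializationUtil API used in the diff; the wrapper class and
method names here are illustrative only, not part of the patch:

    import java.io.IOException;

    import org.apache.carbondata.core.util.ObjectSerializationUtil;
    import org.apache.carbondata.processing.loading.model.CarbonLoadModel;

    // Illustrative helper, not part of this patch.
    final class LoadModelHandoffSketch {

      // Coordinator side: serialize the load model into a string so that it can be
      // shipped to the workers inside the insert table handle / job configuration.
      static String encode(CarbonLoadModel loadModel) throws IOException {
        return ObjectSerializationUtil.convertObjectToString(loadModel);
      }

      // Worker side: read the encoded string back and deserialize it before writing.
      static CarbonLoadModel decode(String encoded) throws IOException {
        return (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(encoded);
      }
    }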
diff --git a/docs/prestosql-guide.md b/docs/prestosql-guide.md
index 617d995..3dac6c7 100644
--- a/docs/prestosql-guide.md
+++ b/docs/prestosql-guide.md
@@ -284,9 +284,20 @@ Now you can use the Presto CLI on the coordinator to query data sources in the c
### Generate CarbonData file
-Please refer to quick start: https://github.com/apache/carbondata/blob/master/docs/quick-start-guide.md.
-Load data statement in Spark can be used to create carbondata tables. And then you can easily find the created
+There are two ways to create carbondata files that can be queried from presto.
+1. Please refer to the quick start guide: https://github.com/apache/carbondata/blob/master/docs/quick-start-guide.md.
+The load data statement in Spark can be used to create carbondata tables, and then you can easily find the created
 carbondata files.
+2. Carbondata supports writing transactional data to a table using the insert command. The folder
+structure will be similar to what we have for spark. The table can be created from spark, then data can
+be inserted from presto and queried from presto.
+
+Data can be inserted in two ways.
+1. ``` insert into target_carbon_table select values('a,b,c');```
+2. ```insert into target_carbon_table select * from source_table;```
+
+Note: Carbon load properties are not yet supported. The insert will work with all the default
+configurations.
### Query carbondata in CLI of presto
* Download presto cli client of version 316 : https://repo1.maven.org/maven2/io/prestosql/presto-cli/
diff --git
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index a816894..0943002 100644
---
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -161,7 +161,7 @@ public class CarbonOutputCommitter extends
FileOutputCommitter {
carbonTable.getCarbonTableIdentifier().getTableId(),
new SegmentFileStore(carbonTable.getTablePath(),
segmentFileName + CarbonTablePath.SEGMENT_EXT));
-
+ newMetaEntry.setSegmentFile(segmentFileName + CarbonTablePath.SEGMENT_EXT);
CarbonLoaderUtil
.populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS,
loadModel.getFactTimeStamp(),
true);
diff --git
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 1ddd3c6..de8f05a 100644
---
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -76,17 +76,8 @@ import org.apache.log4j.Logger;
// TODO Move dictionary generator which is coded in spark to MR framework.
public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable,
ObjectArrayWritable> {
- protected static final String LOAD_MODEL =
"mapreduce.carbontable.load.model";
- private static final String DATABASE_NAME =
"mapreduce.carbontable.databaseName";
- private static final String TABLE_NAME = "mapreduce.carbontable.tableName";
- private static final String TABLE = "mapreduce.carbontable.table";
- private static final String TABLE_PATH = "mapreduce.carbontable.tablepath";
- private static final String INPUT_SCHEMA =
"mapreduce.carbontable.inputschema";
- private static final String TEMP_STORE_LOCATIONS =
"mapreduce.carbontable.tempstore.locations";
- private static final String OVERWRITE_SET =
"mapreduce.carbontable.set.overwrite";
+ public static final String LOAD_MODEL = "mapreduce.carbontable.load.model";
public static final String COMPLEX_DELIMITERS =
"mapreduce.carbontable.complex_delimiters";
- private static final String CARBON_TRANSACTIONAL_TABLE =
- "mapreduce.input.carboninputformat.transactional";
public static final String SERIALIZATION_NULL_FORMAT =
"mapreduce.carbontable.serialization.null.format";
public static final String BAD_RECORDS_LOGGER_ENABLE =
@@ -102,6 +93,17 @@ public class CarbonTableOutputFormat extends
FileOutputFormat<NullWritable, Obje
public static final String BAD_RECORD_PATH =
"mapreduce.carbontable.bad.record.path";
public static final String DATE_FORMAT = "mapreduce.carbontable.date.format";
public static final String TIMESTAMP_FORMAT =
"mapreduce.carbontable.timestamp.format";
+
+ private static final String DATABASE_NAME =
"mapreduce.carbontable.databaseName";
+ private static final String TABLE_NAME = "mapreduce.carbontable.tableName";
+ private static final String TABLE = "mapreduce.carbontable.table";
+ private static final String TABLE_PATH = "mapreduce.carbontable.tablepath";
+ private static final String INPUT_SCHEMA =
"mapreduce.carbontable.inputschema";
+ private static final String TEMP_STORE_LOCATIONS =
"mapreduce.carbontable.tempstore.locations";
+ private static final String OVERWRITE_SET =
"mapreduce.carbontable.set.overwrite";
+ private static final String CARBON_TRANSACTIONAL_TABLE =
+ "mapreduce.input.carboninputformat.transactional";
+
/**
* Set the update timestamp if user sets in case of update query. It needs
to be updated
* in load status update time
diff --git
a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
index 500764b..07297a3 100644
---
a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
+++
b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
@@ -23,6 +23,7 @@ import java.util.UUID;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.hadoop.api.CarbonOutputCommitter;
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
@@ -52,25 +53,41 @@ public class MapredCarbonOutputCommitter extends
OutputCommitter {
@Override
public void setupJob(JobContext jobContext) throws IOException {
-
ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobContext.getConfiguration());
- String a = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV);
Random random = new Random();
JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt());
TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt());
org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl context =
new TaskAttemptContextImpl(jobContext.getJobConf(), attemptID);
- CarbonLoadModel carbonLoadModel =
- HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration());
- CarbonTableOutputFormat.setLoadModel(jobContext.getConfiguration(),
carbonLoadModel);
+ CarbonLoadModel carbonLoadModel = null;
+ String encodedString =
jobContext.getJobConf().get(CarbonTableOutputFormat.LOAD_MODEL);
+ // The encodedString, which is the serialized loadModel, will be null when the data is
+ // written via hive. In that case mapreduce.map.env will be set with the load model, so that
+ // when the containers are launched, the loadModel is published to all containers from
+ // mapreduce.map.env.
+ // But when the data is written via presto, since it is not exactly an MR job, we send the
+ // load model from coordinator to worker using
+ // org.apache.carbondata.presto.impl.CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL.
+ if (encodedString != null) {
+ carbonLoadModel =
+ (CarbonLoadModel)
ObjectSerializationUtil.convertStringToObject(encodedString);
+ }
+ if (null == carbonLoadModel) {
+
ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobContext.getConfiguration());
+ String mapReduceMapTaskEnv =
jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV);
+ carbonLoadModel =
HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration());
+ CarbonTableOutputFormat.setLoadModel(jobContext.getConfiguration(),
carbonLoadModel);
+ String loadModelStr =
jobContext.getConfiguration().get(CarbonTableOutputFormat.LOAD_MODEL);
+ // Set the loadModel string to mapreduce.map.env so that it will be published to all
+ // containers later during job execution.
+ jobContext.getJobConf()
+ .set(JobConf.MAPRED_MAP_TASK_ENV, mapReduceMapTaskEnv + ",carbon=" +
loadModelStr);
+ jobContext.getJobConf()
+ .set(JobConf.MAPRED_REDUCE_TASK_ENV, mapReduceMapTaskEnv +
",carbon=" + loadModelStr);
+ }
carbonOutputCommitter =
new CarbonOutputCommitter(new Path(carbonLoadModel.getTablePath()),
context);
carbonOutputCommitter.setupJob(jobContext);
- String loadModelStr =
jobContext.getConfiguration().get("mapreduce.carbontable.load.model");
- jobContext.getJobConf().set(JobConf.MAPRED_MAP_TASK_ENV,
- a + ",carbon=" + loadModelStr);
- jobContext.getJobConf().set(JobConf.MAPRED_REDUCE_TASK_ENV,
- a + ",carbon=" + loadModelStr);
}
@Override
@@ -119,4 +136,4 @@ public class MapredCarbonOutputCommitter extends
OutputCommitter {
throw e;
}
}
-}
\ No newline at end of file
+}
diff --git
a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
index 7e1da4d..9499b86 100644
---
a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
+++
b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
@@ -18,7 +18,9 @@
package org.apache.carbondata.hive;
import java.io.IOException;
+import java.text.SimpleDateFormat;
import java.util.Arrays;
+import java.util.Date;
import java.util.Map;
import java.util.Properties;
@@ -43,6 +45,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.Progressable;
@@ -92,8 +95,20 @@ public class MapredCarbonOutputFormat<T> extends
CarbonTableOutputFormat
}
String tablePath =
FileFactory.getCarbonFile(carbonLoadModel.getTablePath()).getAbsolutePath();
TaskAttemptID taskAttemptID =
TaskAttemptID.forName(jc.get("mapred.task.id"));
+ // taskAttemptID will be null when the insert job is fired from presto. Presto sends the
+ // JobConf, and since presto does not use the MR framework for execution, mapred.task.id
+ // will be null, so prepare a new ID.
+ if (taskAttemptID == null) {
+ SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
+ String jobTrackerId = formatter.format(new Date());
+ taskAttemptID = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0);
+ // update the app name here, as in this class by default it will be written by Hive
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
"presto");
+ } else {
+ carbonLoadModel.setTaskNo("" + taskAttemptID.getTaskID().getId());
+ }
TaskAttemptContextImpl context = new TaskAttemptContextImpl(jc,
taskAttemptID);
- carbonLoadModel.setTaskNo("" + taskAttemptID.getTaskID().getId());
final boolean isHivePartitionedTable =
carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().isHivePartitionTable();
PartitionInfo partitionInfo =
diff --git
a/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
b/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
index d17a8a3..1972034 100644
---
a/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
+++
b/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
@@ -122,11 +122,12 @@ public class HiveCarbonUtil {
CarbonLoadModel loadModel;
CarbonTable carbonTable;
try {
- String schemaFilePath = CarbonTablePath.getSchemaFilePath(location);
+ String schemaFilePath = CarbonTablePath.getSchemaFilePath(location,
configuration);
AbsoluteTableIdentifier absoluteTableIdentifier =
AbsoluteTableIdentifier.from(location, databaseName, tableName, "");
if (FileFactory.getCarbonFile(schemaFilePath).exists()) {
carbonTable =
SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
+ carbonTable.setTransactionalTable(true);
} else {
String carbonDataFile =
CarbonUtil.getFilePathExternalFilePath(location, configuration);
if (carbonDataFile == null) {
diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml
index a671f3d..507184a 100644
--- a/integration/presto/pom.xml
+++ b/integration/presto/pom.xml
@@ -539,6 +539,25 @@
<version>0.9.0</version>
</dependency>
<dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-hive</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-llap-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-metastore</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-service</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>${httpcore.version}</version>
diff --git
a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
index f11d914..dbf9960 100755
---
a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
+++
b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
@@ -40,6 +40,12 @@ public class CarbonTableConfig {
private String endPoint;
private String pushRowFilter;
+ /**
+ * Property to send the load model from coordinator to worker in presto. This is an internal
+ * constant and is not exposed to the user.
+ */
+ public static final String CARBON_PRESTO_LOAD_MODEL = "carbondata.presto.encoded.loadmodel";
+
public String getUnsafeMemoryInMb() {
return unsafeMemoryInMb;
}
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
new file mode 100644
index 0000000..ee7602e
--- /dev/null
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+import static
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.getStandardStructObjectInspector;
+
+/**
+ * This class implements HiveFileWriter and creates the carbon file writer to write the page data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+ private static final Logger LOG =
+ LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+ private final JobConf configuration;
+ private final Path outPutPath;
+ private final FileSinkOperator.RecordWriter recordWriter;
+ private final CarbonHiveSerDe serDe;
+ private final int fieldCount;
+ private final Object row;
+ private final SettableStructObjectInspector tableInspector;
+ private final List<StructField> structFields;
+ private final HiveWriteUtils.FieldSetter[] setters;
+
+ private boolean isCommitDone;
+
+ public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames,
Properties properties,
+ JobConf configuration, TypeManager typeManager) throws SerDeException {
+ requireNonNull(outPutPath, "path is null");
+ // take the outputPath same as location in compliance with the carbon store folder structure.
+ this.outPutPath = new Path(properties.getProperty("location"));
+ this.configuration = requireNonNull(configuration, "conf is null");
+ List<String> columnNames = Arrays
+ .asList(properties.getProperty(IOConstants.COLUMNS,
"").split(CarbonCommonConstants.COMMA));
+ List<Type> fileColumnTypes =
+ HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES,
"")).stream()
+ .map(hiveType -> hiveType.getType(typeManager)).collect(toList());
+ this.fieldCount = columnNames.size();
+ this.serDe = new CarbonHiveSerDe();
+ serDe.initialize(configuration, properties);
+ List<ObjectInspector> objectInspectors = fileColumnTypes.stream()
+ .map(HiveWriteUtils::getRowColumnInspector)
+ .collect(toList());
+ this.tableInspector = getStandardStructObjectInspector(columnNames,
objectInspectors);
+
+ this.structFields =
+
ImmutableList.copyOf(inputColumnNames.stream().map(tableInspector::getStructFieldRef)
+ .collect(toImmutableList()));
+
+ this.row = tableInspector.create();
+
+ this.setters = new HiveWriteUtils.FieldSetter[structFields.size()];
+ for (int i = 0; i < setters.length; i++) {
+ setters[i] = HiveWriteUtils.createFieldSetter(tableInspector, row,
structFields.get(i),
+ fileColumnTypes.get(structFields.get(i).getFieldID()));
+ }
+ String encodedLoadModel =
this.configuration.get(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL);
+ if (StringUtils.isNotEmpty(encodedLoadModel)) {
+ this.configuration.set(CarbonTableOutputFormat.LOAD_MODEL,
encodedLoadModel);
+ }
+ try {
+ boolean compress = HiveConf.getBoolVar(this.configuration,
COMPRESSRESULT);
+ Object writer =
+
Class.forName(MapredCarbonOutputFormat.class.getName()).getConstructor().newInstance();
+ this.recordWriter = ((MapredCarbonOutputFormat<?>) writer)
+ .getHiveRecordWriter(this.configuration, this.outPutPath,
Text.class, compress,
+ properties, Reporter.NULL);
+ } catch (Exception e) {
+ LOG.error("error while initializing writer", e);
+ throw new RuntimeException("writer class not found");
+ }
+ }
+
+ @Override
+ public long getWrittenBytes() {
+ if (isCommitDone) {
+ try {
+ return
outPutPath.getFileSystem(configuration).getFileStatus(outPutPath).getLen();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ return 0;
+ }
+
+ @Override
+ public long getSystemMemoryUsage() {
+ // TODO: need to support this, Jira CARBONDATA-4038 is created
+ return 0;
+ }
+
+ @Override
+ public void appendRows(Page dataPage) {
+ for (int position = 0; position < dataPage.getPositionCount(); position++)
{
+ appendRow(dataPage, position);
+ }
+ }
+
+ private void appendRow(Page dataPage, int position) {
+ for (int field = 0; field < fieldCount; field++) {
+ Block block = dataPage.getBlock(field);
+ if (block.isNull(position)) {
+ tableInspector.setStructFieldData(row, structFields.get(field), null);
+ } else {
+ setters[field].setField(block, position);
+ }
+ }
+ try {
+ recordWriter.write(serDe.serialize(row, tableInspector));
+ } catch (SerDeException | IOException e) {
+ throw new PrestoException(HIVE_WRITER_DATA_ERROR, e);
+ }
+ }
+
+ @Override
+ public void commit() {
+ try {
+ recordWriter.close(false);
+ } catch (Exception ex) {
+ LOG.error("Error while closing the record writer", ex);
+ throw new RuntimeException(ex);
+ }
+ isCommitDone = true;
+ }
+
+ @Override
+ public void rollback() {
+ try {
+ recordWriter.close(true);
+ } catch (Exception e) {
+ LOG.error("Error while closing the record writer during rollback", e);
+ throw new PrestoException(HIVE_WRITER_CLOSE_ERROR, "Error rolling back
write to Hive", e);
+ }
+ }
+
+ @Override
+ public long getValidationCpuNanos() {
+ // TODO: need to support this, Jira CARBONDATA-4038 is created
+ return 0;
+ }
+}
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriterFactory.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriterFactory.java
new file mode 100644
index 0000000..fb0b46f
--- /dev/null
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriterFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.FileFormatDataSourceStats;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveFileWriterFactory;
+import io.prestosql.plugin.hive.NodeVersion;
+import io.prestosql.plugin.hive.metastore.StorageFormat;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.mapred.JobConf;
+
+import static java.util.Objects.requireNonNull;
+
+public class CarbonDataFileWriterFactory implements HiveFileWriterFactory {
+
+ private final HdfsEnvironment hdfsEnvironment;
+ private final TypeManager typeManager;
+ private final NodeVersion nodeVersion;
+ private final FileFormatDataSourceStats stats;
+
+ @Inject
+ public CarbonDataFileWriterFactory(HdfsEnvironment hdfsEnvironment,
TypeManager typeManager,
+ NodeVersion nodeVersion, FileFormatDataSourceStats stats) {
+ this(typeManager, hdfsEnvironment, nodeVersion, stats);
+ }
+
+ public CarbonDataFileWriterFactory(TypeManager typeManager, HdfsEnvironment
hdfsEnvironment,
+ NodeVersion nodeVersion, FileFormatDataSourceStats stats) {
+ this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is
null");
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
+ this.stats = requireNonNull(stats, "stats is null");
+ }
+
+ @Override
+ public Optional<HiveFileWriter> createFileWriter(Path path, List<String>
inputColumnNames,
+ StorageFormat storageFormat, Properties schema, JobConf configuration,
+ ConnectorSession session) {
+ try {
+ return Optional
+ .of(new CarbonDataFileWriter(path, inputColumnNames, schema,
configuration, typeManager));
+ } catch (SerDeException e) {
+ throw new RuntimeException("Error while creating carbon file writer", e);
+ }
+ }
+
+}
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataHandleResolver.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataHandleResolver.java
new file mode 100644
index 0000000..ce7232c
--- /dev/null
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataHandleResolver.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import io.prestosql.plugin.hive.HiveHandleResolver;
+import io.prestosql.spi.connector.ConnectorInsertTableHandle;
+
+public class CarbonDataHandleResolver extends HiveHandleResolver {
+
+ @Override
+ public Class<? extends ConnectorInsertTableHandle>
getInsertTableHandleClass() {
+ return CarbonDataInsertTableHandle.class;
+ }
+}
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataInsertTableHandle.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataInsertTableHandle.java
new file mode 100644
index 0000000..f8ebe7e
--- /dev/null
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataInsertTableHandle.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.presto;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableMap;
+import io.prestosql.plugin.hive.HiveBucketProperty;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.plugin.hive.HiveInsertTableHandle;
+import io.prestosql.plugin.hive.HiveStorageFormat;
+import io.prestosql.plugin.hive.LocationHandle;
+import io.prestosql.plugin.hive.metastore.HivePageSinkMetadata;
+import io.prestosql.spi.connector.ConnectorInsertTableHandle;
+
+import static java.util.Objects.requireNonNull;
+
+public class CarbonDataInsertTableHandle extends HiveInsertTableHandle
implements
+ ConnectorInsertTableHandle {
+
+ private final Map<String, String> additionalConf;
+
+ @JsonCreator public CarbonDataInsertTableHandle(
+ @JsonProperty("schemaName") String schemaName,
+ @JsonProperty("tableName") String tableName,
+ @JsonProperty("inputColumns") List<HiveColumnHandle> inputColumns,
+ @JsonProperty("pageSinkMetadata") HivePageSinkMetadata pageSinkMetadata,
+ @JsonProperty("locationHandle") LocationHandle locationHandle,
+ @JsonProperty("bucketProperty") Optional<HiveBucketProperty>
bucketProperty,
+ @JsonProperty("tableStorageFormat") HiveStorageFormat tableStorageFormat,
+ @JsonProperty("partitionStorageFormat") HiveStorageFormat
partitionStorageFormat,
+ @JsonProperty("additionalConf") Map<String, String> additionalConf) {
+ super(schemaName, tableName, inputColumns, pageSinkMetadata,
locationHandle, bucketProperty,
+ tableStorageFormat, partitionStorageFormat);
+ this.additionalConf =
+ ImmutableMap.copyOf(requireNonNull(additionalConf, "additionConf Map
is null"));
+ }
+
+ @JsonProperty public Map<String, String> getAdditionalConf() {
+ return additionalConf;
+ }
+}
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataLocationService.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataLocationService.java
new file mode 100644
index 0000000..5532a7c
--- /dev/null
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataLocationService.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveLocationService;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.plugin.hive.LocationHandle;
+import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
+import io.prestosql.plugin.hive.metastore.Table;
+import io.prestosql.spi.connector.ConnectorSession;
+import org.apache.hadoop.fs.Path;
+
+public class CarbonDataLocationService extends HiveLocationService {
+
+ private final HdfsEnvironment hdfsEnvironment;
+
+ @Inject
+ public CarbonDataLocationService(HdfsEnvironment hdfsEnvironment) {
+ super(hdfsEnvironment);
+ this.hdfsEnvironment = hdfsEnvironment;
+ }
+
+ @Override
+ public LocationHandle forNewTable(SemiTransactionalHiveMetastore metastore,
+ ConnectorSession session, String schemaName, String tableName) {
+ // TODO: test in cloud scenario in S3/OBS and make it compatible for cloud scenario
+ super.forNewTable(metastore, session, schemaName, tableName);
+ HdfsEnvironment.HdfsContext context =
+ new HdfsEnvironment.HdfsContext(session, schemaName, tableName);
+ Path targetPath = HiveWriteUtils
+ .getTableDefaultLocation(context, metastore, this.hdfsEnvironment,
schemaName, tableName);
+ return new LocationHandle(targetPath, targetPath, false,
+ LocationHandle.WriteMode.DIRECT_TO_TARGET_NEW_DIRECTORY);
+ }
+
+ @Override
+ public LocationHandle forExistingTable(SemiTransactionalHiveMetastore
metastore,
+ ConnectorSession session, Table table) {
+ // TODO: test in cloud scenario in S3/OBS and make it compatible for cloud scenario
+ super.forExistingTable(metastore, session, table);
+ Path targetPath = new Path(table.getStorage().getLocation());
+ return new LocationHandle(targetPath, targetPath, true,
+ LocationHandle.WriteMode.DIRECT_TO_TARGET_EXISTING_DIRECTORY);
+ }
+}
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataMetaData.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataMetaData.java
new file mode 100644
index 0000000..42774f5
--- /dev/null
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataMetaData.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.MapredCarbonOutputCommitter;
+import org.apache.carbondata.hive.util.HiveCarbonUtil;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import com.google.common.collect.ImmutableMap;
+import io.airlift.slice.Slice;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveInsertTableHandle;
+import io.prestosql.plugin.hive.HiveMetadata;
+import io.prestosql.plugin.hive.HivePartitionManager;
+import io.prestosql.plugin.hive.LocationService;
+import io.prestosql.plugin.hive.PartitionUpdate;
+import io.prestosql.plugin.hive.TypeTranslator;
+import io.prestosql.plugin.hive.metastore.MetastoreUtil;
+import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
+import io.prestosql.plugin.hive.metastore.Table;
+import io.prestosql.plugin.hive.security.AccessControlMetadata;
+import io.prestosql.plugin.hive.statistics.HiveStatisticsProvider;
+import io.prestosql.plugin.hive.util.ConfigurationUtils;
+import io.prestosql.spi.connector.ConnectorInsertTableHandle;
+import io.prestosql.spi.connector.ConnectorOutputMetadata;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.connector.ConnectorTableHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.statistics.ComputedStatistics;
+import io.prestosql.spi.type.TypeManager;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTimeZone;
+
+public class CarbonDataMetaData extends HiveMetadata {
+
+ private static final Logger LOG =
+ LogServiceFactory.getLogService(CarbonDataMetaData.class.getName());
+
+ private HdfsEnvironment hdfsEnvironment;
+ private SemiTransactionalHiveMetastore metastore;
+ private MapredCarbonOutputCommitter carbonOutputCommitter;
+ private JobContextImpl jobContext;
+
+ public CarbonDataMetaData(SemiTransactionalHiveMetastore metastore,
+ HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager,
DateTimeZone timeZone,
+ boolean allowCorruptWritesForTesting, boolean
writesToNonManagedTablesEnabled,
+ boolean createsOfNonManagedTablesEnabled, TypeManager typeManager,
+ LocationService locationService,
+ io.airlift.json.JsonCodec<PartitionUpdate> partitionUpdateCodec,
+ TypeTranslator typeTranslator, String prestoVersion,
+ HiveStatisticsProvider hiveStatisticsProvider, AccessControlMetadata
accessControlMetadata) {
+ super(metastore, hdfsEnvironment, partitionManager, timeZone,
allowCorruptWritesForTesting,
+ true, createsOfNonManagedTablesEnabled, typeManager,
+ locationService, partitionUpdateCodec, typeTranslator, prestoVersion,
+ hiveStatisticsProvider, accessControlMetadata);
+ this.hdfsEnvironment = hdfsEnvironment;
+ this.metastore = metastore;
+ }
+
+ @Override
+ public CarbonDataInsertTableHandle beginInsert(ConnectorSession session,
+ ConnectorTableHandle tableHandle) {
+ HiveInsertTableHandle hiveInsertTableHandle = super.beginInsert(session,
tableHandle);
+ SchemaTableName tableName = hiveInsertTableHandle.getSchemaTableName();
+ Optional<Table> table =
+ this.metastore.getTable(tableName.getSchemaName(),
tableName.getTableName());
+ Path outputPath =
+ new
Path(hiveInsertTableHandle.getLocationHandle().getJsonSerializableTargetPath());
+ JobConf jobConf = ConfigurationUtils.toJobConf(this.hdfsEnvironment
+ .getConfiguration(
+ new HdfsEnvironment.HdfsContext(session,
hiveInsertTableHandle.getSchemaName(),
+ hiveInsertTableHandle.getTableName()),
+ new
Path(hiveInsertTableHandle.getLocationHandle().getJsonSerializableWritePath())));
+ jobConf.set("location", outputPath.toString());
+ Properties hiveSchema = MetastoreUtil.getHiveSchema(table.get());
+ try {
+ CarbonLoadModel carbonLoadModel =
+ HiveCarbonUtil.getCarbonLoadModel(hiveSchema, jobConf);
+
+ CarbonTableOutputFormat.setLoadModel(jobConf, carbonLoadModel);
+ } catch (IOException ex) {
+ LOG.error("Error while creating carbon load model", ex);
+ throw new RuntimeException(ex);
+ }
+ try {
+ carbonOutputCommitter = new MapredCarbonOutputCommitter();
+ jobContext = new JobContextImpl(jobConf, new JobID());
+ carbonOutputCommitter.setupJob(jobContext);
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobConf);
+ } catch (IOException e) {
+ LOG.error("error setting the output committer", e);
+ throw new RuntimeException("error setting the output committer");
+ }
+ return new
CarbonDataInsertTableHandle(hiveInsertTableHandle.getSchemaTableName().getSchemaName(),
+ hiveInsertTableHandle.getTableName(),
+ hiveInsertTableHandle.getInputColumns(),
+ hiveInsertTableHandle.getPageSinkMetadata(),
+ hiveInsertTableHandle.getLocationHandle(),
+ hiveInsertTableHandle.getBucketProperty(),
hiveInsertTableHandle.getTableStorageFormat(),
+ hiveInsertTableHandle.getPartitionStorageFormat(),
+ ImmutableMap.of(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL,
+
jobContext.getConfiguration().get(CarbonTableOutputFormat.LOAD_MODEL)));
+ }
+
+ @Override
+ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession
session,
+ ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments,
+ Collection<ComputedStatistics> computedStatistics) {
+ Optional<ConnectorOutputMetadata> connectorOutputMetadata =
+ super.finishInsert(session, insertHandle, fragments,
computedStatistics);
+ try {
+ carbonOutputCommitter.commitJob(jobContext);
+ } catch (IOException e) {
+ LOG.error("Error occurred while committing the insert job.", e);
+ throw new RuntimeException(e);
+ }
+ return connectorOutputMetadata;
+ }
+}
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataPageSinkProvider.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataPageSinkProvider.java
new file mode 100644
index 0000000..9747017
--- /dev/null
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataPageSinkProvider.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.inject.Inject;
+import io.airlift.event.client.EventClient;
+import io.airlift.json.JsonCodec;
+import io.airlift.units.DataSize;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveConfig;
+import io.prestosql.plugin.hive.HiveFileWriterFactory;
+import io.prestosql.plugin.hive.HivePageSink;
+import io.prestosql.plugin.hive.HivePageSinkProvider;
+import io.prestosql.plugin.hive.HiveSessionProperties;
+import io.prestosql.plugin.hive.HiveWritableTableHandle;
+import io.prestosql.plugin.hive.HiveWriterStats;
+import io.prestosql.plugin.hive.LocationService;
+import io.prestosql.plugin.hive.OrcFileWriterFactory;
+import io.prestosql.plugin.hive.PartitionUpdate;
+import io.prestosql.plugin.hive.metastore.HiveMetastore;
+import io.prestosql.plugin.hive.metastore.HivePageSinkMetadataProvider;
+import io.prestosql.plugin.hive.metastore.SortingColumn;
+import io.prestosql.spi.NodeManager;
+import io.prestosql.spi.PageIndexerFactory;
+import io.prestosql.spi.PageSorter;
+import io.prestosql.spi.connector.ConnectorInsertTableHandle;
+import io.prestosql.spi.connector.ConnectorPageSink;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.connector.ConnectorTransactionHandle;
+import io.prestosql.spi.type.TypeManager;
+
+import static
com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
+import static io.airlift.concurrent.Threads.daemonThreadsNamed;
+import static
io.prestosql.plugin.hive.metastore.CachingHiveMetastore.memoizeMetastore;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.Executors.newFixedThreadPool;
+
+public class CarbonDataPageSinkProvider extends HivePageSinkProvider {
+
+ private final Set<HiveFileWriterFactory> fileWriterFactories;
+ private final HdfsEnvironment hdfsEnvironment;
+ private final PageSorter pageSorter;
+ private final HiveMetastore metastore;
+ private final PageIndexerFactory pageIndexerFactory;
+ private final TypeManager typeManager;
+ private final int maxOpenPartitions;
+ private final int maxOpenSortFiles;
+ private final DataSize writerSortBufferSize;
+ private final boolean immutablePartitions;
+ private final LocationService locationService;
+ private final ListeningExecutorService writeVerificationExecutor;
+ private final JsonCodec<PartitionUpdate> partitionUpdateCodec;
+ private final NodeManager nodeManager;
+ private final EventClient eventClient;
+ private final HiveSessionProperties hiveSessionProperties;
+ private final HiveWriterStats hiveWriterStats;
+ private final OrcFileWriterFactory orcFileWriterFactory;
+ private final long perTransactionMetastoreCacheMaximumSize;
+
+ @Inject
+ public CarbonDataPageSinkProvider(Set<HiveFileWriterFactory>
fileWriterFactories,
+ HdfsEnvironment hdfsEnvironment, PageSorter pageSorter, HiveMetastore
metastore,
+ PageIndexerFactory pageIndexerFactory, TypeManager typeManager,
HiveConfig config,
+ LocationService locationService, JsonCodec<PartitionUpdate>
partitionUpdateCodec,
+ NodeManager nodeManager, EventClient eventClient, HiveSessionProperties
hiveSessionProperties,
+ HiveWriterStats hiveWriterStats, OrcFileWriterFactory
orcFileWriterFactory) {
+ super(fileWriterFactories, hdfsEnvironment, pageSorter, metastore,
pageIndexerFactory,
+ typeManager, config, locationService, partitionUpdateCodec,
nodeManager, eventClient,
+ hiveSessionProperties, hiveWriterStats, orcFileWriterFactory);
+ this.fileWriterFactories =
+ ImmutableSet.copyOf(requireNonNull(fileWriterFactories,
"fileWriterFactories is null"));
+ this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is
null");
+ this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
+ this.metastore = requireNonNull(metastore, "metastore is null");
+ this.pageIndexerFactory = requireNonNull(pageIndexerFactory,
"pageIndexerFactory is null");
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ this.maxOpenPartitions = config.getMaxPartitionsPerWriter();
+ this.maxOpenSortFiles = config.getMaxOpenSortFiles();
+ this.writerSortBufferSize =
+ requireNonNull(config.getWriterSortBufferSize(), "writerSortBufferSize
is null");
+ this.immutablePartitions = config.isImmutablePartitions();
+ this.locationService = requireNonNull(locationService, "locationService is
null");
+ this.writeVerificationExecutor = listeningDecorator(
+ newFixedThreadPool(config.getWriteValidationThreads(),
+ daemonThreadsNamed("hive-write-validation-%s")));
+ this.partitionUpdateCodec =
+ requireNonNull(partitionUpdateCodec, "partitionUpdateCodec is null");
+ this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
+ this.eventClient = requireNonNull(eventClient, "eventClient is null");
+ this.hiveSessionProperties =
+ requireNonNull(hiveSessionProperties, "hiveSessionProperties is null");
+ this.hiveWriterStats = requireNonNull(hiveWriterStats, "stats is null");
+ this.orcFileWriterFactory =
+ requireNonNull(orcFileWriterFactory, "orcFileWriterFactory is null");
+ this.perTransactionMetastoreCacheMaximumSize =
+ config.getPerTransactionMetastoreCacheMaximumSize();
+ }
+
+ @Override public ConnectorPageSink createPageSink(ConnectorTransactionHandle
transaction,
+ ConnectorSession session, ConnectorInsertTableHandle tableHandle) {
+ CarbonDataInsertTableHandle handle = (CarbonDataInsertTableHandle)
tableHandle;
+ return createPageSink(handle, session, ImmutableMap.of(),
handle.getAdditionalConf(), false);
+ }
+
+ private ConnectorPageSink createPageSink(HiveWritableTableHandle handle,
ConnectorSession session,
+ Map<String, String> additionalTableParameters, Map<String, String>
additionalConf,
+ boolean isCreateTable) {
+ OptionalInt bucketCount = OptionalInt.empty();
+ List<SortingColumn> sortedBy = ImmutableList.of();
+
+ if (handle.getBucketProperty().isPresent()) {
+ bucketCount =
OptionalInt.of(handle.getBucketProperty().get().getBucketCount());
+ sortedBy = handle.getBucketProperty().get().getSortedBy();
+ }
+ CarbonDataWriterFactory carbonDataWriterFactory = new
CarbonDataWriterFactory(
+ fileWriterFactories,
+ handle.getSchemaName(),
+ handle.getTableName(),
+ isCreateTable,
+ handle.getInputColumns(),
+ handle.getTableStorageFormat(),
+ handle.getPartitionStorageFormat(),
+ additionalTableParameters,
+ bucketCount,
+ sortedBy,
+ handle.getLocationHandle(),
+ locationService,
+ session.getQueryId(),
+ new HivePageSinkMetadataProvider(handle.getPageSinkMetadata(),
+ memoizeMetastore(metastore,
perTransactionMetastoreCacheMaximumSize)),
+ typeManager,
+ hdfsEnvironment,
+ pageSorter,
+ writerSortBufferSize,
+ maxOpenSortFiles,
+ immutablePartitions,
+ session,
+ nodeManager,
+ eventClient,
+ hiveSessionProperties,
+ hiveWriterStats,
+ orcFileWriterFactory,
+ additionalConf
+ );
+
+ return new HivePageSink(
+ carbonDataWriterFactory,
+ handle.getInputColumns(),
+ handle.getBucketProperty(),
+ pageIndexerFactory,
+ typeManager,
+ hdfsEnvironment,
+ maxOpenPartitions,
+ writeVerificationExecutor,
+ partitionUpdateCodec,
+ session);
+ }
+}
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataWriterFactory.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataWriterFactory.java
new file mode 100644
index 0000000..57e3dbf
--- /dev/null
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataWriterFactory.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.presto;
+
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.Set;
+
+import io.airlift.event.client.EventClient;
+import io.airlift.units.DataSize;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.plugin.hive.HiveFileWriterFactory;
+import io.prestosql.plugin.hive.HiveSessionProperties;
+import io.prestosql.plugin.hive.HiveStorageFormat;
+import io.prestosql.plugin.hive.HiveWriter;
+import io.prestosql.plugin.hive.HiveWriterFactory;
+import io.prestosql.plugin.hive.HiveWriterStats;
+import io.prestosql.plugin.hive.LocationHandle;
+import io.prestosql.plugin.hive.LocationService;
+import io.prestosql.plugin.hive.OrcFileWriterFactory;
+import io.prestosql.plugin.hive.metastore.HivePageSinkMetadataProvider;
+import io.prestosql.plugin.hive.metastore.SortingColumn;
+import io.prestosql.spi.NodeManager;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PageSorter;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.hadoop.mapred.JobConf;
+
+import static java.util.Objects.requireNonNull;
+
+public class CarbonDataWriterFactory extends HiveWriterFactory {
+
+ private final Map<String, String> additionalJobConf;
+
+ public CarbonDataWriterFactory(Set<HiveFileWriterFactory>
fileWriterFactories, String schemaName,
+ String tableName, boolean isCreateTable, List<HiveColumnHandle>
inputColumns,
+ HiveStorageFormat tableStorageFormat, HiveStorageFormat
partitionStorageFormat,
+ Map<String, String> additionalTableParameters, OptionalInt bucketCount,
+ List<SortingColumn> sortedBy, LocationHandle locationHandle,
LocationService locationService,
+ String queryId, HivePageSinkMetadataProvider pageSinkMetadataProvider,
+ TypeManager typeManager, HdfsEnvironment hdfsEnvironment, PageSorter
pageSorter,
+ DataSize sortBufferSize, int maxOpenSortFiles, boolean
immutablePartitions,
+ ConnectorSession session, NodeManager nodeManager, EventClient
eventClient,
+ HiveSessionProperties hiveSessionProperties, HiveWriterStats
hiveWriterStats,
+ OrcFileWriterFactory orcFileWriterFactory, Map<String, String>
additionalJobConf) {
+ super(fileWriterFactories, schemaName, tableName, isCreateTable,
inputColumns,
+ tableStorageFormat, partitionStorageFormat, additionalTableParameters,
bucketCount,
+ sortedBy, locationHandle, locationService, queryId,
pageSinkMetadataProvider, typeManager,
+ hdfsEnvironment, pageSorter, sortBufferSize, maxOpenSortFiles,
immutablePartitions, session,
+ nodeManager, eventClient, hiveSessionProperties, hiveWriterStats,
orcFileWriterFactory);
+
+ this.additionalJobConf = requireNonNull(additionalJobConf, "Additional
jobConf is null");
+ }
+
+ @Override
+ public HiveWriter createWriter(Page partitionColumns, int position,
OptionalInt bucketNumber) {
+ // set the additional conf like loadModel to send to worker
+ JobConf jobConf = getSuperJobConf();
+ additionalJobConf.forEach((k, v) -> jobConf.set(k, v));
+ return super.createWriter(partitionColumns, position, bucketNumber);
+ }
+
+  private JobConf getSuperJobConf() {
+    try {
+      // the parent HiveWriterFactory keeps its JobConf in a private field, so read it via reflection
+      Field field = HiveWriterFactory.class.getDeclaredField("conf");
+      field.setAccessible(true);
+      Object value = field.get(this);
+      field.setAccessible(false);
+      if (value == null) {
+        return null;
+      }
+      if (value instanceof JobConf) {
+        return (JobConf) value;
+      }
+      throw new RuntimeException("HiveWriterFactory conf field is not a JobConf");
+    } catch (NoSuchFieldException | IllegalAccessException ex) {
+      throw new RuntimeException("Unable to access the JobConf field of HiveWriterFactory", ex);
+    }
+  }
+}
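
For context, getSuperJobConf above reads the private "conf" field held by the parent
HiveWriterFactory because that field is not exposed through a getter. A minimal,
self-contained sketch of the same reflection pattern (the Base/Derived/ReflectionSketch
names are hypothetical and not part of this patch):

import java.lang.reflect.Field;

class Base {
  private final String conf = "base-conf";
}

class Derived extends Base {
  String readSuperField() {
    try {
      // look the field up on the declaring superclass, not on getClass()
      Field field = Base.class.getDeclaredField("conf");
      field.setAccessible(true);
      Object value = field.get(this);
      field.setAccessible(false);
      return (String) value;
    } catch (NoSuchFieldException | IllegalAccessException ex) {
      throw new RuntimeException("Unable to read 'conf' from Base", ex);
    }
  }
}

public class ReflectionSketch {
  public static void main(String[] args) {
    System.out.println(new Derived().readSuperField()); // prints base-conf
  }
}
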
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonMetadataFactory.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonMetadataFactory.java
new file mode 100644
index 0000000..8ab5138
--- /dev/null
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonMetadataFactory.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
+import com.google.inject.Inject;
+import io.airlift.concurrent.BoundedExecutor;
+import io.airlift.json.JsonCodec;
+import io.airlift.log.Logger;
+import io.prestosql.plugin.hive.ForHive;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveConfig;
+import io.prestosql.plugin.hive.HiveMetadata;
+import io.prestosql.plugin.hive.HiveMetadataFactory;
+import io.prestosql.plugin.hive.HivePartitionManager;
+import io.prestosql.plugin.hive.LocationService;
+import io.prestosql.plugin.hive.NodeVersion;
+import io.prestosql.plugin.hive.PartitionUpdate;
+import io.prestosql.plugin.hive.TypeTranslator;
+import io.prestosql.plugin.hive.metastore.CachingHiveMetastore;
+import io.prestosql.plugin.hive.metastore.HiveMetastore;
+import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
+import io.prestosql.plugin.hive.security.AccessControlMetadataFactory;
+import io.prestosql.plugin.hive.statistics.MetastoreHiveStatisticsProvider;
+import io.prestosql.spi.type.TypeManager;
+import org.joda.time.DateTimeZone;
+
+public class CarbonMetadataFactory extends HiveMetadataFactory {
+
+  private static final Logger log = Logger.get(CarbonMetadataFactory.class);
+ private final boolean allowCorruptWritesForTesting;
+ private final boolean skipDeletionForAlter;
+ private final boolean skipTargetCleanupOnRollback;
+ private final boolean writesToNonManagedTablesEnabled = true;
+ private final boolean createsOfNonManagedTablesEnabled;
+ private final long perTransactionCacheMaximumSize;
+ private final HiveMetastore metastore;
+ private final HdfsEnvironment hdfsEnvironment;
+ private final HivePartitionManager partitionManager;
+ private final DateTimeZone timeZone;
+ private final TypeManager typeManager;
+ private final LocationService locationService;
+ private final BoundedExecutor renameExecution;
+ private final TypeTranslator typeTranslator;
+ private final String prestoVersion;
+ private final AccessControlMetadataFactory accessControlMetadataFactory;
+  private final JsonCodec<PartitionUpdate> partitionUpdateCodec;
+
+ @Inject public CarbonMetadataFactory(HiveConfig hiveConfig, HiveMetastore
metastore,
+ HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager,
+ @ForHive ExecutorService executorService, TypeManager typeManager,
+ LocationService locationService, JsonCodec<PartitionUpdate>
partitionUpdateCodec,
+ TypeTranslator typeTranslator, NodeVersion nodeVersion,
+ AccessControlMetadataFactory accessControlMetadataFactory) {
+ this(metastore, hdfsEnvironment, partitionManager,
hiveConfig.getDateTimeZone(),
+ hiveConfig.getMaxConcurrentFileRenames(),
hiveConfig.getAllowCorruptWritesForTesting(),
+ hiveConfig.isSkipDeletionForAlter(),
hiveConfig.isSkipTargetCleanupOnRollback(),
+ hiveConfig.getWritesToNonManagedTablesEnabled(),
+ hiveConfig.getCreatesOfNonManagedTablesEnabled(),
+ hiveConfig.getPerTransactionMetastoreCacheMaximumSize(), typeManager,
locationService,
+ partitionUpdateCodec, executorService, typeTranslator,
nodeVersion.toString(),
+ accessControlMetadataFactory);
+ }
+
+ public CarbonMetadataFactory(HiveMetastore metastore, HdfsEnvironment
hdfsEnvironment,
+ HivePartitionManager partitionManager, DateTimeZone timeZone, int
maxConcurrentFileRenames,
+ boolean allowCorruptWritesForTesting, boolean skipDeletionForAlter,
+ boolean skipTargetCleanupOnRollback, boolean
writesToNonManagedTablesEnabled,
+ boolean createsOfNonManagedTablesEnabled, long
perTransactionCacheMaximumSize,
+ TypeManager typeManager, LocationService locationService,
+ io.airlift.json.JsonCodec<PartitionUpdate> partitionUpdateCodec,
+ ExecutorService executorService, TypeTranslator typeTranslator, String
prestoVersion,
+ AccessControlMetadataFactory accessControlMetadataFactory) {
+ super(metastore, hdfsEnvironment, partitionManager, timeZone,
maxConcurrentFileRenames,
+ allowCorruptWritesForTesting, skipDeletionForAlter,
skipTargetCleanupOnRollback,
+ true, createsOfNonManagedTablesEnabled,
+ perTransactionCacheMaximumSize, typeManager, locationService,
partitionUpdateCodec,
+ executorService, typeTranslator, prestoVersion,
accessControlMetadataFactory);
+ this.allowCorruptWritesForTesting = allowCorruptWritesForTesting;
+ this.skipDeletionForAlter = skipDeletionForAlter;
+ this.skipTargetCleanupOnRollback = skipTargetCleanupOnRollback;
+ this.createsOfNonManagedTablesEnabled = createsOfNonManagedTablesEnabled;
+ this.perTransactionCacheMaximumSize = perTransactionCacheMaximumSize;
+ this.metastore = Objects.requireNonNull(metastore, "metastore is null");
+ this.hdfsEnvironment = Objects.requireNonNull(hdfsEnvironment,
"hdfsEnvironment is null");
+ this.partitionManager = Objects.requireNonNull(partitionManager,
"partitionManager is null");
+ this.timeZone = Objects.requireNonNull(timeZone, "timeZone is null");
+ this.typeManager = Objects.requireNonNull(typeManager, "typeManager is
null");
+ this.locationService = Objects.requireNonNull(locationService,
"locationService is null");
+ this.partitionUpdateCodec =
+ Objects.requireNonNull(partitionUpdateCodec, "partitionUpdateCodec is
null");
+ this.typeTranslator = Objects.requireNonNull(typeTranslator,
"typeTranslator is null");
+ this.prestoVersion = Objects.requireNonNull(prestoVersion, "prestoVersion
is null");
+ this.accessControlMetadataFactory = Objects
+ .requireNonNull(accessControlMetadataFactory,
"accessControlMetadataFactory is null");
+ if (!allowCorruptWritesForTesting &&
!timeZone.equals(DateTimeZone.getDefault())) {
+ log.warn(
+ "Hive writes are disabled. To write data to Hive, your JVM timezone
must match the Hive storage timezone. Add -Duser.timezone=%s to your JVM
arguments",
+ timeZone.getID());
+ }
+
+ this.renameExecution = new BoundedExecutor(executorService,
maxConcurrentFileRenames);
+ }
+
+ @Override public HiveMetadata get() {
+ SemiTransactionalHiveMetastore metastore =
+ new SemiTransactionalHiveMetastore(this.hdfsEnvironment,
CachingHiveMetastore
+ .memoizeMetastore(this.metastore,
this.perTransactionCacheMaximumSize),
+ this.renameExecution, this.skipDeletionForAlter,
this.skipTargetCleanupOnRollback);
+ return new CarbonDataMetaData(metastore, this.hdfsEnvironment,
this.partitionManager,
+ this.timeZone, this.allowCorruptWritesForTesting,
this.writesToNonManagedTablesEnabled,
+ this.createsOfNonManagedTablesEnabled, this.typeManager,
this.locationService,
+ this.partitionUpdateCodec, this.typeTranslator, this.prestoVersion,
+ new MetastoreHiveStatisticsProvider(metastore),
+ this.accessControlMetadataFactory.create(metastore));
+ }
+}
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataConnectorFactory.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataConnectorFactory.java
index 0f5a61f..ce301cd 100755
---
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataConnectorFactory.java
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -26,6 +26,9 @@ import static java.util.Objects.requireNonNull;
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonInputFormat;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
import org.apache.carbondata.presto.impl.CarbonTableConfig;
import com.google.common.collect.ImmutableSet;
@@ -65,6 +68,7 @@ import io.prestosql.spi.classloader.ThreadContextClassLoader;
import io.prestosql.spi.connector.Connector;
import io.prestosql.spi.connector.ConnectorAccessControl;
import io.prestosql.spi.connector.ConnectorContext;
+import io.prestosql.spi.connector.ConnectorHandleResolver;
import io.prestosql.spi.connector.ConnectorNodePartitioningProvider;
import io.prestosql.spi.connector.ConnectorPageSinkProvider;
import io.prestosql.spi.connector.ConnectorPageSourceProvider;
@@ -177,15 +181,16 @@ public class CarbondataConnectorFactory extends
HiveConnectorFactory {
   /**
    * Set the Carbon format enum to HiveStorageFormat; it is a hack, but for the time being it is
    * the best choice to avoid a lot of code change.
-   *
-   * @throws Exception
    */
private static void setCarbonEnum() throws Exception {
- for (HiveStorageFormat format : HiveStorageFormat.values()) {
- if (format.name().equals("CARBON")) {
- return;
- }
- }
+ addHiveStorageFormatsForCarbondata("CARBON");
+ addHiveStorageFormatsForCarbondata("ORG.APACHE.CARBONDATA.FORMAT");
+ addHiveStorageFormatsForCarbondata("CARBONDATA");
+ }
+
+ private static void addHiveStorageFormatsForCarbondata(String storedAs)
+ throws InstantiationException, InvocationTargetException,
NoSuchFieldException,
+ IllegalAccessException, NoSuchMethodException {
Constructor<?>[] declaredConstructors =
HiveStorageFormat.class.getDeclaredConstructors();
declaredConstructors[0].setAccessible(true);
Field constructorAccessorField =
Constructor.class.getDeclaredField("constructorAccessor");
@@ -198,9 +203,9 @@ public class CarbondataConnectorFactory extends
HiveConnectorFactory {
acquireConstructorAccessorMethod.setAccessible(true);
ca = (ConstructorAccessor)
acquireConstructorAccessorMethod.invoke(declaredConstructors[0]);
}
- Object instance = ca.newInstance(new Object[] { "CARBON",
HiveStorageFormat.values().length, "",
- CarbonTableInputFormat.class.getName(),
CarbonTableOutputFormat.class.getName(),
- new DataSize(256.0D, DataSize.Unit.MEGABYTE) });
+ Object instance = ca.newInstance(new Object[] { storedAs,
HiveStorageFormat.values().length,
+ CarbonHiveSerDe.class.getName(),
MapredCarbonInputFormat.class.getName(),
+ MapredCarbonOutputFormat.class.getName(), new DataSize(256.0D,
DataSize.Unit.MEGABYTE) });
Field values = HiveStorageFormat.class.getDeclaredField("$VALUES");
values.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
@@ -214,4 +219,9 @@ public class CarbondataConnectorFactory extends
HiveConnectorFactory {
hiveStorageFormats[src.length] = (HiveStorageFormat) instance;
values.set(null, hiveStorageFormats);
}
-}
\ No newline at end of file
+
+ @Override
+ public ConnectorHandleResolver getHandleResolver() {
+ return new CarbonDataHandleResolver();
+ }
+}
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataModule.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataModule.java
index b7128da..0b2cfa2 100755
---
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataModule.java
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataModule.java
@@ -127,7 +127,8 @@ public class CarbondataModule extends HiveModule {
.in(Scopes.SINGLETON);
binder.bind(HivePartitionManager.class).in(Scopes.SINGLETON);
binder.bind(LocationService.class).to(HiveLocationService.class).in(Scopes.SINGLETON);
- binder.bind(HiveMetadataFactory.class).in(Scopes.SINGLETON);
+
binder.bind(HiveLocationService.class).to(CarbonDataLocationService.class).in(Scopes.SINGLETON);
+
binder.bind(HiveMetadataFactory.class).to(CarbonMetadataFactory.class).in(Scopes.SINGLETON);
binder.bind(new TypeLiteral<Supplier<TransactionalMetadata>>() {
}).to(HiveMetadataFactory.class).in(Scopes.SINGLETON);
binder.bind(HiveTransactionManager.class).in(Scopes.SINGLETON);
@@ -136,7 +137,7 @@ public class CarbondataModule extends HiveModule {
.as(generatedNameOf(HiveSplitManager.class, connectorId));
binder.bind(ConnectorPageSourceProvider.class).to(CarbondataPageSourceProvider.class)
.in(Scopes.SINGLETON);
- binder.bind(ConnectorPageSinkProvider.class).to(HivePageSinkProvider.class)
+
binder.bind(ConnectorPageSinkProvider.class).to(CarbonDataPageSinkProvider.class)
.in(Scopes.SINGLETON);
binder.bind(ConnectorNodePartitioningProvider.class).to(HiveNodePartitioningProvider.class)
.in(Scopes.SINGLETON);
@@ -161,6 +162,7 @@ public class CarbondataModule extends HiveModule {
configBinder(binder).bindConfig(OrcFileWriterConfig.class);
fileWriterFactoryBinder.addBinding().to(OrcFileWriterFactory.class).in(Scopes.SINGLETON);
fileWriterFactoryBinder.addBinding().to(RcFileFileWriterFactory.class).in(Scopes.SINGLETON);
+
fileWriterFactoryBinder.addBinding().to(CarbonDataFileWriterFactory.class).in(Scopes.SINGLETON);
binder.bind(CarbonTableReader.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(ParquetFileWriterConfig.class);
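
The CarbondataModule changes above rely on the usual Guice pattern of re-pointing an
existing interface binding at a Carbon-specific implementation (for example,
ConnectorPageSinkProvider now resolves to CarbonDataPageSinkProvider). A minimal sketch of
that pattern, using made-up stand-in types rather than the real Presto interfaces:

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Scopes;

interface PageSink {
  String name();
}

class HiveSink implements PageSink {
  public String name() { return "hive"; }
}

class CarbonSink extends HiveSink {
  @Override public String name() { return "carbon"; }
}

class SinkModule extends AbstractModule {
  @Override protected void configure() {
    // bind the interface to the Carbon implementation, as CarbondataModule does
    bind(PageSink.class).to(CarbonSink.class).in(Scopes.SINGLETON);
  }
}

public class GuiceBindingSketch {
  public static void main(String[] args) {
    Injector injector = Guice.createInjector(new SinkModule());
    System.out.println(injector.getInstance(PageSink.class).name()); // prints carbon
  }
}
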
diff --git
a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala
b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala
new file mode 100644
index 0000000..5c5a1b7
--- /dev/null
+++
b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.integrationtest
+
+import java.io.File
+import java.util
+import java.util.UUID
+import java.util.concurrent.{Callable, Executors, Future}
+
+import scala.collection.JavaConverters._
+
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuiteLike}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile,
CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier,
CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.schema.SchemaReader
+import org.apache.carbondata.core.metadata.schema.table.TableSchema
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.presto.server.PrestoServer
+import org.apache.carbondata.presto.util.CarbonDataStoreCreator
+
+class PrestoInsertIntoTableTestCase
+ extends FunSuiteLike with BeforeAndAfterAll with BeforeAndAfterEach {
+
+  private val logger = LogServiceFactory
+    .getLogService(classOf[PrestoInsertIntoTableTestCase].getCanonicalName)
+
+ private val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ private val storePath = s"$rootPath/integration/presto/target/store"
+ private val prestoServer = new PrestoServer
+ private val executorService = Executors.newFixedThreadPool(1)
+
+ override def beforeAll: Unit = {
+ val map = new util.HashMap[String, String]()
+ map.put("hive.metastore", "file")
+ map.put("hive.metastore.catalog.dir", s"file://$storePath")
+ map.put("hive.allow-drop-table", "true")
+ prestoServer.startServer("testdb", map)
+ prestoServer.execute("drop schema if exists testdb")
+ prestoServer.execute("create schema testdb")
+ }
+
+ override protected def beforeEach(): Unit = {
+ val query = "create table testdb.testtable(ID int, date date, country
varchar, name varchar, " +
+ "phonetype varchar, serialname varchar,salary decimal(6,1),
bonus decimal(8,6), " +
+ "monthlyBonus decimal(5,3), dob timestamp, shortField
smallint, iscurrentemployee" +
+ " boolean) with(format='CARBONDATA') "
+ val schema =
CarbonDataStoreCreator.getCarbonTableSchemaForDecimal(getAbsoluteIdentifier(
+ "testdb",
+ "testtable"))
+ createTable(query, "testdb", "testtable", schema)
+ }
+
+ private def createTable(query: String,
+ databaseName: String,
+ tableName: String,
+ customSchema: TableSchema = null): Unit = {
+ prestoServer.execute(s"drop table if exists ${ databaseName }.${ tableName
}")
+ prestoServer.execute(query)
+ logger.info("Creating The Carbon Store")
+ val absoluteTableIdentifier: AbsoluteTableIdentifier =
getAbsoluteIdentifier(databaseName,
+ tableName)
+ val schema = if (customSchema == null) {
+ CarbonDataStoreCreator.getCarbonTableSchema(absoluteTableIdentifier)
+ } else {
+ customSchema
+ }
+ CarbonDataStoreCreator.createTable(absoluteTableIdentifier,
+ schema, true)
+ logger.info(s"\nCarbon store is created at location: $storePath")
+ }
+
+ private def getAbsoluteIdentifier(dbName: String,
+ tableName: String) = {
+ val absoluteTableIdentifier = AbsoluteTableIdentifier.from(
+ storePath + "/" + dbName + "/" + tableName,
+ new CarbonTableIdentifier(dbName,
+ tableName,
+ UUID.randomUUID().toString))
+ absoluteTableIdentifier
+ }
+
+ test("test insert with different storage format names") {
+ val query1 = "create table testdb.testtable(ID int, date date, country
varchar, name varchar," +
+ " phonetype varchar, serialname varchar,salary decimal(6,1),
bonus decimal(8,6)," +
+ " monthlyBonus decimal(5,3), dob timestamp, shortField
smallint, " +
+ "iscurrentemployee boolean) with(format='CARBONDATA') "
+ val query2 = "create table testdb.testtable(ID int, date date, country
varchar, name varchar," +
+ " phonetype varchar, serialname varchar,salary decimal(6,1),
bonus decimal(8,6)," +
+ " monthlyBonus decimal(5,3), dob timestamp, shortField
smallint, " +
+ "iscurrentemployee boolean) with(format='CARBON') "
+ val query3 = "create table testdb.testtable(ID int, date date, country
varchar, name varchar," +
+ " phonetype varchar, serialname varchar,salary decimal(6,1),
bonus decimal(8,6)," +
+ " monthlyBonus decimal(5,3), dob timestamp, shortField
smallint, " +
+ "iscurrentemployee boolean)
with(format='ORG.APACHE.CARBONDATA.FORMAT') "
+ createTable(query1, "testdb", "testtable")
+ prestoServer.execute(
+ "insert into testdb.testtable values(10, current_date, 'INDIA',
'Chandler', 'qwerty', " +
+ "'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14
05:00:09',smallint '23', true)")
+ createTable(query2, "testdb", "testtable")
+ prestoServer.execute(
+ "insert into testdb.testtable values(10, current_date, 'INDIA',
'Chandler', 'qwerty', " +
+ "'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14
05:00:09',smallint '23', true)")
+ createTable(query3, "testdb", "testtable")
+ prestoServer.execute(
+ "insert into testdb.testtable values(10, current_date, 'INDIA',
'Chandler', 'qwerty', " +
+ "'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14
05:00:09',smallint '23', true)")
+ val absoluteTableIdentifier: AbsoluteTableIdentifier =
getAbsoluteIdentifier("testdb",
+ "testtable")
+ val carbonTable =
SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
+ val segmentPath = CarbonTablePath.getSegmentPath(carbonTable.getTablePath,
"0")
+ assert(FileFactory.getCarbonFile(segmentPath).isFileExist)
+ }
+
+ test("test insert into one segment and check folder structure") {
+ prestoServer.execute(
+ "insert into testdb.testtable values(10, current_date, 'INDIA',
'Chandler', 'qwerty', " +
+ "'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14
05:00:09',smallint '23', true)")
+ prestoServer.execute(
+ "insert into testdb.testtable values(10, current_date, 'INDIA',
'Chandler', 'qwerty', " +
+ "'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14
05:00:09',smallint '23', true)")
+ val absoluteTableIdentifier: AbsoluteTableIdentifier =
getAbsoluteIdentifier("testdb",
+ "testtable")
+ val carbonTable =
SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
+ val tablePath = carbonTable.getTablePath
+ val segment0Path = CarbonTablePath.getSegmentPath(tablePath, "0")
+ val segment1Path = CarbonTablePath.getSegmentPath(tablePath, "1")
+ val segment0 = FileFactory.getCarbonFile(segment0Path)
+ assert(segment0.isFileExist)
+ assert(segment0.listFiles(new CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = {
+ file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT) ||
+ file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
+ }
+ }).length == 2)
+ val segment1 = FileFactory.getCarbonFile(segment1Path)
+ assert(segment1.isFileExist)
+ assert(segment1.listFiles(new CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = {
+ file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT) ||
+ file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
+ }
+ }).length == 2)
+ val segmentsPath = CarbonTablePath.getSegmentFilesLocation(tablePath)
+ assert(FileFactory.getCarbonFile(segmentsPath).isFileExist &&
+ FileFactory.getCarbonFile(segmentsPath).listFiles(true).size() == 2)
+ val metadataFolderPath = CarbonTablePath.getMetadataPath(tablePath)
+ FileFactory.getCarbonFile(metadataFolderPath).listFiles(new
CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = {
+ file.getName.endsWith(CarbonTablePath.TABLE_STATUS_FILE)
+ }
+ })
+ }
+
+ test("test insert into many segments and check segment count and data
count") {
+ prestoServer.execute(
+ "insert into testdb.testtable values(10, current_date, 'INDIA',
'Chandler', 'qwerty', " +
+ "'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14
05:00:09',smallint '23', true)")
+ prestoServer.execute(
+ "insert into testdb.testtable values(10, current_date, 'INDIA',
'Chandler', 'qwerty', " +
+ "'usn20392',10000.0,16.234567,25.678,timestamp '1998-12-16
10:12:09',smallint '23', true)")
+ prestoServer.execute(
+ "insert into testdb.testtable values(10, current_date, 'INDIA',
'Chandler', 'qwerty', " +
+ "'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14
05:00:09',smallint '23', true)")
+ prestoServer.execute(
+ "insert into testdb.testtable values(10, current_date, 'INDIA',
'Chandler', 'qwerty', " +
+ "'usn20392',10000.0,16.234567,25.678,timestamp '1998-12-16
10:12:09',smallint '23', true)")
+ val absoluteTableIdentifier: AbsoluteTableIdentifier =
getAbsoluteIdentifier("testdb",
+ "testtable")
+ val carbonTable =
SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
+ val segmentFoldersLocation =
CarbonTablePath.getPartitionDir(carbonTable.getTablePath)
+
assert(FileFactory.getCarbonFile(segmentFoldersLocation).listFiles(false).size()
== 8)
+ val actualResult1: List[Map[String, Any]] = prestoServer
+ .executeQuery("select count(*) AS RESULT from testdb.testtable")
+ val expectedResult1: List[Map[String, Any]] = List(Map("RESULT" -> 4))
+ assert(actualResult1.equals(expectedResult1))
+ // filter query
+ val actualResult2: List[Map[String, Any]] = prestoServer
+ .executeQuery(
+ "select count(*) AS RESULT from testdb.testtable WHERE dob = timestamp
'1998-12-16 " +
+ "10:12:09'")
+ val expectedResult2: List[Map[String, Any]] = List(Map("RESULT" -> 2))
+ assert(actualResult2.equals(expectedResult2))
+ }
+
+ test("test if the table status contains the segment file name for each
load") {
+ prestoServer.execute(
+ "insert into testdb.testtable values(10, current_date, 'INDIA',
'Chandler', 'qwerty', " +
+ "'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14
05:00:09',smallint '23', true)")
+ val absoluteTableIdentifier: AbsoluteTableIdentifier =
getAbsoluteIdentifier("testdb",
+ "testtable")
+ val carbonTable =
SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
+ val ssm = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+ ssm.getValidAndInvalidSegments.getValidSegments.asScala.foreach { segment
=>
+ val loadMetadataDetails = segment.getLoadMetadataDetails
+ assert(loadMetadataDetails.getSegmentFile != null)
+ }
+ }
+
+ test("test for query when insert in progress") {
+ prestoServer.execute(
+ "insert into testdb.testtable values(10, current_date, 'INDIA',
'Chandler', 'qwerty', " +
+ "'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14
05:00:09',smallint '23', true)")
+ val query = "insert into testdb.testtable values(10, current_date,
'INDIA', 'Chandler', " +
+ "'qwerty', 'usn20392',10000.0,16.234567,25.678,timestamp
'1994-06-14 05:00:09'," +
+ "smallint '23', true)"
+ val asyncQuery = runSqlAsync(query)
+ val actualResult1: List[Map[String, Any]] = prestoServer.executeQuery(
+ "select count(*) AS RESULT from testdb.testtable WHERE dob = timestamp
'1994-06-14 05:00:09'")
+ val expectedResult1: List[Map[String, Any]] = List(Map("RESULT" -> 1))
+ assert(actualResult1.equals(expectedResult1))
+ assert(asyncQuery.get().equalsIgnoreCase("PASS"))
+ val actualResult2: List[Map[String, Any]] = prestoServer.executeQuery(
+ "select count(*) AS RESULT from testdb.testtable WHERE dob = timestamp
'1994-06-14 05:00:09'")
+ val expectedResult2: List[Map[String, Any]] = List(Map("RESULT" -> 2))
+ assert(actualResult2.equals(expectedResult2))
+ }
+
+ test("test for all primitive data") {
+ val query = "create table testdb.testtablealldatatype(ID int, date
date,name varchar, " +
+ "salary decimal(6,1), bonus decimal(8,6), charfield CHAR(10),"
+
+ "monthlyBonus decimal(5,3),dob timestamp, shortField smallint,
finalSalary " +
+ "double, bigintfield bigint,tinyfield tinyint,
iscurrentemployee" +
+ " boolean) with(format='CARBONDATA') "
+ val schema =
CarbonDataStoreCreator.getCarbonTableSchemaForAllPrimitive(getAbsoluteIdentifier(
+ "testdb",
+ "testtablealldatatype"))
+ createTable(query, "testdb", "testtablealldatatype", schema)
+ prestoServer.execute(
+ "insert into testdb.testtablealldatatype values(10,date '2020-10-21',
'Chandler',1000.0, " +
+ "16.234567,'test_str_0',25.678,timestamp '2019-03-10
18:23:37.0',smallint '-999'," +
+ "200499.500000,999,tinyint '103',false)")
+ val actualResult = prestoServer.executeQuery(
+ "select
ID,date,name,salary,bonus,charfield,monthlyBonus,dob,shortfield,finalSalary," +
+ "bigintfield,tinyfield,iscurrentemployee AS RESULT from
testdb.testtablealldatatype")
+ val actualResultString =
actualResult.head.values.map(_.toString).toList.sorted
+ val expectedResult2: List[String] = List("-999",
+ "10",
+ "1000.0",
+ "103",
+ "16.234567",
+ "200499.5",
+ "2019-03-10 18:23:37.0",
+ "2020-10-21",
+ "25.678",
+ "999",
+ "Chandler",
+ "false",
+ "test_str_0")
+ assert(actualResultString.equals(expectedResult2))
+ }
+
+ class QueryTask(query: String) extends Callable[String] {
+ override def call(): String = {
+ var result = "PASS"
+ try {
+ prestoServer.execute(query)
+ } catch {
+ case ex: Exception =>
+          ex.printStackTrace()
+ result = "FAIL"
+ }
+ result
+ }
+ }
+
+ private def runSqlAsync(sql: String): Future[String] = {
+ val future = executorService.submit(
+ new QueryTask(sql)
+ )
+    // brief pause so the async insert has started before the caller runs its verification queries
+    Thread.sleep(2)
+ future
+ }
+
+ override def afterAll(): Unit = {
+ prestoServer.stopServer()
+ CarbonUtil.deleteFoldersAndFiles(FileFactory.getCarbonFile(storePath))
+ executorService.shutdownNow()
+ }
+}
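
The test above drives inserts through the embedded PrestoServer used by the integration
tests. The same write path can be exercised from any Presto client once the connector is
deployed; a rough sketch using the prestosql JDBC driver is shown below (the coordinator
address, catalog name, and the existing testdb.testtable are assumptions, not part of this
patch):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class PrestoInsertSketch {
  public static void main(String[] args) throws Exception {
    // placeholder coordinator address and carbondata catalog
    String url = "jdbc:presto://localhost:8086/carbondata/testdb";
    try (Connection conn = DriverManager.getConnection(url, "test", null);
         Statement stmt = conn.createStatement()) {
      stmt.executeUpdate(
          "insert into testtable values(10, current_date, 'INDIA', 'Chandler', 'qwerty', "
              + "'usn20392', 10000.0, 16.234567, 25.678, timestamp '1994-06-14 05:00:09', "
              + "smallint '23', true)");
      try (ResultSet rs = stmt.executeQuery("select count(*) from testtable")) {
        while (rs.next()) {
          System.out.println("row count: " + rs.getLong(1));
        }
      }
    }
  }
}
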
diff --git
a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index 4b91c8f..75ba584 100644
---
a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++
b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -24,9 +24,6 @@ import java.util
import java.util.{ArrayList, Date, UUID}
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
import com.google.gson.Gson
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -43,13 +40,12 @@ import
org.apache.carbondata.core.fileoperations.{AtomicFileOperationFactory, At
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier,
CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.{SchemaConverter,
ThriftWrapperSchemaConverterImpl}
import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
-import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable,
CarbonTableBuilder, TableSchemaBuilder}
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable,
CarbonTableBuilder, TableSchema, TableSchemaBuilder}
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails,
SegmentStatus}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.writer.ThriftWriter
+import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
import org.apache.carbondata.processing.loading.DataLoadExecutor
import
org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
import org.apache.carbondata.processing.loading.csvinput.{BlockDetails,
CSVInputFormat, CSVRecordReaderIterator, StringArrayWritable}
@@ -75,7 +71,9 @@ object CarbonDataStoreCreator {
new CarbonTableIdentifier(dbName,
tableName,
UUID.randomUUID().toString))
- val table: CarbonTable = createTable(absoluteTableIdentifier,
useLocalDict)
+ val table: CarbonTable = createTable(absoluteTableIdentifier,
+ getCarbonTableSchema(absoluteTableIdentifier),
+ useLocalDict)
val schema: CarbonDataLoadSchema = new CarbonDataLoadSchema(table)
val loadModel: CarbonLoadModel = new CarbonLoadModel()
import scala.collection.JavaConverters._
@@ -121,8 +119,9 @@ object CarbonDataStoreCreator {
"," +
"true")
loadModel.setMaxColumns("15")
-
loadModel.setCsvHeader("ID,date,country,name,phonetype,serialname,salary,bonus,"
+
- "monthlyBonus,dob,shortField,isCurrentEmployee")
+ loadModel.setCsvHeader(
+
"ID,date,country,name,phonetype,serialname,salary,bonus,monthlyBonus,dob,shortField,"
+
+ "isCurrentEmployee")
loadModel.setCsvHeaderColumns(loadModel.getCsvHeader.split(","))
loadModel.setTaskNo("0")
loadModel.setSegmentId("0")
@@ -135,29 +134,8 @@ object CarbonDataStoreCreator {
}
}
- private def createTable(absoluteTableIdentifier: AbsoluteTableIdentifier,
+ def createTable(absoluteTableIdentifier: AbsoluteTableIdentifier, schema:
TableSchema,
useLocalDict: Boolean): CarbonTable = {
-
- val integer = new AtomicInteger(0)
- val schemaBuilder = new TableSchemaBuilder
- schemaBuilder.addColumn(new StructField("ID", DataTypes.INT), integer,
false, false)
- schemaBuilder.addColumn(new StructField("date", DataTypes.DATE), integer,
false, false)
- schemaBuilder.addColumn(new StructField("country", DataTypes.STRING),
integer, false, false)
- schemaBuilder.addColumn(new StructField("name", DataTypes.STRING),
integer, false, false)
- schemaBuilder.addColumn(new StructField("phonetype", DataTypes.STRING),
integer, false, false)
- schemaBuilder.addColumn(new StructField("serialname", DataTypes.STRING),
integer, false, false)
- schemaBuilder.addColumn(new StructField("salary", DataTypes.DOUBLE),
integer, false, false)
- schemaBuilder.addColumn(new StructField("bonus",
DataTypes.createDecimalType(10, 4)),
- integer, false, true)
- schemaBuilder.addColumn(new StructField("monthlyBonus",
DataTypes.createDecimalType(18, 4)),
- integer, false, true)
- schemaBuilder.addColumn(new StructField("dob", DataTypes.TIMESTAMP),
integer, false, true)
- schemaBuilder.addColumn(new StructField("shortField", DataTypes.SHORT),
integer, false, false)
- schemaBuilder.addColumn(new StructField("isCurrentEmployee",
DataTypes.BOOLEAN),
- integer, false, true)
- schemaBuilder.tableName(absoluteTableIdentifier.getTableName)
- val schema = schemaBuilder.build()
-
val builder = new CarbonTableBuilder
builder.databaseName(absoluteTableIdentifier.getDatabaseName)
.tableName(absoluteTableIdentifier.getTableName)
@@ -174,12 +152,12 @@ object CarbonDataStoreCreator {
CarbonMetadata.getInstance.loadTableMetadata(tableInfo)
val schemaConverter: SchemaConverter =
new ThriftWrapperSchemaConverterImpl()
- val thriftTableInfo: org.apache.carbondata.format.TableInfo =
+ val thriftTableInfo: TableInfo =
schemaConverter.fromWrapperToExternalTableInfo(
tableInfo,
tableInfo.getDatabaseName,
tableInfo.getFactTable.getTableName)
- val schemaEvolutionEntry:
org.apache.carbondata.format.SchemaEvolutionEntry =
+ val schemaEvolutionEntry: SchemaEvolutionEntry =
new org.apache.carbondata.format.SchemaEvolutionEntry(
tableInfo.getLastUpdatedTime)
thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
@@ -194,32 +172,105 @@ object CarbonDataStoreCreator {
CarbonMetadata.getInstance.getCarbonTable(tableInfo.getTableUniqueName)
}
- private def addDictionaryValuesToDimensionSet(dims:
util.List[CarbonDimension],
- dimensionIndex: mutable.Buffer[Int],
- dimensionSet: Array[util.List[String]],
- data: Array[String],
- index: Int) = {
- if (isDictionaryDefaultMember(dims, dimensionSet, index)) {
- dimensionSet(index).add(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
- dimensionSet(index).add(data(dimensionIndex(index)))
- }
- else {
- if (data.length == 1) {
- dimensionSet(index).add("""\N""")
- } else {
- dimensionSet(index).add(data(dimensionIndex(index)))
- }
- }
+ def getCarbonTableSchema(absoluteTableIdentifier: AbsoluteTableIdentifier):
TableSchema = {
+ val integer = new AtomicInteger(0)
+ val schemaBuilder = new TableSchemaBuilder
+ schemaBuilder.addColumn(new StructField("ID", DataTypes.INT), integer,
false, false)
+ schemaBuilder.addColumn(new StructField("date", DataTypes.DATE), integer,
false, false)
+ schemaBuilder.addColumn(new StructField("country", DataTypes.STRING),
integer, false, false)
+ schemaBuilder.addColumn(new StructField("name", DataTypes.STRING),
integer, false, false)
+ schemaBuilder.addColumn(new StructField("phonetype", DataTypes.STRING),
integer, false, false)
+ schemaBuilder.addColumn(new StructField("serialname", DataTypes.STRING),
integer, false, false)
+ schemaBuilder.addColumn(new StructField("salary", DataTypes.DOUBLE),
integer, false, false)
+ schemaBuilder.addColumn(new StructField("bonus",
DataTypes.createDecimalType(10, 4)),
+ integer,
+ false,
+ true)
+ schemaBuilder.addColumn(new StructField("monthlyBonus",
DataTypes.createDecimalType(18, 4)),
+ integer,
+ false,
+ true)
+ schemaBuilder.addColumn(new StructField("dob", DataTypes.TIMESTAMP),
integer, false, true)
+ schemaBuilder.addColumn(new StructField("shortField", DataTypes.SHORT),
integer, false, false)
+ schemaBuilder.addColumn(new StructField("isCurrentEmployee",
DataTypes.BOOLEAN),
+ integer,
+ false,
+ true)
+ schemaBuilder.tableName(absoluteTableIdentifier.getTableName)
+ val schema = schemaBuilder.build()
+ schema
+ }
+
+ // TODO: need to refactor
+ def getCarbonTableSchemaForDecimal(absoluteTableIdentifier:
AbsoluteTableIdentifier)
+ : TableSchema = {
+ val integer = new AtomicInteger(0)
+ val schemaBuilder = new TableSchemaBuilder
+ schemaBuilder.addColumn(new StructField("ID", DataTypes.INT), integer,
false, false)
+ schemaBuilder.addColumn(new StructField("date", DataTypes.DATE), integer,
false, false)
+ schemaBuilder.addColumn(new StructField("country", DataTypes.STRING),
integer, false, false)
+ schemaBuilder.addColumn(new StructField("name", DataTypes.STRING),
integer, false, false)
+ schemaBuilder.addColumn(new StructField("phonetype", DataTypes.STRING),
integer, false, false)
+ schemaBuilder.addColumn(new StructField("serialname", DataTypes.STRING),
integer, false, false)
+ schemaBuilder.addColumn(new StructField("salary",
DataTypes.createDecimalType(6, 1)),
+ integer,
+ false,
+ false)
+ schemaBuilder.addColumn(new StructField("bonus",
DataTypes.createDecimalType(8, 6)),
+ integer,
+ false,
+ true)
+ schemaBuilder.addColumn(new StructField("monthlyBonus",
DataTypes.createDecimalType(5, 3)),
+ integer,
+ false,
+ true)
+ schemaBuilder.addColumn(new StructField("dob", DataTypes.TIMESTAMP),
integer, false, true)
+ schemaBuilder.addColumn(new StructField("shortField", DataTypes.SHORT),
integer, false, false)
+ schemaBuilder.addColumn(new StructField("isCurrentEmployee",
DataTypes.BOOLEAN),
+ integer,
+ false,
+ true)
+ schemaBuilder.tableName(absoluteTableIdentifier.getTableName)
+ val schema = schemaBuilder.build()
+ schema
}
- private def isDictionaryDefaultMember(dims: util.List[CarbonDimension],
- dimensionSet: Array[util.List[String]],
- index: Int) = {
- val dimensions = dims.asScala
- dimensionSet(index).isEmpty &&
dimensions(index).hasEncoding(Encoding.DICTIONARY) &&
- !dimensions(index).hasEncoding(Encoding.DIRECT_DICTIONARY)
+ def getCarbonTableSchemaForAllPrimitive(absoluteTableIdentifier:
AbsoluteTableIdentifier)
+ : TableSchema = {
+ val integer = new AtomicInteger(0)
+ val schemaBuilder = new TableSchemaBuilder
+ schemaBuilder.addColumn(new StructField("ID", DataTypes.INT), integer,
false, false)
+ schemaBuilder.addColumn(new StructField("date", DataTypes.DATE), integer,
false, false)
+ schemaBuilder.addColumn(new StructField("name", DataTypes.STRING),
integer, false, false)
+ schemaBuilder.addColumn(new StructField("salary",
DataTypes.createDecimalType(6, 1)),
+ integer,
+ false,
+ false)
+ schemaBuilder.addColumn(new StructField("bonus",
DataTypes.createDecimalType(8, 6)),
+ integer,
+ false,
+ true)
+ schemaBuilder.addColumn(new StructField("charfield", DataTypes.STRING),
integer, false, false)
+ schemaBuilder.addColumn(new StructField("monthlyBonus",
DataTypes.createDecimalType(5, 3)),
+ integer,
+ false,
+ true)
+ schemaBuilder.addColumn(new StructField("dob", DataTypes.TIMESTAMP),
integer, false, true)
+ schemaBuilder.addColumn(new StructField("shortField", DataTypes.SHORT),
integer, false, false)
+ schemaBuilder.addColumn(new StructField("finalsalary", DataTypes.DOUBLE),
integer, false, false)
+ schemaBuilder.addColumn(new StructField("bigintfield", DataTypes.LONG),
integer, false, false)
+ schemaBuilder.addColumn(new StructField("tinyfield", DataTypes.BYTE),
integer, false, false)
+ schemaBuilder.addColumn(new StructField("isCurrentEmployee",
DataTypes.BOOLEAN),
+ integer,
+ false,
+ true)
+ schemaBuilder.tableName(absoluteTableIdentifier.getTableName)
+ val schema = schemaBuilder.build()
+ schema
}
+
/**
* Execute graph which will further load data
*
@@ -297,15 +348,11 @@ object CarbonDataStoreCreator {
new DataLoadExecutor()
.execute(loadModel, Array(storeLocation), Array(readerIterator))
writeLoadMetadata(loadModel.getCarbonDataLoadSchema,
- loadModel.getTableName,
- loadModel.getTableName,
new ArrayList[LoadMetadataDetails]())
}
private def writeLoadMetadata(
schema: CarbonDataLoadSchema,
- databaseName: String,
- tableName: String,
listOfLoadFolderDetails: util.List[LoadMetadataDetails]): Unit = {
try {
val loadMetadataDetails: LoadMetadataDetails = new LoadMetadataDetails()