This is an automated email from the ASF dual-hosted git repository.
indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 1ccf295 [CARBONDATA-4091] support prestosql 333 integration with carbon
1ccf295 is described below
commit 1ccf295340611c9870e24112b532fd3cfdd821ad
Author: ajantha-bhat <[email protected]>
AuthorDate: Mon Mar 9 16:21:41 2020 +0530
[CARBONDATA-4091] support prestosql 333 integration with carbon
Why is this PR needed?
Currently carbondata is integrated with presto-sql 316, which is about
1.5 years old.
Presto has since gained many useful features and optimizations, such as
dynamic filtering, the Rubix data cache, and general performance improvements.
It is always good to use the latest version; the latest version is
presto-sql 348, but jumping from 316 to 348 would involve too many changes at once.
So, to utilize these new features and based on customer demand, this PR
upgrades presto-sql to version 333. It can be upgraded again to a more
recent version in a few months.
Note:
This is a plain integration that supports all existing features of presto 316;
deep integration with new features such as dynamic filtering and the
Rubix cache will be handled in another PR.
What changes were proposed in this PR?
1. Adapt to the new hive connector changes (for example, constructor signature
changes), and add a CarbonDataConnector to support CarbonDataHandleResolver.
2. Java 11 removed the ConstructorAccessor class, so the Unsafe class is used
for reflection instead (presto 333 requires Java 11 at runtime); see the
sketch after this list.
3. POM changes to support presto 333.
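As an illustration of item 2, below is a minimal, hedged sketch of the
reflection technique; it mirrors addHiveStorageFormatsForCarbondata in
CarbondataConnectorFactory further down in this diff. The EnumExtender class
and addConstant helper are hypothetical names, and the "name"/"ordinal"/
"$VALUES" field names assume the standard JDK enum layout on Java 11:
```java
// Hedged sketch: add an enum constant at runtime via sun.misc.Unsafe instead of
// the removed ConstructorAccessor. Class and method names here are hypothetical.
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;

import sun.misc.Unsafe;

public final class EnumExtender {

  public static <E extends Enum<E>> E addConstant(Class<E> enumClass, String name)
      throws Exception {
    // Obtain an Unsafe instance through its private constructor.
    Constructor<?> ctor = Unsafe.class.getDeclaredConstructors()[0];
    ctor.setAccessible(true);
    Unsafe unsafe = (Unsafe) ctor.newInstance();

    // Allocate the enum instance without invoking any constructor.
    E value = enumClass.cast(unsafe.allocateInstance(enumClass));

    // Fill the fields every enum constant carries (standard JDK layout).
    setField(value, Enum.class.getDeclaredField("name"), name);
    setField(value, Enum.class.getDeclaredField("ordinal"),
        enumClass.getEnumConstants().length);

    // Append the new constant to the cached $VALUES array.
    Field valuesField = enumClass.getDeclaredField("$VALUES");
    makeAccessible(valuesField);
    E[] oldValues = enumClass.getEnumConstants();
    E[] newValues = Arrays.copyOf(oldValues, oldValues.length + 1);
    newValues[oldValues.length] = value;
    valuesField.set(null, newValues);
    return value;
  }

  private static void setField(Object target, Field field, Object fieldValue)
      throws Exception {
    makeAccessible(field);
    field.set(target, fieldValue);
  }

  private static void makeAccessible(Field field) throws Exception {
    field.setAccessible(true);
    // Clear the final modifier so the field can be written reflectively (works on Java 11).
    Field modifiers = Field.class.getDeclaredField("modifiers");
    modifiers.setAccessible(true);
    modifiers.setInt(field, field.getModifiers() & ~Modifier.FINAL);
  }
}
```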
Note: A Java 11 environment is needed for running presto 333 with carbon, and
the JVM property "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" must
also be added (an example jvm.config entry is shown below).
This closes #4034
---
docs/prestosql-guide.md | 36 ++--
integration/presto/pom.xml | 26 ++-
.../carbondata/presto/CarbonDataConnector.java | 85 ++++++++
.../carbondata/presto/CarbonDataFileWriter.java | 8 +-
.../presto/CarbonDataFileWriterFactory.java | 4 +-
.../presto/CarbonDataLocationService.java | 2 +-
.../carbondata/presto/CarbonDataMetaData.java | 40 ++--
.../presto/CarbonDataPageSinkProvider.java | 23 ++-
.../carbondata/presto/CarbonDataWriterFactory.java | 6 +-
.../carbondata/presto/CarbonMetadataFactory.java | 133 +++++++++----
.../presto/CarbondataConnectorFactory.java | 221 ++++++---------------
.../apache/carbondata/presto/CarbondataModule.java | 88 ++++----
.../carbondata/presto/CarbondataPageSource.java | 17 +-
.../presto/CarbondataPageSourceProvider.java | 29 +--
.../apache/carbondata/presto/CarbondataPlugin.java | 7 +-
.../carbondata/presto/CarbondataSplitManager.java | 20 +-
.../presto/InternalCarbonDataConnectorFactory.java | 184 +++++++++++++++++
.../carbondata/presto/server/PrestoServer.scala | 24 ++-
pom.xml | 8 +-
19 files changed, 599 insertions(+), 362 deletions(-)
diff --git a/docs/prestosql-guide.md b/docs/prestosql-guide.md
index 3dac6c7..d5b4eed 100644
--- a/docs/prestosql-guide.md
+++ b/docs/prestosql-guide.md
@@ -34,32 +34,32 @@ https://github.com/apache/carbondata/blob/master/pom.xml
and look for ```<presto.version>``` inside `prestosql` profile.
_Example:_
- `<presto.version>316</presto.version>`
-This means current version of carbon supports presto 316 version.
+ `<presto.version>333</presto.version>`
+This means current version of carbon supports presto 333 version.
_Note:_
Currently carbondata supports only one version of presto, cannot handle
multiple versions at same time. If user wish to use older version of presto,
then need to use older version of carbon (other old branches, say branch-1.5
and check the supported presto version in it's pom.xml file in
integration/presto/)
- 1. Download that version of Presto (say 316) using below command:
+ 1. Download that version of Presto (say 333) using below command:
```
- wget
https://repo1.maven.org/maven2/io/prestosql/presto-server/316/presto-server-316.tar.gz
+ wget
https://repo1.maven.org/maven2/io/prestosql/presto-server/333/presto-server-333.tar.gz
```
- 2. Extract Presto tar file: `tar zxvf presto-server-316.tar.gz`.
+ 2. Extract Presto tar file: `tar zxvf presto-server-333.tar.gz`.
- 3. Download the Presto CLI of the same presto server version (say 316) for
the coordinator and name it presto.
+ 3. Download the Presto CLI of the same presto server version (say 333) for
the coordinator and name it presto.
```
- wget
https://repo1.maven.org/maven2/io/prestosql/presto-cli/316/presto-cli-316-executable.jar
+ wget
https://repo1.maven.org/maven2/io/prestosql/presto-cli/333/presto-cli-333-executable.jar
- mv presto-cli-316-executable.jar presto
+ mv presto-cli-333-executable.jar presto
chmod +x presto
```
### Create Configuration Files
- 1. Create `etc` folder in presto-server-316 directory.
+ 1. Create `etc` folder in presto-server-333 directory.
2. Create `config.properties`, `jvm.config`, `log.properties`, and
`node.properties` files.
3. Install uuid to generate a node.id.
@@ -154,12 +154,12 @@ Just replace the connector name in hive configuration and
copy same to carbondat
### Start Presto Server on all nodes
```
-./presto-server-316/bin/launcher start
+./presto-server-333/bin/launcher start
```
To run it as a background process.
```
-./presto-server-316/bin/launcher run
+./presto-server-333/bin/launcher run
```
To run it in foreground.
@@ -182,7 +182,7 @@ Now you can use the Presto CLI on the coordinator to query
data sources in the c
## Presto Single Node Setup for Carbondata
### Config presto server
-* Download presto server (316 is suggested and supported) :
https://repo1.maven.org/maven2/io/prestosql/presto-server/
+* Download presto server (333 is suggested and supported) :
https://repo1.maven.org/maven2/io/prestosql/presto-server/
* Finish presto configuration following
https://prestosql.io/docs/current/installation/deployment.html.
A configuration example:
@@ -240,13 +240,13 @@ Now you can use the Presto CLI on the coordinator to
query data sources in the c
```
$ git clone https://github.com/apache/carbondata
$ cd carbondata
- $ mvn -DskipTests -P{spark-version} -P{prestodb/prestosql}
-Dspark.version={spark-version-number} -Dhadoop.version={hadoop-version-number}
clean package
+ $ mvn -DskipTests -P{spark-version} -P{prestodb/prestosql} clean package
```
- Replace the spark and hadoop version with the version used in your cluster.
- For example, use prestosql profile and
- if you are using Spark 2.4.5, you would like to compile using:
+ For prestosql-333 and above, please use spark2.3 profile.
+ Because spark2.4 and higher profiles will bring hadoop3 dependencies and it
will cause presto333 server launch failure.
+ So, compile using:
```
- mvn -DskipTests -Pspark-2.4 -Pprestosql -Dspark.version=2.4.5
-Dhadoop.version=2.7.2 clean package
+ mvn -DskipTests -Pspark-2.3 -Pprestosql clean package
```
Secondly: Create a folder named 'carbondata' under $PRESTO_HOME$/plugin and
@@ -300,7 +300,7 @@ Note: Load Carbon properties are not yet supported. The
insert will work with al
configurations.
### Query carbondata in CLI of presto
-* Download presto cli client of version 316 :
https://repo1.maven.org/maven2/io/prestosql/presto-cli/
+* Download presto cli client of version 333 :
https://repo1.maven.org/maven2/io/prestosql/presto-cli/
* Start CLI:
diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml
index 113fbc8..fb92f99 100644
--- a/integration/presto/pom.xml
+++ b/integration/presto/pom.xml
@@ -34,6 +34,8 @@
<httpcore.version>4.4.9</httpcore.version>
<dev.path>${basedir}/../../dev</dev.path>
<jacoco.append>true</jacoco.append>
+ <jackson.core.version>2.10.0</jackson.core.version>
+ <airlift.bootstrap.version>0.193</airlift.bootstrap.version>
</properties>
<dependencies>
@@ -41,7 +43,7 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
- <version>2.10.0</version>
+ <version>${jackson.core.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
@@ -53,7 +55,7 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
- <version>2.10.0</version>
+ <version>${jackson.core.version}</version>
<scope>provided</scope>
</dependency>
@@ -249,7 +251,7 @@
<dependency>
<groupId>io.airlift</groupId>
<artifactId>bootstrap</artifactId>
- <version>0.144</version>
+ <version>${airlift.bootstrap.version}</version>
<!--<scope>provided</scope>-->
<exclusions>
<exclusion>
@@ -294,8 +296,7 @@
<dependency>
<groupId>io.airlift</groupId>
<artifactId>json</artifactId>
- <version>0.144</version>
- <!--<scope>provided</scope>-->
+ <version>${airlift.bootstrap.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -333,8 +334,8 @@
<dependency>
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
- <version>1.0</version>
- <scope>provided</scope>
+ <version>1.6</version>
+ <scope>${presto.depndency.scope}</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -369,6 +370,17 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>com.google.inject.extensions</groupId>
+ <artifactId>guice-multibindings</artifactId>
+ <version>4.2.2</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ <version>4.0.1</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>${presto.hadoop.groupid}</groupId>
<artifactId>${presto.hadoop.artifactid}</artifactId>
<version>${presto.hadoop.version}</version>
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataConnector.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataConnector.java
new file mode 100755
index 0000000..27706a9
--- /dev/null
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataConnector.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import io.airlift.bootstrap.LifeCycleManager;
+import io.prestosql.plugin.hive.HiveConnector;
+import io.prestosql.plugin.hive.HiveTransactionManager;
+import io.prestosql.plugin.hive.TransactionalMetadataFactory;
+import io.prestosql.spi.connector.ConnectorAccessControl;
+import io.prestosql.spi.connector.ConnectorHandleResolver;
+import io.prestosql.spi.connector.ConnectorNodePartitioningProvider;
+import io.prestosql.spi.connector.ConnectorPageSinkProvider;
+import io.prestosql.spi.connector.ConnectorPageSourceProvider;
+import io.prestosql.spi.connector.ConnectorSplitManager;
+import io.prestosql.spi.connector.SystemTable;
+import io.prestosql.spi.eventlistener.EventListener;
+import io.prestosql.spi.procedure.Procedure;
+import io.prestosql.spi.session.PropertyMetadata;
+
+/**
+ * CarbonData connector that extends hive connector for just to set
HandleResolver
+ */
+public class CarbonDataConnector extends HiveConnector {
+
+ public CarbonDataConnector(
+ LifeCycleManager lifeCycleManager,
+ TransactionalMetadataFactory metadataFactory,
+ HiveTransactionManager transactionManager,
+ ConnectorSplitManager splitManager,
+ ConnectorPageSourceProvider pageSourceProvider,
+ ConnectorPageSinkProvider pageSinkProvider,
+ ConnectorNodePartitioningProvider nodePartitioningProvider,
+ Set<SystemTable> systemTables,
+ Set<Procedure> procedures,
+ Set<EventListener> eventListeners,
+ List<PropertyMetadata<?>> sessionProperties,
+ List<PropertyMetadata<?>> schemaProperties,
+ List<PropertyMetadata<?>> tableProperties,
+ List<PropertyMetadata<?>> analyzeProperties,
+ ConnectorAccessControl accessControl,
+ ClassLoader classLoader) {
+ super(
+ lifeCycleManager,
+ metadataFactory,
+ transactionManager,
+ splitManager,
+ pageSourceProvider,
+ pageSinkProvider,
+ nodePartitioningProvider,
+ systemTables,
+ procedures,
+ eventListeners,
+ sessionProperties,
+ schemaProperties,
+ tableProperties,
+ analyzeProperties,
+ accessControl,
+ classLoader);
+ }
+
+ @Override
+ public Optional<ConnectorHandleResolver> getHandleResolver() {
+ // use the CarbonDataHandleResolver to support insert as carbonData
operation
+ return Optional.of(new CarbonDataHandleResolver());
+ }
+}
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
index ee7602e..27bc65e 100644
---
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
@@ -31,9 +31,9 @@ import org.apache.carbondata.hive.MapredCarbonOutputFormat;
import org.apache.carbondata.presto.impl.CarbonTableConfig;
import com.google.common.collect.ImmutableList;
-import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.FileWriter;
import io.prestosql.plugin.hive.HiveType;
-import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.plugin.hive.util.HiveWriteUtils;
import io.prestosql.spi.Page;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.block.Block;
@@ -43,9 +43,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.IOConstants;
-import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import
org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
@@ -67,7 +65,7 @@ import static
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFacto
* This class implements HiveFileWriter and it creates the carbonFileWriter to
write the page data
* sent from presto.
*/
-public class CarbonDataFileWriter implements HiveFileWriter {
+public class CarbonDataFileWriter implements FileWriter {
private static final Logger LOG =
LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriterFactory.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriterFactory.java
index fb0b46f..c563d7e 100644
---
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriterFactory.java
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriterFactory.java
@@ -23,8 +23,8 @@ import java.util.Properties;
import com.google.inject.Inject;
import io.prestosql.plugin.hive.FileFormatDataSourceStats;
+import io.prestosql.plugin.hive.FileWriter;
import io.prestosql.plugin.hive.HdfsEnvironment;
-import io.prestosql.plugin.hive.HiveFileWriter;
import io.prestosql.plugin.hive.HiveFileWriterFactory;
import io.prestosql.plugin.hive.NodeVersion;
import io.prestosql.plugin.hive.metastore.StorageFormat;
@@ -58,7 +58,7 @@ public class CarbonDataFileWriterFactory implements
HiveFileWriterFactory {
}
@Override
- public Optional<HiveFileWriter> createFileWriter(Path path, List<String>
inputColumnNames,
+ public Optional<FileWriter> createFileWriter(Path path, List<String>
inputColumnNames,
StorageFormat storageFormat, Properties schema, JobConf configuration,
ConnectorSession session) {
try {
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataLocationService.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataLocationService.java
index 5532a7c..1f3a5fa 100644
---
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataLocationService.java
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataLocationService.java
@@ -20,10 +20,10 @@ package org.apache.carbondata.presto;
import com.google.inject.Inject;
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HiveLocationService;
-import io.prestosql.plugin.hive.HiveWriteUtils;
import io.prestosql.plugin.hive.LocationHandle;
import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
import io.prestosql.plugin.hive.metastore.Table;
+import io.prestosql.plugin.hive.util.HiveWriteUtils;
import io.prestosql.spi.connector.ConnectorSession;
import org.apache.hadoop.fs.Path;
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataMetaData.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataMetaData.java
index 42774f5..fb63d56 100644
---
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataMetaData.java
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataMetaData.java
@@ -23,7 +23,6 @@ import java.util.Optional;
import java.util.Properties;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
import org.apache.carbondata.hive.MapredCarbonOutputCommitter;
@@ -32,7 +31,9 @@ import org.apache.carbondata.presto.impl.CarbonTableConfig;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import com.google.common.collect.ImmutableMap;
+import io.airlift.json.JsonCodec;
import io.airlift.slice.Slice;
+import io.prestosql.plugin.base.CatalogName;
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HiveInsertTableHandle;
import io.prestosql.plugin.hive.HiveMetadata;
@@ -40,6 +41,7 @@ import io.prestosql.plugin.hive.HivePartitionManager;
import io.prestosql.plugin.hive.LocationService;
import io.prestosql.plugin.hive.PartitionUpdate;
import io.prestosql.plugin.hive.TypeTranslator;
+import io.prestosql.plugin.hive.authentication.HiveIdentity;
import io.prestosql.plugin.hive.metastore.MetastoreUtil;
import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
import io.prestosql.plugin.hive.metastore.Table;
@@ -59,7 +61,6 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContextImpl;
import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.log4j.Logger;
import org.joda.time.DateTimeZone;
@@ -73,18 +74,27 @@ public class CarbonDataMetaData extends HiveMetadata {
private MapredCarbonOutputCommitter carbonOutputCommitter;
private JobContextImpl jobContext;
- public CarbonDataMetaData(SemiTransactionalHiveMetastore metastore,
- HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager,
DateTimeZone timeZone,
- boolean allowCorruptWritesForTesting, boolean
writesToNonManagedTablesEnabled,
- boolean createsOfNonManagedTablesEnabled, TypeManager typeManager,
+ public CarbonDataMetaData(
+ CatalogName catalogName,
+ SemiTransactionalHiveMetastore metastore,
+ HdfsEnvironment hdfsEnvironment,
+ HivePartitionManager partitionManager,
+ DateTimeZone timeZone,
+ boolean allowCorruptWritesForTesting,
+ boolean writesToNonManagedTablesEnabled,
+ boolean createsOfNonManagedTablesEnabled,
+ boolean translateHiveViews, TypeManager typeManager,
LocationService locationService,
- io.airlift.json.JsonCodec<PartitionUpdate> partitionUpdateCodec,
- TypeTranslator typeTranslator, String prestoVersion,
- HiveStatisticsProvider hiveStatisticsProvider, AccessControlMetadata
accessControlMetadata) {
- super(metastore, hdfsEnvironment, partitionManager, timeZone,
allowCorruptWritesForTesting,
- true, createsOfNonManagedTablesEnabled, typeManager,
- locationService, partitionUpdateCodec, typeTranslator, prestoVersion,
- hiveStatisticsProvider, accessControlMetadata);
+ JsonCodec<PartitionUpdate> partitionUpdateCodec,
+ TypeTranslator typeTranslator,
+ String prestoVersion,
+ HiveStatisticsProvider hiveStatisticsProvider,
+ AccessControlMetadata accessControlMetadata) {
+ super(catalogName, metastore, hdfsEnvironment, partitionManager, timeZone,
+ allowCorruptWritesForTesting, writesToNonManagedTablesEnabled,
+ createsOfNonManagedTablesEnabled, translateHiveViews, typeManager,
locationService,
+ partitionUpdateCodec, typeTranslator, prestoVersion,
hiveStatisticsProvider,
+ accessControlMetadata);
this.hdfsEnvironment = hdfsEnvironment;
this.metastore = metastore;
}
@@ -94,8 +104,8 @@ public class CarbonDataMetaData extends HiveMetadata {
ConnectorTableHandle tableHandle) {
HiveInsertTableHandle hiveInsertTableHandle = super.beginInsert(session,
tableHandle);
SchemaTableName tableName = hiveInsertTableHandle.getSchemaTableName();
- Optional<Table> table =
- this.metastore.getTable(tableName.getSchemaName(),
tableName.getTableName());
+ Optional<Table> table = this.metastore
+ .getTable(new HiveIdentity(session), tableName.getSchemaName(),
tableName.getTableName());
Path outputPath =
new
Path(hiveInsertTableHandle.getLocationHandle().getJsonSerializableTargetPath());
JobConf jobConf = ConfigurationUtils.toJobConf(this.hdfsEnvironment
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataPageSinkProvider.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataPageSinkProvider.java
index 9747017..897b4ef 100644
---
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataPageSinkProvider.java
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataPageSinkProvider.java
@@ -33,14 +33,15 @@ import io.airlift.units.DataSize;
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HiveConfig;
import io.prestosql.plugin.hive.HiveFileWriterFactory;
+import io.prestosql.plugin.hive.HiveMetastoreClosure;
import io.prestosql.plugin.hive.HivePageSink;
import io.prestosql.plugin.hive.HivePageSinkProvider;
import io.prestosql.plugin.hive.HiveSessionProperties;
import io.prestosql.plugin.hive.HiveWritableTableHandle;
import io.prestosql.plugin.hive.HiveWriterStats;
import io.prestosql.plugin.hive.LocationService;
-import io.prestosql.plugin.hive.OrcFileWriterFactory;
import io.prestosql.plugin.hive.PartitionUpdate;
+import io.prestosql.plugin.hive.authentication.HiveIdentity;
import io.prestosql.plugin.hive.metastore.HiveMetastore;
import io.prestosql.plugin.hive.metastore.HivePageSinkMetadataProvider;
import io.prestosql.plugin.hive.metastore.SortingColumn;
@@ -55,7 +56,7 @@ import io.prestosql.spi.type.TypeManager;
import static
com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
-import static
io.prestosql.plugin.hive.metastore.CachingHiveMetastore.memoizeMetastore;
+import static
io.prestosql.plugin.hive.metastore.cache.CachingHiveMetastore.memoizeMetastore;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newFixedThreadPool;
@@ -78,7 +79,6 @@ public class CarbonDataPageSinkProvider extends
HivePageSinkProvider {
private final EventClient eventClient;
private final HiveSessionProperties hiveSessionProperties;
private final HiveWriterStats hiveWriterStats;
- private final OrcFileWriterFactory orcFileWriterFactory;
private final long perTransactionMetastoreCacheMaximumSize;
@Inject
@@ -87,10 +87,10 @@ public class CarbonDataPageSinkProvider extends
HivePageSinkProvider {
PageIndexerFactory pageIndexerFactory, TypeManager typeManager,
HiveConfig config,
LocationService locationService, JsonCodec<PartitionUpdate>
partitionUpdateCodec,
NodeManager nodeManager, EventClient eventClient, HiveSessionProperties
hiveSessionProperties,
- HiveWriterStats hiveWriterStats, OrcFileWriterFactory
orcFileWriterFactory) {
+ HiveWriterStats hiveWriterStats) {
super(fileWriterFactories, hdfsEnvironment, pageSorter, metastore,
pageIndexerFactory,
typeManager, config, locationService, partitionUpdateCodec,
nodeManager, eventClient,
- hiveSessionProperties, hiveWriterStats, orcFileWriterFactory);
+ hiveSessionProperties, hiveWriterStats);
this.fileWriterFactories =
ImmutableSet.copyOf(requireNonNull(fileWriterFactories,
"fileWriterFactories is null"));
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is
null");
@@ -114,8 +114,6 @@ public class CarbonDataPageSinkProvider extends
HivePageSinkProvider {
this.hiveSessionProperties =
requireNonNull(hiveSessionProperties, "hiveSessionProperties is null");
this.hiveWriterStats = requireNonNull(hiveWriterStats, "stats is null");
- this.orcFileWriterFactory =
- requireNonNull(orcFileWriterFactory, "orcFileWriterFactory is null");
this.perTransactionMetastoreCacheMaximumSize =
config.getPerTransactionMetastoreCacheMaximumSize();
}
@@ -150,8 +148,13 @@ public class CarbonDataPageSinkProvider extends
HivePageSinkProvider {
handle.getLocationHandle(),
locationService,
session.getQueryId(),
- new HivePageSinkMetadataProvider(handle.getPageSinkMetadata(),
- memoizeMetastore(metastore,
perTransactionMetastoreCacheMaximumSize)),
+ new HivePageSinkMetadataProvider(
+ handle.getPageSinkMetadata(),
+ new HiveMetastoreClosure(
+ memoizeMetastore(
+ metastore,
+ perTransactionMetastoreCacheMaximumSize)),
+ new HiveIdentity(session)),
typeManager,
hdfsEnvironment,
pageSorter,
@@ -163,7 +166,6 @@ public class CarbonDataPageSinkProvider extends
HivePageSinkProvider {
eventClient,
hiveSessionProperties,
hiveWriterStats,
- orcFileWriterFactory,
additionalConf
);
@@ -172,7 +174,6 @@ public class CarbonDataPageSinkProvider extends
HivePageSinkProvider {
handle.getInputColumns(),
handle.getBucketProperty(),
pageIndexerFactory,
- typeManager,
hdfsEnvironment,
maxOpenPartitions,
writeVerificationExecutor,
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataWriterFactory.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataWriterFactory.java
index 57e3dbf..b5a6fd3 100644
---
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataWriterFactory.java
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataWriterFactory.java
@@ -34,7 +34,6 @@ import io.prestosql.plugin.hive.HiveWriterFactory;
import io.prestosql.plugin.hive.HiveWriterStats;
import io.prestosql.plugin.hive.LocationHandle;
import io.prestosql.plugin.hive.LocationService;
-import io.prestosql.plugin.hive.OrcFileWriterFactory;
import io.prestosql.plugin.hive.metastore.HivePageSinkMetadataProvider;
import io.prestosql.plugin.hive.metastore.SortingColumn;
import io.prestosql.spi.NodeManager;
@@ -60,13 +59,12 @@ public class CarbonDataWriterFactory extends
HiveWriterFactory {
DataSize sortBufferSize, int maxOpenSortFiles, boolean
immutablePartitions,
ConnectorSession session, NodeManager nodeManager, EventClient
eventClient,
HiveSessionProperties hiveSessionProperties, HiveWriterStats
hiveWriterStats,
- OrcFileWriterFactory orcFileWriterFactory, Map<String, String>
additionalJobConf) {
+ Map<String, String> additionalJobConf) {
super(fileWriterFactories, schemaName, tableName, isCreateTable,
inputColumns,
tableStorageFormat, partitionStorageFormat, additionalTableParameters,
bucketCount,
sortedBy, locationHandle, locationService, queryId,
pageSinkMetadataProvider, typeManager,
hdfsEnvironment, pageSorter, sortBufferSize, maxOpenSortFiles,
immutablePartitions, session,
- nodeManager, eventClient, hiveSessionProperties, hiveWriterStats,
orcFileWriterFactory);
-
+ nodeManager, eventClient, hiveSessionProperties, hiveWriterStats);
this.additionalJobConf = requireNonNull(additionalJobConf, "Additional
jobConf is null");
}
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonMetadataFactory.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonMetadataFactory.java
index 8ab5138..dd0ca4f 100644
---
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonMetadataFactory.java
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonMetadataFactory.java
@@ -18,23 +18,27 @@
package org.apache.carbondata.presto;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import com.google.inject.Inject;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
-import io.prestosql.plugin.hive.ForHive;
+import io.airlift.units.Duration;
+import io.prestosql.plugin.base.CatalogName;
+import io.prestosql.plugin.hive.ForHiveTransactionHeartbeats;
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HiveConfig;
-import io.prestosql.plugin.hive.HiveMetadata;
import io.prestosql.plugin.hive.HiveMetadataFactory;
+import io.prestosql.plugin.hive.HiveMetastoreClosure;
import io.prestosql.plugin.hive.HivePartitionManager;
import io.prestosql.plugin.hive.LocationService;
import io.prestosql.plugin.hive.NodeVersion;
import io.prestosql.plugin.hive.PartitionUpdate;
+import io.prestosql.plugin.hive.TransactionalMetadata;
import io.prestosql.plugin.hive.TypeTranslator;
-import io.prestosql.plugin.hive.metastore.CachingHiveMetastore;
import io.prestosql.plugin.hive.metastore.HiveMetastore;
import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
import io.prestosql.plugin.hive.security.AccessControlMetadataFactory;
@@ -42,13 +46,15 @@ import
io.prestosql.plugin.hive.statistics.MetastoreHiveStatisticsProvider;
import io.prestosql.spi.type.TypeManager;
import org.joda.time.DateTimeZone;
+import static
io.prestosql.plugin.hive.metastore.cache.CachingHiveMetastore.memoizeMetastore;
+
public class CarbonMetadataFactory extends HiveMetadataFactory {
private static final Logger log = Logger.get(HiveMetadataFactory.class);
private final boolean allowCorruptWritesForTesting;
private final boolean skipDeletionForAlter;
private final boolean skipTargetCleanupOnRollback;
- private final boolean writesToNonManagedTablesEnabled = true;
+ private final boolean writesToNonManagedTablesEnabled;
private final boolean createsOfNonManagedTablesEnabled;
private final long perTransactionCacheMaximumSize;
private final HiveMetastore metastore;
@@ -62,37 +68,55 @@ public class CarbonMetadataFactory extends
HiveMetadataFactory {
private final String prestoVersion;
private final AccessControlMetadataFactory accessControlMetadataFactory;
private final JsonCodec partitionUpdateCodec;
+ private final CatalogName catalogName;
+ private final boolean translateHiveViews;
+ private final BoundedExecutor dropExecutor;
+ private final Optional<Duration> hiveTransactionHeartbeatInterval;
+ private final ScheduledExecutorService heartbeatService;
- @Inject public CarbonMetadataFactory(HiveConfig hiveConfig, HiveMetastore
metastore,
- HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager,
- @ForHive ExecutorService executorService, TypeManager typeManager,
- LocationService locationService, JsonCodec<PartitionUpdate>
partitionUpdateCodec,
- TypeTranslator typeTranslator, NodeVersion nodeVersion,
+ @Inject public CarbonMetadataFactory(
+ CatalogName catalogName,
+ HiveConfig hiveConfig,
+ HiveMetastore metastore,
+ HdfsEnvironment hdfsEnvironment,
+ HivePartitionManager partitionManager,
+ ExecutorService executorService,
+ @ForHiveTransactionHeartbeats ScheduledExecutorService heartbeatService,
+ TypeManager typeManager,
+ LocationService locationService,
+ JsonCodec<PartitionUpdate> partitionUpdateCodec,
+ TypeTranslator typeTranslator,
+ NodeVersion nodeVersion,
AccessControlMetadataFactory accessControlMetadataFactory) {
- this(metastore, hdfsEnvironment, partitionManager,
hiveConfig.getDateTimeZone(),
- hiveConfig.getMaxConcurrentFileRenames(),
hiveConfig.getAllowCorruptWritesForTesting(),
- hiveConfig.isSkipDeletionForAlter(),
hiveConfig.isSkipTargetCleanupOnRollback(),
- hiveConfig.getWritesToNonManagedTablesEnabled(),
- hiveConfig.getCreatesOfNonManagedTablesEnabled(),
- hiveConfig.getPerTransactionMetastoreCacheMaximumSize(), typeManager,
locationService,
- partitionUpdateCodec, executorService, typeTranslator,
nodeVersion.toString(),
- accessControlMetadataFactory);
+ this(catalogName, metastore, hdfsEnvironment, partitionManager,
hiveConfig.getDateTimeZone(),
+ hiveConfig.getMaxConcurrentFileRenames(),
hiveConfig.getMaxConcurrentMetastoreDrops(),
+ hiveConfig.getAllowCorruptWritesForTesting(),
hiveConfig.isSkipDeletionForAlter(),
+ hiveConfig.isSkipTargetCleanupOnRollback(),
hiveConfig.getWritesToNonManagedTablesEnabled(),
+ hiveConfig.getCreatesOfNonManagedTablesEnabled(),
hiveConfig.isTranslateHiveViews(),
+ hiveConfig.getPerTransactionMetastoreCacheMaximumSize(),
+ hiveConfig.getHiveTransactionHeartbeatInterval(), typeManager,
locationService,
+ partitionUpdateCodec, executorService, heartbeatService,
typeTranslator,
+ nodeVersion.toString(), accessControlMetadataFactory);
}
- public CarbonMetadataFactory(HiveMetastore metastore, HdfsEnvironment
hdfsEnvironment,
- HivePartitionManager partitionManager, DateTimeZone timeZone, int
maxConcurrentFileRenames,
+ public CarbonMetadataFactory(CatalogName catalogName, HiveMetastore
metastore,
+ HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager,
DateTimeZone timeZone,
+ int maxConcurrentFileRenames, int maxConcurrentMetastoreDrops,
boolean allowCorruptWritesForTesting, boolean skipDeletionForAlter,
boolean skipTargetCleanupOnRollback, boolean
writesToNonManagedTablesEnabled,
- boolean createsOfNonManagedTablesEnabled, long
perTransactionCacheMaximumSize,
+ boolean createsOfNonManagedTablesEnabled, boolean translateHiveViews,
+ long perTransactionCacheMaximumSize, Optional<Duration>
hiveTransactionHeartbeatInterval,
TypeManager typeManager, LocationService locationService,
- io.airlift.json.JsonCodec<PartitionUpdate> partitionUpdateCodec,
- ExecutorService executorService, TypeTranslator typeTranslator, String
prestoVersion,
- AccessControlMetadataFactory accessControlMetadataFactory) {
- super(metastore, hdfsEnvironment, partitionManager, timeZone,
maxConcurrentFileRenames,
- allowCorruptWritesForTesting, skipDeletionForAlter,
skipTargetCleanupOnRollback,
- true, createsOfNonManagedTablesEnabled,
- perTransactionCacheMaximumSize, typeManager, locationService,
partitionUpdateCodec,
- executorService, typeTranslator, prestoVersion,
accessControlMetadataFactory);
+ JsonCodec<PartitionUpdate> partitionUpdateCodec, ExecutorService
executorService,
+ ScheduledExecutorService heartbeatService, TypeTranslator typeTranslator,
+ String prestoVersion, AccessControlMetadataFactory
accessControlMetadataFactory) {
+ super(catalogName, metastore, hdfsEnvironment, partitionManager, timeZone,
+ maxConcurrentFileRenames, maxConcurrentMetastoreDrops,
allowCorruptWritesForTesting,
+ skipDeletionForAlter, skipTargetCleanupOnRollback,
writesToNonManagedTablesEnabled,
+ createsOfNonManagedTablesEnabled, translateHiveViews,
perTransactionCacheMaximumSize,
+ hiveTransactionHeartbeatInterval, typeManager, locationService,
partitionUpdateCodec,
+ executorService, heartbeatService, typeTranslator, prestoVersion,
+ accessControlMetadataFactory);
this.allowCorruptWritesForTesting = allowCorruptWritesForTesting;
this.skipDeletionForAlter = skipDeletionForAlter;
this.skipTargetCleanupOnRollback = skipTargetCleanupOnRollback;
@@ -112,23 +136,50 @@ public class CarbonMetadataFactory extends
HiveMetadataFactory {
.requireNonNull(accessControlMetadataFactory,
"accessControlMetadataFactory is null");
if (!allowCorruptWritesForTesting &&
!timeZone.equals(DateTimeZone.getDefault())) {
log.warn(
- "Hive writes are disabled. To write data to Hive, your JVM timezone
must match the Hive storage timezone. Add -Duser.timezone=%s to your JVM
arguments",
+ "Hive writes are disabled. "
+ + "To write data to Hive, your JVM timezone must match the Hive
storage timezone. "
+ + "Add -Duser.timezone=%s to your JVM arguments",
timeZone.getID());
}
-
this.renameExecution = new BoundedExecutor(executorService,
maxConcurrentFileRenames);
+ this.dropExecutor = new BoundedExecutor(executorService,
maxConcurrentMetastoreDrops);
+ this.catalogName = catalogName;
+ this.translateHiveViews = translateHiveViews;
+ this.hiveTransactionHeartbeatInterval = hiveTransactionHeartbeatInterval;
+ this.heartbeatService = heartbeatService;
+ this.writesToNonManagedTablesEnabled = writesToNonManagedTablesEnabled;
}
- @Override public HiveMetadata get() {
- SemiTransactionalHiveMetastore metastore =
- new SemiTransactionalHiveMetastore(this.hdfsEnvironment,
CachingHiveMetastore
- .memoizeMetastore(this.metastore,
this.perTransactionCacheMaximumSize),
- this.renameExecution, this.skipDeletionForAlter,
this.skipTargetCleanupOnRollback);
- return new CarbonDataMetaData(metastore, this.hdfsEnvironment,
this.partitionManager,
- this.timeZone, this.allowCorruptWritesForTesting,
this.writesToNonManagedTablesEnabled,
- this.createsOfNonManagedTablesEnabled, this.typeManager,
this.locationService,
- this.partitionUpdateCodec, this.typeTranslator, this.prestoVersion,
- new MetastoreHiveStatisticsProvider(metastore),
- this.accessControlMetadataFactory.create(metastore));
+ @Override
+ public TransactionalMetadata create()
+ {
+ SemiTransactionalHiveMetastore metaStore = new
SemiTransactionalHiveMetastore(
+ hdfsEnvironment,
+ new HiveMetastoreClosure(memoizeMetastore(this.metastore,
perTransactionCacheMaximumSize)),
+ renameExecution,
+ dropExecutor,
+ skipDeletionForAlter,
+ skipTargetCleanupOnRollback,
+ hiveTransactionHeartbeatInterval,
+ heartbeatService);
+
+ return new CarbonDataMetaData(
+ catalogName,
+ metaStore,
+ hdfsEnvironment,
+ partitionManager,
+ timeZone,
+ allowCorruptWritesForTesting,
+ writesToNonManagedTablesEnabled,
+ createsOfNonManagedTablesEnabled,
+ translateHiveViews,
+ typeManager,
+ locationService,
+ partitionUpdateCodec,
+ typeTranslator,
+ prestoVersion,
+ new MetastoreHiveStatisticsProvider(metaStore),
+ accessControlMetadataFactory.create(metaStore));
}
+
}
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataConnectorFactory.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataConnectorFactory.java
index ce301cd..569d79d 100755
---
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataConnectorFactory.java
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -19,71 +19,20 @@ package org.apache.carbondata.presto;
import java.lang.reflect.*;
import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import static java.util.Objects.requireNonNull;
-
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
-import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
import org.apache.carbondata.hive.CarbonHiveSerDe;
import org.apache.carbondata.hive.MapredCarbonInputFormat;
import org.apache.carbondata.hive.MapredCarbonOutputFormat;
-import org.apache.carbondata.presto.impl.CarbonTableConfig;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.inject.Injector;
-import com.google.inject.Key;
-import com.google.inject.TypeLiteral;
-import io.airlift.bootstrap.Bootstrap;
-import io.airlift.bootstrap.LifeCycleManager;
-import io.airlift.event.client.EventModule;
-import io.airlift.json.JsonModule;
+
+import com.google.inject.Module;
import io.airlift.units.DataSize;
-import io.prestosql.plugin.base.jmx.MBeanServerModule;
-import io.prestosql.plugin.hive.ConnectorObjectNameGeneratorModule;
-import io.prestosql.plugin.hive.HiveAnalyzeProperties;
-import io.prestosql.plugin.hive.HiveCatalogName;
-import io.prestosql.plugin.hive.HiveConnector;
import io.prestosql.plugin.hive.HiveConnectorFactory;
-import io.prestosql.plugin.hive.HiveMetadataFactory;
-import io.prestosql.plugin.hive.HiveProcedureModule;
-import io.prestosql.plugin.hive.HiveSchemaProperties;
-import io.prestosql.plugin.hive.HiveSessionProperties;
import io.prestosql.plugin.hive.HiveStorageFormat;
-import io.prestosql.plugin.hive.HiveTableProperties;
-import io.prestosql.plugin.hive.HiveTransactionManager;
-import io.prestosql.plugin.hive.NodeVersion;
-import io.prestosql.plugin.hive.authentication.HiveAuthenticationModule;
-import io.prestosql.plugin.hive.gcs.HiveGcsModule;
-import io.prestosql.plugin.hive.metastore.HiveMetastoreModule;
-import io.prestosql.plugin.hive.s3.HiveS3Module;
-import io.prestosql.plugin.hive.security.HiveSecurityModule;
-import io.prestosql.plugin.hive.security.SystemTableAwareAccessControl;
-import io.prestosql.spi.NodeManager;
-import io.prestosql.spi.PageIndexerFactory;
-import io.prestosql.spi.PageSorter;
-import io.prestosql.spi.VersionEmbedder;
-import io.prestosql.spi.classloader.ThreadContextClassLoader;
import io.prestosql.spi.connector.Connector;
-import io.prestosql.spi.connector.ConnectorAccessControl;
import io.prestosql.spi.connector.ConnectorContext;
-import io.prestosql.spi.connector.ConnectorHandleResolver;
-import io.prestosql.spi.connector.ConnectorNodePartitioningProvider;
-import io.prestosql.spi.connector.ConnectorPageSinkProvider;
-import io.prestosql.spi.connector.ConnectorPageSourceProvider;
-import io.prestosql.spi.connector.ConnectorSplitManager;
-import
io.prestosql.spi.connector.classloader.ClassLoaderSafeConnectorPageSinkProvider;
-import
io.prestosql.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider;
-import
io.prestosql.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
-import
io.prestosql.spi.connector.classloader.ClassLoaderSafeNodePartitioningProvider;
-import io.prestosql.spi.procedure.Procedure;
-import io.prestosql.spi.type.TypeManager;
-import org.weakref.jmx.guice.MBeanModule;
-import sun.reflect.ConstructorAccessor;
-
-import static com.google.common.base.Throwables.throwIfUnchecked;
-import static io.airlift.configuration.ConfigBinder.configBinder;
+import sun.misc.Unsafe;
+
+import static io.airlift.units.DataSize.Unit.MEGABYTE;
/**
* Build Carbondata Connector
@@ -91,8 +40,6 @@ import static
io.airlift.configuration.ConfigBinder.configBinder;
*/
public class CarbondataConnectorFactory extends HiveConnectorFactory {
- private final ClassLoader classLoader;
-
static {
try {
setCarbonEnum();
@@ -101,127 +48,89 @@ public class CarbondataConnectorFactory extends
HiveConnectorFactory {
}
}
- public CarbondataConnectorFactory(String connectorName, ClassLoader
classLoader) {
- super(connectorName, classLoader, Optional.empty());
- this.classLoader = requireNonNull(classLoader, "classLoader is null");
+ public CarbondataConnectorFactory(String connectorName) {
+ this(connectorName, EmptyModule.class);
}
+ public CarbondataConnectorFactory(String connectorName, Class<? extends
Module> module) {
+ super(connectorName, module);
+ }
+
+
@Override
- public Connector create(String catalogName, Map<String, String> config,
+ public Connector create(
+ String catalogName,
+ Map<String, String> config,
ConnectorContext context) {
- requireNonNull(config, "config is null");
-
- try (ThreadContextClassLoader ignored = new
ThreadContextClassLoader(classLoader)) {
- Bootstrap app = new Bootstrap(
- new EventModule(),
- new MBeanModule(),
- new ConnectorObjectNameGeneratorModule(catalogName),
- new JsonModule(),
- new CarbondataModule(catalogName),
- new HiveS3Module(),
- new HiveGcsModule(),
- new HiveMetastoreModule(Optional.ofNullable(null)),
- new HiveSecurityModule(),
- new HiveAuthenticationModule(),
- new HiveProcedureModule(),
- new MBeanServerModule(),
- binder -> {
- binder.bind(NodeVersion.class).toInstance(
- new
NodeVersion(context.getNodeManager().getCurrentNode().getVersion()));
-
binder.bind(NodeManager.class).toInstance(context.getNodeManager());
-
binder.bind(VersionEmbedder.class).toInstance(context.getVersionEmbedder());
-
binder.bind(TypeManager.class).toInstance(context.getTypeManager());
-
binder.bind(PageIndexerFactory.class).toInstance(context.getPageIndexerFactory());
- binder.bind(PageSorter.class).toInstance(context.getPageSorter());
- binder.bind(HiveCatalogName.class).toInstance(new
HiveCatalogName(catalogName));
- configBinder(binder).bindConfig(CarbonTableConfig.class);
- });
-
- Injector injector = app
- .strictConfig()
- .doNotInitializeLogging()
- .setRequiredConfigurationProperties(config)
- .initialize();
-
- LifeCycleManager lifeCycleManager =
injector.getInstance(LifeCycleManager.class);
- HiveMetadataFactory metadataFactory =
injector.getInstance(HiveMetadataFactory.class);
- HiveTransactionManager transactionManager =
- injector.getInstance(HiveTransactionManager.class);
- ConnectorSplitManager splitManager =
injector.getInstance(ConnectorSplitManager.class);
- ConnectorPageSourceProvider connectorPageSource =
- injector.getInstance(ConnectorPageSourceProvider.class);
- ConnectorPageSinkProvider pageSinkProvider =
- injector.getInstance(ConnectorPageSinkProvider.class);
- ConnectorNodePartitioningProvider connectorDistributionProvider =
- injector.getInstance(ConnectorNodePartitioningProvider.class);
- HiveSessionProperties hiveSessionProperties =
- injector.getInstance(HiveSessionProperties.class);
- HiveTableProperties hiveTableProperties =
injector.getInstance(HiveTableProperties.class);
- HiveAnalyzeProperties hiveAnalyzeProperties =
- injector.getInstance(HiveAnalyzeProperties.class);
- ConnectorAccessControl accessControl =
- new
SystemTableAwareAccessControl(injector.getInstance(ConnectorAccessControl.class));
- Set<Procedure> procedures = injector.getInstance(Key.get(new
TypeLiteral<Set<Procedure>>() {
- }));
-
- return new HiveConnector(lifeCycleManager, metadataFactory,
transactionManager,
- new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader),
- new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource,
classLoader),
- new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider,
classLoader),
- new
ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider,
classLoader),
- ImmutableSet.of(), procedures,
hiveSessionProperties.getSessionProperties(),
- HiveSchemaProperties.SCHEMA_PROPERTIES,
hiveTableProperties.getTableProperties(),
- hiveAnalyzeProperties.getAnalyzeProperties(), accessControl,
classLoader);
- } catch (Exception e) {
- throwIfUnchecked(e);
- throw new RuntimeException(e);
- }
+ return InternalCarbonDataConnectorFactory
+ .createConnector(catalogName, config, context, new EmptyModule());
}
/**
* Set the Carbon format enum to HiveStorageFormat, its a hack but for time
being it is best
* choice to avoid lot of code change.
+ *
+ * @throws Exception
*/
private static void setCarbonEnum() throws Exception {
+ for (HiveStorageFormat format : HiveStorageFormat.values()) {
+ if (format.name().equals("CARBON") ||
format.name().equals("ORG.APACHE.CARBONDATA.FORMAT")
+ || format.name().equals("CARBONDATA")) {
+ return;
+ }
+ }
addHiveStorageFormatsForCarbondata("CARBON");
addHiveStorageFormatsForCarbondata("ORG.APACHE.CARBONDATA.FORMAT");
addHiveStorageFormatsForCarbondata("CARBONDATA");
}
- private static void addHiveStorageFormatsForCarbondata(String storedAs)
- throws InstantiationException, InvocationTargetException,
NoSuchFieldException,
- IllegalAccessException, NoSuchMethodException {
- Constructor<?>[] declaredConstructors =
HiveStorageFormat.class.getDeclaredConstructors();
- declaredConstructors[0].setAccessible(true);
- Field constructorAccessorField =
Constructor.class.getDeclaredField("constructorAccessor");
- constructorAccessorField.setAccessible(true);
- ConstructorAccessor ca =
- (ConstructorAccessor)
constructorAccessorField.get(declaredConstructors[0]);
- if (ca == null) {
- Method acquireConstructorAccessorMethod =
- Constructor.class.getDeclaredMethod("acquireConstructorAccessor");
- acquireConstructorAccessorMethod.setAccessible(true);
- ca = (ConstructorAccessor)
acquireConstructorAccessorMethod.invoke(declaredConstructors[0]);
- }
- Object instance = ca.newInstance(new Object[] { storedAs,
HiveStorageFormat.values().length,
- CarbonHiveSerDe.class.getName(),
MapredCarbonInputFormat.class.getName(),
- MapredCarbonOutputFormat.class.getName(), new DataSize(256.0D,
DataSize.Unit.MEGABYTE) });
- Field values = HiveStorageFormat.class.getDeclaredField("$VALUES");
- values.setAccessible(true);
- Field modifiersField = Field.class.getDeclaredField("modifiers");
- modifiersField.setAccessible(true);
- modifiersField.setInt(values, values.getModifiers() & ~Modifier.FINAL);
+ private static void addHiveStorageFormatsForCarbondata(String storedAs)
throws Exception {
+ Constructor<?> constructor = Unsafe.class.getDeclaredConstructors()[0];
+ constructor.setAccessible(true);
+ Unsafe unsafe = (Unsafe) constructor.newInstance();
+ HiveStorageFormat enumValue =
+ (HiveStorageFormat) unsafe.allocateInstance(HiveStorageFormat.class);
+
+ Field nameField = Enum.class.getDeclaredField("name");
+ makeAccessible(nameField);
+ nameField.set(enumValue, storedAs);
+
+ Field ordinalField = Enum.class.getDeclaredField("ordinal");
+ makeAccessible(ordinalField);
+ ordinalField.setInt(enumValue, HiveStorageFormat.values().length);
+
+ Field serdeField = HiveStorageFormat.class.getDeclaredField("serde");
+ makeAccessible(serdeField);
+ serdeField.set(enumValue, CarbonHiveSerDe.class.getName());
+
+ Field inputFormatField =
HiveStorageFormat.class.getDeclaredField("inputFormat");
+ makeAccessible(inputFormatField);
+ inputFormatField.set(enumValue, MapredCarbonInputFormat.class.getName());
+
+ Field outputFormatField =
HiveStorageFormat.class.getDeclaredField("outputFormat");
+ makeAccessible(outputFormatField);
+ outputFormatField.set(enumValue, MapredCarbonOutputFormat.class.getName());
+ Field estimatedWriterSystemMemoryUsageField =
+
HiveStorageFormat.class.getDeclaredField("estimatedWriterSystemMemoryUsage");
+ makeAccessible(estimatedWriterSystemMemoryUsageField);
+ estimatedWriterSystemMemoryUsageField.set(enumValue, new DataSize((long)
256, MEGABYTE));
+
+ Field values = HiveStorageFormat.class.getDeclaredField("$VALUES");
+ makeAccessible(values);
HiveStorageFormat[] hiveStorageFormats =
new HiveStorageFormat[HiveStorageFormat.values().length + 1];
HiveStorageFormat[] src = (HiveStorageFormat[]) values.get(null);
System.arraycopy(src, 0, hiveStorageFormats, 0, src.length);
- hiveStorageFormats[src.length] = (HiveStorageFormat) instance;
+ hiveStorageFormats[src.length] = enumValue;
values.set(null, hiveStorageFormats);
}
- @Override
- public ConnectorHandleResolver getHandleResolver() {
- return new CarbonDataHandleResolver();
+ private static void makeAccessible(Field field) throws Exception {
+ field.setAccessible(true);
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
}
+
}
diff --git
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataModule.java
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataModule.java
index 0b2cfa2..728690c 100755
---
a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataModule.java
+++
b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataModule.java
@@ -17,9 +17,7 @@
package org.apache.carbondata.presto;
-import java.util.function.Supplier;
-
-import static java.util.Objects.requireNonNull;
+import static io.airlift.json.JsonBinder.jsonBinder;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;
@@ -27,60 +25,57 @@ import org.apache.carbondata.presto.impl.CarbonTableReader;
import com.google.inject.Binder;
import com.google.inject.Scopes;
-import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.Multibinder;
import io.airlift.event.client.EventClient;
import io.prestosql.plugin.hive.CachingDirectoryLister;
import io.prestosql.plugin.hive.CoercionPolicy;
import io.prestosql.plugin.hive.DirectoryLister;
-import io.prestosql.plugin.hive.DynamicConfigurationProvider;
import io.prestosql.plugin.hive.FileFormatDataSourceStats;
import io.prestosql.plugin.hive.GenericHiveRecordCursorProvider;
-import io.prestosql.plugin.hive.HdfsConfiguration;
-import io.prestosql.plugin.hive.HdfsConfigurationInitializer;
-import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HiveAnalyzeProperties;
import io.prestosql.plugin.hive.HiveCoercionPolicy;
import io.prestosql.plugin.hive.HiveConfig;
import io.prestosql.plugin.hive.HiveEventClient;
import io.prestosql.plugin.hive.HiveFileWriterFactory;
-import io.prestosql.plugin.hive.HiveHdfsConfiguration;
+import io.prestosql.plugin.hive.HiveHdfsModule;
import io.prestosql.plugin.hive.HiveLocationService;
import io.prestosql.plugin.hive.HiveMetadataFactory;
import io.prestosql.plugin.hive.HiveModule;
import io.prestosql.plugin.hive.HiveNodePartitioningProvider;
-import io.prestosql.plugin.hive.HivePageSinkProvider;
import io.prestosql.plugin.hive.HivePageSourceFactory;
import io.prestosql.plugin.hive.HivePartitionManager;
import io.prestosql.plugin.hive.HiveRecordCursorProvider;
import io.prestosql.plugin.hive.HiveSessionProperties;
-import io.prestosql.plugin.hive.HiveSplitManager;
import io.prestosql.plugin.hive.HiveTableProperties;
import io.prestosql.plugin.hive.HiveTransactionManager;
import io.prestosql.plugin.hive.HiveTypeTranslator;
import io.prestosql.plugin.hive.HiveWriterStats;
import io.prestosql.plugin.hive.LocationService;
-import io.prestosql.plugin.hive.NamenodeStats;
-import io.prestosql.plugin.hive.OrcFileWriterConfig;
-import io.prestosql.plugin.hive.OrcFileWriterFactory;
-import io.prestosql.plugin.hive.ParquetFileWriterConfig;
import io.prestosql.plugin.hive.PartitionUpdate;
import io.prestosql.plugin.hive.RcFileFileWriterFactory;
-import io.prestosql.plugin.hive.TransactionalMetadata;
+import io.prestosql.plugin.hive.TransactionalMetadataFactory;
import io.prestosql.plugin.hive.TypeTranslator;
+import io.prestosql.plugin.hive.orc.OrcFileWriterFactory;
import io.prestosql.plugin.hive.orc.OrcPageSourceFactory;
+import io.prestosql.plugin.hive.orc.OrcReaderConfig;
+import io.prestosql.plugin.hive.orc.OrcWriterConfig;
import io.prestosql.plugin.hive.parquet.ParquetPageSourceFactory;
+import io.prestosql.plugin.hive.parquet.ParquetReaderConfig;
+import io.prestosql.plugin.hive.parquet.ParquetWriterConfig;
import io.prestosql.plugin.hive.rcfile.RcFilePageSourceFactory;
+import io.prestosql.plugin.hive.s3select.PrestoS3ClientFactory;
+import io.prestosql.plugin.hive.s3select.S3SelectRecordCursorProvider;
import io.prestosql.spi.connector.ConnectorNodePartitioningProvider;
import io.prestosql.spi.connector.ConnectorPageSinkProvider;
import io.prestosql.spi.connector.ConnectorPageSourceProvider;
import io.prestosql.spi.connector.ConnectorSplitManager;
+import io.prestosql.spi.connector.SystemTable;
+import io.prestosql.spi.type.Type;
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
-import static org.weakref.jmx.ObjectNames.generatedNameOf;
import static org.weakref.jmx.guice.ExportBinder.newExporter;
/**
@@ -88,21 +83,12 @@ import static
org.weakref.jmx.guice.ExportBinder.newExporter;
*/
public class CarbondataModule extends HiveModule {
- private final String connectorId;
-
- public CarbondataModule(String connectorId) {
- this.connectorId = requireNonNull(connectorId, "connector id is null");
- }
+ @Override public void configure(Binder binder) {
+ binder.install(new HiveHdfsModule());
- @Override
- public void configure(Binder binder) {
binder.bind(TypeTranslator.class).toInstance(new HiveTypeTranslator());
binder.bind(CoercionPolicy.class).to(HiveCoercionPolicy.class).in(Scopes.SINGLETON);
- binder.bind(HdfsConfigurationInitializer.class).in(Scopes.SINGLETON);
- newSetBinder(binder, DynamicConfigurationProvider.class);
- binder.bind(HdfsConfiguration.class).to(HiveHdfsConfiguration.class).in(Scopes.SINGLETON);
- binder.bind(HdfsEnvironment.class).in(Scopes.SINGLETON);
binder.bind(DirectoryLister.class).to(CachingDirectoryLister.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(HiveConfig.class);
@@ -110,18 +96,13 @@ public class CarbondataModule extends HiveModule {
binder.bind(HiveTableProperties.class).in(Scopes.SINGLETON);
binder.bind(HiveAnalyzeProperties.class).in(Scopes.SINGLETON);
- binder.bind(NamenodeStats.class).in(Scopes.SINGLETON);
- newExporter(binder).export(NamenodeStats.class)
- .as(generatedNameOf(NamenodeStats.class, connectorId));
+ binder.bind(PrestoS3ClientFactory.class).in(Scopes.SINGLETON);
- Multibinder<HiveRecordCursorProvider> recordCursorProviderBinder =
- newSetBinder(binder, HiveRecordCursorProvider.class);
- recordCursorProviderBinder.addBinding().to(GenericHiveRecordCursorProvider.class)
- .in(Scopes.SINGLETON);
+ binder.bind(CachingDirectoryLister.class).in(Scopes.SINGLETON);
+ newExporter(binder).export(CachingDirectoryLister.class).withGeneratedName();
binder.bind(HiveWriterStats.class).in(Scopes.SINGLETON);
- newExporter(binder).export(HiveWriterStats.class)
- .as(generatedNameOf(HiveWriterStats.class, connectorId));
+ newExporter(binder).export(HiveWriterStats.class).withGeneratedName();
newSetBinder(binder, EventClient.class).addBinding().to(HiveEventClient.class)
.in(Scopes.SINGLETON);
@@ -129,12 +110,12 @@ public class CarbondataModule extends HiveModule {
binder.bind(LocationService.class).to(HiveLocationService.class).in(Scopes.SINGLETON);
binder.bind(HiveLocationService.class).to(CarbonDataLocationService.class).in(Scopes.SINGLETON);
binder.bind(HiveMetadataFactory.class).to(CarbonMetadataFactory.class).in(Scopes.SINGLETON);
- binder.bind(new TypeLiteral<Supplier<TransactionalMetadata>>() {
- }).to(HiveMetadataFactory.class).in(Scopes.SINGLETON);
+ binder.bind(TransactionalMetadataFactory.class).to(HiveMetadataFactory.class)
+ .in(Scopes.SINGLETON);
binder.bind(HiveTransactionManager.class).in(Scopes.SINGLETON);
binder.bind(ConnectorSplitManager.class).to(CarbondataSplitManager.class).in(Scopes.SINGLETON);
newExporter(binder).export(ConnectorSplitManager.class)
- .as(generatedNameOf(HiveSplitManager.class, connectorId));
+ .as(generator -> generator.generatedNameOf(CarbondataSplitManager.class));
binder.bind(ConnectorPageSourceProvider.class).to(CarbondataPageSourceProvider.class)
.in(Scopes.SINGLETON);
binder.bind(ConnectorPageSinkProvider.class).to(CarbonDataPageSinkProvider.class)
@@ -145,8 +126,7 @@ public class CarbondataModule extends HiveModule {
jsonCodecBinder(binder).bindJsonCodec(PartitionUpdate.class);
binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);
- newExporter(binder).export(FileFormatDataSourceStats.class)
- .as(generatedNameOf(FileFormatDataSourceStats.class, connectorId));
+ newExporter(binder).export(FileFormatDataSourceStats.class).withGeneratedName();
Multibinder<HivePageSourceFactory> pageSourceFactoryBinder =
newSetBinder(binder, HivePageSourceFactory.class);
@@ -154,22 +134,32 @@ public class CarbondataModule extends HiveModule {
pageSourceFactoryBinder.addBinding().to(ParquetPageSourceFactory.class).in(Scopes.SINGLETON);
pageSourceFactoryBinder.addBinding().to(RcFilePageSourceFactory.class).in(Scopes.SINGLETON);
+ Multibinder<HiveRecordCursorProvider> recordCursorProviderBinder =
+ newSetBinder(binder, HiveRecordCursorProvider.class);
+ recordCursorProviderBinder.addBinding().to(S3SelectRecordCursorProvider.class)
+ .in(Scopes.SINGLETON);
+
+ binder.bind(GenericHiveRecordCursorProvider.class).in(Scopes.SINGLETON);
+
Multibinder<HiveFileWriterFactory> fileWriterFactoryBinder =
newSetBinder(binder, HiveFileWriterFactory.class);
binder.bind(OrcFileWriterFactory.class).in(Scopes.SINGLETON);
- newExporter(binder).export(OrcFileWriterFactory.class)
- .as(generatedNameOf(OrcFileWriterFactory.class, connectorId));
- configBinder(binder).bindConfig(OrcFileWriterConfig.class);
+ newExporter(binder).export(OrcFileWriterFactory.class).withGeneratedName();
+ configBinder(binder).bindConfig(OrcReaderConfig.class);
+ configBinder(binder).bindConfig(OrcWriterConfig.class);
fileWriterFactoryBinder.addBinding().to(OrcFileWriterFactory.class).in(Scopes.SINGLETON);
fileWriterFactoryBinder.addBinding().to(RcFileFileWriterFactory.class).in(Scopes.SINGLETON);
+
+ configBinder(binder).bindConfig(ParquetReaderConfig.class);
+ configBinder(binder).bindConfig(ParquetWriterConfig.class);
fileWriterFactoryBinder.addBinding().to(CarbonDataFileWriterFactory.class).in(Scopes.SINGLETON);
binder.bind(CarbonTableReader.class).in(Scopes.SINGLETON);
- configBinder(binder).bindConfig(ParquetFileWriterConfig.class);
+
jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);
+
+ newSetBinder(binder, SystemTable.class);
// configure carbon properties
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.IS_QUERY_FROM_PRESTO, "true");
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.IS_QUERY_FROM_PRESTO, "true");
}
-
}
diff --git a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataPageSource.java
index 2c647fe..18b384d 100644
--- a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataPageSource.java
+++ b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataPageSource.java
@@ -464,33 +464,30 @@ class CarbondataPageSource implements ConnectorPageSource {
/**
* Lazy Block Implementation for the Carbondata
*/
- private final class CarbondataBlockLoader implements LazyBlockLoader<LazyBlock> {
+ private final class CarbondataBlockLoader implements LazyBlockLoader {
private final int expectedBatchId = batchId;
private final int columnIndex;
- private boolean loaded;
CarbondataBlockLoader(int columnIndex) {
this.columnIndex = columnIndex;
}
@Override
- public final void load(LazyBlock lazyBlock) {
- if (loaded) {
- return;
- }
+ public final Block load() {
checkState(batchId == expectedBatchId);
+ Block block;
try {
vectorReader.getColumnarBatch().column(columnIndex).loadPage();
PrestoVectorBlockBuilder blockBuilder =
(PrestoVectorBlockBuilder)
vectorReader.getColumnarBatch().column(columnIndex);
- blockBuilder.setBatchSize(lazyBlock.getPositionCount());
- Block block = blockBuilder.buildBlock();
+ // set the number of rows for block builder
+ blockBuilder.setBatchSize(vectorReader.getColumnarBatch().numRows());
+ block = blockBuilder.buildBlock();
sizeOfData += block.getSizeInBytes();
- lazyBlock.setBlock(block);
} catch (Exception e) {
throw new CarbonDataLoadingException("Error in Reading Data from Carbondata ", e);
}
- loaded = true;
+ return block;
}
}
diff --git a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
index 6abe4c3..ef4fad2 100644
--- a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
+++ b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.presto.impl.CarbonTableReader;
import static org.apache.carbondata.presto.Types.checkType;
import com.google.inject.Inject;
+import io.prestosql.plugin.hive.GenericHiveRecordCursorProvider;
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HiveConfig;
import io.prestosql.plugin.hive.HivePageSourceFactory;
@@ -43,6 +44,7 @@ import io.prestosql.spi.connector.ConnectorSplit;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.type.TypeManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -59,28 +61,30 @@ public class CarbondataPageSourceProvider extends HivePageSourceProvider {
private HdfsEnvironment hdfsEnvironment;
@Inject public CarbondataPageSourceProvider(
+ TypeManager typeManager,
HiveConfig hiveConfig,
HdfsEnvironment hdfsEnvironment,
- Set<HiveRecordCursorProvider> cursorProviders,
Set<HivePageSourceFactory> pageSourceFactories,
- TypeManager typeManager,
+ Set<HiveRecordCursorProvider> cursorProviders,
+ GenericHiveRecordCursorProvider genericCursorProvider,
CarbonTableReader carbonTableReader) {
- super(hiveConfig, hdfsEnvironment, cursorProviders, pageSourceFactories, typeManager);
+ super(typeManager, hiveConfig, hdfsEnvironment, pageSourceFactories, cursorProviders,
+ genericCursorProvider);
this.carbonTableReader = requireNonNull(carbonTableReader, "carbonTableReader is null");
this.hdfsEnvironment = hdfsEnvironment;
}
@Override
- public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle,
+ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transaction,
ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table,
- List<ColumnHandle> columns) {
- HiveSplit carbonSplit =
- checkType(split, HiveSplit.class, "split is not class HiveSplit");
+ List<ColumnHandle> columns, TupleDomain<ColumnHandle> dynamicFilter) {
+ HiveSplit carbonSplit = checkType(split, HiveSplit.class, "split is not class HiveSplit");
this.queryId = carbonSplit.getSchema().getProperty("queryId");
if (this.queryId == null) {
// Fall back to hive pagesource.
- return super.createPageSource(transactionHandle, session, split, table, columns);
+ return super.createPageSource(transaction, session, split, table, columns, dynamicFilter);
}
+ // TODO: check and use dynamicFilter in CarbondataPageSource
Configuration configuration = this.hdfsEnvironment.getConfiguration(
new HdfsEnvironment.HdfsContext(session, carbonSplit.getDatabase(),
carbonSplit.getTable()),
new Path(carbonSplit.getSchema().getProperty("tablePath")));
@@ -89,10 +93,11 @@ public class CarbondataPageSourceProvider extends HivePageSourceProvider {
configuration.set(entry.getKey().toString(), entry.getValue().toString());
}
CarbonTable carbonTable = getCarbonTable(carbonSplit, configuration);
- boolean isDirectVectorFill = carbonTableReader.config.getPushRowFilter() == null ||
- carbonTableReader.config.getPushRowFilter().equalsIgnoreCase("false");
- return new CarbondataPageSource(
- carbonTable, queryId, carbonSplit, columns, table, configuration, isDirectVectorFill);
+ boolean isDirectVectorFill =
+ carbonTableReader.config.getPushRowFilter() == null || carbonTableReader.config
+ .getPushRowFilter().equalsIgnoreCase("false");
+ return new CarbondataPageSource(carbonTable, queryId, carbonSplit, columns, table,
+ configuration, isDirectVectorFill);
}
/**
diff --git a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataPlugin.java b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataPlugin.java
index fa4e1b6..6eb2564 100755
--- a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataPlugin.java
+++ b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataPlugin.java
@@ -17,8 +17,6 @@
package org.apache.carbondata.presto;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-
import com.google.common.collect.ImmutableList;
import io.prestosql.spi.Plugin;
import io.prestosql.spi.connector.ConnectorFactory;
@@ -27,10 +25,7 @@ public class CarbondataPlugin implements Plugin {
@Override
public Iterable<ConnectorFactory> getConnectorFactories() {
- return ImmutableList.of(new CarbondataConnectorFactory("carbondata", getClassLoader()));
+ return ImmutableList.of(new CarbondataConnectorFactory("carbondata"));
}
- private static ClassLoader getClassLoader() {
- return FileFactory.class.getClassLoader();
- }
}
diff --git a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataSplitManager.java b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataSplitManager.java
index 0d6812c..85f5e15 100755
--- a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataSplitManager.java
+++ b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -19,7 +19,6 @@ package org.apache.carbondata.presto;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -48,7 +47,6 @@ import org.apache.carbondata.presto.impl.CarbonTableReader;
import com.google.common.collect.ImmutableList;
import io.prestosql.plugin.hive.CoercionPolicy;
import io.prestosql.plugin.hive.DirectoryLister;
-import io.prestosql.plugin.hive.ForHive;
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HiveColumnHandle;
import io.prestosql.plugin.hive.HiveConfig;
@@ -59,6 +57,8 @@ import io.prestosql.plugin.hive.HiveSplitManager;
import io.prestosql.plugin.hive.HiveTableHandle;
import io.prestosql.plugin.hive.HiveTransactionHandle;
import io.prestosql.plugin.hive.NamenodeStats;
+import io.prestosql.plugin.hive.TableToPartitionMapping;
+import io.prestosql.plugin.hive.authentication.HiveIdentity;
import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
import io.prestosql.plugin.hive.metastore.Table;
import io.prestosql.spi.HostAddress;
@@ -95,7 +95,7 @@ public class CarbondataSplitManager extends HiveSplitManager {
NamenodeStats namenodeStats,
HdfsEnvironment hdfsEnvironment,
DirectoryLister directoryLister,
- @ForHive ExecutorService executorService,
+ ExecutorService executorService,
VersionEmbedder versionEmbedder,
CoercionPolicy coercionPolicy,
CarbonTableReader reader) {
@@ -116,9 +116,9 @@ public class CarbondataSplitManager extends HiveSplitManager {
// get table metadata
SemiTransactionalHiveMetastore metastore =
metastoreProvider.apply((HiveTransactionHandle) transactionHandle);
- Table table =
- metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName())
- .orElseThrow(() -> new TableNotFoundException(schemaTableName));
+ Table table = metastore.getTable(new HiveIdentity(session), schemaTableName.getSchemaName(),
+ schemaTableName.getTableName())
+ .orElseThrow(() -> new TableNotFoundException(schemaTableName));
if (!table.getStorage().getStorageFormat().getInputFormat().contains("carbon")) {
return super.getSplits(transactionHandle, session, tableHandle, splitSchedulingStrategy);
}
@@ -176,10 +176,10 @@ public class CarbondataSplitManager extends HiveSplitManager {
properties.setProperty("queryId", queryId);
properties.setProperty("index", String.valueOf(index));
cSplits.add(new HiveSplit(schemaTableName.getSchemaName(), schemaTableName.getTableName(),
- schemaTableName.getTableName(), cache.getCarbonTable().getTablePath(), 0, 0, 0,
- properties, new ArrayList(), getHostAddresses(split.getLocations()),
- OptionalInt.empty(), false, new HashMap<>(),
- Optional.empty(), false));
+ schemaTableName.getTableName(), cache.getCarbonTable().getTablePath(), 0, 0, 0, 0,
+ properties, new ArrayList<>(), getHostAddresses(split.getLocations()),
+ OptionalInt.empty(), false, TableToPartitionMapping.empty(), Optional.empty(), false,
+ Optional.empty()));
}
statisticRecorder.logStatisticsAsTableDriver();
diff --git a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/InternalCarbonDataConnectorFactory.java b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/InternalCarbonDataConnectorFactory.java
new file mode 100755
index 0000000..364366c
--- /dev/null
+++ b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/InternalCarbonDataConnectorFactory.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Module;
+import com.google.inject.TypeLiteral;
+import io.airlift.bootstrap.Bootstrap;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.event.client.EventModule;
+import io.airlift.json.JsonModule;
+import io.prestosql.plugin.base.CatalogName;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorAccessControl;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSinkProvider;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProvider;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeEventListener;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeNodePartitioningProvider;
+import io.prestosql.plugin.base.jmx.ConnectorObjectNameGeneratorModule;
+import io.prestosql.plugin.base.jmx.MBeanServerModule;
+import io.prestosql.plugin.hive.HiveAnalyzeProperties;
+import io.prestosql.plugin.hive.HiveMetadataFactory;
+import io.prestosql.plugin.hive.HiveSchemaProperties;
+import io.prestosql.plugin.hive.HiveSessionProperties;
+import io.prestosql.plugin.hive.HiveTableProperties;
+import io.prestosql.plugin.hive.HiveTransactionManager;
+import io.prestosql.plugin.hive.NodeVersion;
+import io.prestosql.plugin.hive.authentication.HiveAuthenticationModule;
+import io.prestosql.plugin.hive.azure.HiveAzureModule;
+import io.prestosql.plugin.hive.gcs.HiveGcsModule;
+import io.prestosql.plugin.hive.metastore.HiveMetastore;
+import io.prestosql.plugin.hive.metastore.HiveMetastoreModule;
+import io.prestosql.plugin.hive.procedure.HiveProcedureModule;
+import io.prestosql.plugin.hive.rubix.RubixEnabledConfig;
+import io.prestosql.plugin.hive.rubix.RubixInitializer;
+import io.prestosql.plugin.hive.rubix.RubixModule;
+import io.prestosql.plugin.hive.s3.HiveS3Module;
+import io.prestosql.plugin.hive.security.HiveSecurityModule;
+import io.prestosql.plugin.hive.security.SystemTableAwareAccessControl;
+import io.prestosql.spi.NodeManager;
+import io.prestosql.spi.PageIndexerFactory;
+import io.prestosql.spi.PageSorter;
+import io.prestosql.spi.VersionEmbedder;
+import io.prestosql.spi.classloader.ThreadContextClassLoader;
+import io.prestosql.spi.connector.Connector;
+import io.prestosql.spi.connector.ConnectorAccessControl;
+import io.prestosql.spi.connector.ConnectorContext;
+import io.prestosql.spi.connector.ConnectorNodePartitioningProvider;
+import io.prestosql.spi.connector.ConnectorPageSinkProvider;
+import io.prestosql.spi.connector.ConnectorPageSourceProvider;
+import io.prestosql.spi.connector.ConnectorSplitManager;
+import io.prestosql.spi.connector.SystemTable;
+import io.prestosql.spi.eventlistener.EventListener;
+import io.prestosql.spi.procedure.Procedure;
+import io.prestosql.spi.type.TypeManager;
+import org.weakref.jmx.guice.MBeanModule;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static com.google.inject.multibindings.Multibinder.newSetBinder;
+import static io.airlift.configuration.ConditionalModule.installModuleIf;
+import static io.airlift.configuration.ConfigBinder.configBinder;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Similar class to InternalHiveConnectorFactory,
+ * to handle the module bindings for carbonData connector.
+ */
+public final class InternalCarbonDataConnectorFactory {
+ private InternalCarbonDataConnectorFactory() {
+ }
+
+ public static Connector createConnector(String catalogName, Map<String, String> config,
+ ConnectorContext context, Module module) {
+ return createConnector(catalogName, config, context, module, Optional.empty());
+ }
+
+ public static Connector createConnector(String catalogName, Map<String, String> config,
+ ConnectorContext context, Module module, Optional<HiveMetastore> metastore) {
+ requireNonNull(config, "config is null");
+
+ ClassLoader classLoader = InternalCarbonDataConnectorFactory.class.getClassLoader();
+ try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+ Bootstrap app = new Bootstrap(
+ new EventModule(),
+ new MBeanModule(),
+ new ConnectorObjectNameGeneratorModule(catalogName, "io.prestosql.plugin.carbondata",
+ "presto.plugin.carbondata"),
+ new JsonModule(),
+ new CarbondataModule(),
+ new HiveS3Module(),
+ new HiveGcsModule(),
+ new HiveAzureModule(),
+ installModuleIf(RubixEnabledConfig.class, RubixEnabledConfig::isCacheEnabled,
+ new RubixModule()),
+ new HiveMetastoreModule(metastore),
+ new HiveSecurityModule(catalogName),
+ new HiveAuthenticationModule(),
+ new HiveProcedureModule(),
+ new MBeanServerModule(),
+ binder -> {
+ binder.bind(NodeVersion.class)
+ .toInstance(new NodeVersion(context.getNodeManager().getCurrentNode().getVersion()));
+ binder.bind(NodeManager.class).toInstance(context.getNodeManager());
+ binder.bind(VersionEmbedder.class).toInstance(context.getVersionEmbedder());
+ binder.bind(TypeManager.class).toInstance(context.getTypeManager());
+ binder.bind(PageIndexerFactory.class).toInstance(context.getPageIndexerFactory());
+ binder.bind(PageSorter.class).toInstance(context.getPageSorter());
+ binder.bind(CatalogName.class).toInstance(new CatalogName(catalogName));
+ configBinder(binder).bindConfig(CarbonTableConfig.class);
+ },
+ binder -> newSetBinder(binder, EventListener.class), module);
+
+ Injector injector =
+ app.strictConfig().doNotInitializeLogging().setRequiredConfigurationProperties(config)
+ .initialize();
+
+ if (injector.getInstance(RubixEnabledConfig.class).isCacheEnabled()) {
+ // RubixInitializer needs ConfigurationInitializers, hence kept outside RubixModule
+ RubixInitializer rubixInitializer = injector.getInstance(RubixInitializer.class);
+ rubixInitializer.initializeRubix(context.getNodeManager());
+ }
+
+ LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
+ HiveMetadataFactory metadataFactory = injector.getInstance(HiveMetadataFactory.class);
+ HiveTransactionManager transactionManager =
+ injector.getInstance(HiveTransactionManager.class);
+ ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
+ ConnectorPageSourceProvider connectorPageSource =
+ injector.getInstance(ConnectorPageSourceProvider.class);
+ ConnectorPageSinkProvider pageSinkProvider =
+ injector.getInstance(ConnectorPageSinkProvider.class);
+ ConnectorNodePartitioningProvider connectorDistributionProvider =
+ injector.getInstance(ConnectorNodePartitioningProvider.class);
+ HiveSessionProperties hiveSessionProperties =
+ injector.getInstance(HiveSessionProperties.class);
+ HiveTableProperties hiveTableProperties = injector.getInstance(HiveTableProperties.class);
+ HiveAnalyzeProperties hiveAnalyzeProperties =
+ injector.getInstance(HiveAnalyzeProperties.class);
+ ConnectorAccessControl accessControl = new ClassLoaderSafeConnectorAccessControl(
+ new SystemTableAwareAccessControl(injector.getInstance(ConnectorAccessControl.class)),
+ classLoader);
+ Set<Procedure> procedures = injector.getInstance(Key.get(new TypeLiteral<Set<Procedure>>() {
+ }));
+ Set<SystemTable> systemTables =
+ injector.getInstance(Key.get(new TypeLiteral<Set<SystemTable>>() {
+ }));
+ Set<EventListener> eventListeners =
+ injector.getInstance(Key.get(new TypeLiteral<Set<EventListener>>() {
+ })).stream().map(listener -> new ClassLoaderSafeEventListener(listener, classLoader))
+ .collect(toImmutableSet());
+
+ return new CarbonDataConnector(lifeCycleManager, metadataFactory, transactionManager,
+ new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader),
+ new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader),
+ new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, classLoader),
+ new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader),
+ systemTables, procedures, eventListeners, hiveSessionProperties.getSessionProperties(),
+ HiveSchemaProperties.SCHEMA_PROPERTIES, hiveTableProperties.getTableProperties(),
+ hiveAnalyzeProperties.getAnalyzeProperties(), accessControl, classLoader);
+ }
+ }
+}
diff --git a/integration/presto/src/test/prestosql/org/apache/carbondata/presto/server/PrestoServer.scala b/integration/presto/src/test/prestosql/org/apache/carbondata/presto/server/PrestoServer.scala
index 0f17715..df0b01c 100644
--- a/integration/presto/src/test/prestosql/org/apache/carbondata/presto/server/PrestoServer.scala
+++ b/integration/presto/src/test/prestosql/org/apache/carbondata/presto/server/PrestoServer.scala
@@ -18,7 +18,7 @@ package org.apache.carbondata.presto.server
import java.sql.{Connection, DriverManager, ResultSet}
import java.util
-import java.util.{Locale, Optional, Properties}
+import java.util.{Locale, Properties}
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
@@ -29,8 +29,8 @@ import io.prestosql.execution.QueryIdGenerator
import io.prestosql.jdbc.PrestoStatement
import io.prestosql.metadata.SessionPropertyManager
import io.prestosql.spi.`type`.TimeZoneKey.UTC_KEY
-import io.prestosql.spi.security.Identity
-import io.prestosql.tests.DistributedQueryRunner
+import io.prestosql.spi.security.Identity.Builder
+import io.prestosql.testing.DistributedQueryRunner
import org.slf4j.{Logger, LoggerFactory}
import org.apache.carbondata.presto.CarbondataPlugin
@@ -46,8 +46,11 @@ class PrestoServer {
val prestoProperties: util.Map[String, String] =
Map(("http-server.http.port", "8086")).asJava
val carbonProperties: util.Map[String, String] = new util.HashMap[String, String]()
createSession
- lazy val queryRunner = new DistributedQueryRunner(createSession, 4, prestoProperties)
- var dbName : String = null
+ val builder: DistributedQueryRunner.Builder = DistributedQueryRunner.builder(createSession)
+ builder.setExtraProperties(prestoProperties)
+ builder.setNodeCount(4)
+ lazy val queryRunner: DistributedQueryRunner = builder.build()
+ var dbName : String = _
var statement : PrestoStatement = _
@@ -68,8 +71,8 @@ class PrestoServer {
*
* @param dbName the database name, if not a default database
*/
- def startServer(dbName: String, properties: util.Map[String, String] = new util.HashMap[String, String]()): Unit = {
-
+ def startServer(dbName: String,
+ properties: util.Map[String, String] = new util.HashMap[String, String]()): Unit = {
this.dbName = dbName
carbonProperties.putAll(properties)
LOGGER.info("======== STARTING PRESTO SERVER ========")
@@ -86,8 +89,7 @@ class PrestoServer {
Try {
queryRunner.installPlugin(new CarbondataPlugin)
val carbonProperties = ImmutableMap.builder[String, String]
- .putAll(this.carbonProperties)
- .put("carbon.unsafe.working.memory.in.mb", "512").build
+ .putAll(this.carbonProperties).build
// CreateCatalog will create a catalog for CarbonData in etc/catalog.
queryRunner.createCatalog(CARBONDATA_CATALOG, CARBONDATA_CONNECTOR, carbonProperties)
@@ -127,7 +129,7 @@ class PrestoServer {
}
}
- def execute(query: String) = {
+ def execute(query: String): Boolean = {
Try {
LOGGER.info(s"***** executing the query ***** \n $query")
statement.execute(query)
@@ -193,7 +195,7 @@ class PrestoServer {
LOGGER.info("\n Creating The Presto Server Session")
Session.builder(new SessionPropertyManager)
.setQueryId(new QueryIdGenerator().createNextQueryId)
- .setIdentity(new Identity("user", Optional.empty()))
+ .setIdentity(new Builder("user").build())
.setSource(CARBONDATA_SOURCE).setCatalog(CARBONDATA_CATALOG)
.setTimeZoneKey(UTC_KEY).setLocale(Locale.ENGLISH)
.setRemoteUserAddress("address")
diff --git a/pom.xml b/pom.xml
index dec89fb..4ba43b0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -140,12 +140,12 @@
<hdfs.url>local</hdfs.url>
<presto.jdbc.url>localhost:8086</presto.jdbc.url>
<!-- prestosql by default-->
- <presto.version>316</presto.version>
+ <presto.version>333</presto.version>
<presto.groupid>io.prestosql</presto.groupid>
<presto.hadoop.groupid>io.prestosql.hadoop</presto.hadoop.groupid>
<presto.hadoop.artifactid>hadoop-apache</presto.hadoop.artifactid>
<presto.hadoop.version>3.2.0-2</presto.hadoop.version>
- <airlift.version>0.36</airlift.version>
+ <airlift.version>0.38</airlift.version>
<presto.mvn.plugin.groupid>io.prestosql</presto.mvn.plugin.groupid>
<presto.mvn.plugin.version>6</presto.mvn.plugin.version>
<presto.depndency.scope>compile</presto.depndency.scope>
@@ -883,9 +883,9 @@
<activeByDefault>true</activeByDefault>
</activation>
<properties>
- <presto.version>316</presto.version>
+ <presto.version>333</presto.version>
<scala.version>2.11.8</scala.version>
- <airlift.version>0.36</airlift.version>
+ <airlift.version>0.38</airlift.version>
<presto.groupid>io.prestosql</presto.groupid>
<presto.hadoop.groupid>io.prestosql.hadoop</presto.hadoop.groupid>
<presto.hadoop.artifactid>hadoop-apache</presto.hadoop.artifactid>