akashrn5 commented on a change in pull request #4034:
URL: https://github.com/apache/carbondata/pull/4034#discussion_r553751659
##########
File path: integration/presto/pom.xml
##########
@@ -294,7 +294,7 @@
<dependency>
<groupId>io.airlift</groupId>
<artifactId>json</artifactId>
- <version>0.144</version>
+ <version>0.193</version>
Review comment:
just a suggestion: can you move this version into a POM property, for both
airlift and jackson? That way a future version upgrade only needs to change one
place and the chance of missing an occurrence is smaller.
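For example, something like this (the property name is only a suggestion, adjust to the project's naming conventions):
```xml
<properties>
  <!-- suggested property; the name here is illustrative -->
  <airlift.json.version>0.193</airlift.json.version>
</properties>

<dependency>
  <groupId>io.airlift</groupId>
  <artifactId>json</artifactId>
  <version>${airlift.json.version}</version>
</dependency>
```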
##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataMetaData.java
##########
@@ -73,18 +74,27 @@
private MapredCarbonOutputCommitter carbonOutputCommitter;
private JobContextImpl jobContext;
- public CarbonDataMetaData(SemiTransactionalHiveMetastore metastore,
- HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager, DateTimeZone timeZone,
- boolean allowCorruptWritesForTesting, boolean writesToNonManagedTablesEnabled,
- boolean createsOfNonManagedTablesEnabled, TypeManager typeManager,
+ public CarbonDataMetaData(
+ CatalogName catalogName,
+ SemiTransactionalHiveMetastore metastore,
+ HdfsEnvironment hdfsEnvironment,
+ HivePartitionManager partitionManager,
+ DateTimeZone timeZone,
+ boolean allowCorruptWritesForTesting,
+ boolean writesToNonManagedTablesEnabled,
+ boolean createsOfNonManagedTablesEnabled,
+ boolean translateHiveViews, TypeManager typeManager,
LocationService locationService,
- io.airlift.json.JsonCodec<PartitionUpdate> partitionUpdateCodec,
- TypeTranslator typeTranslator, String prestoVersion,
- HiveStatisticsProvider hiveStatisticsProvider, AccessControlMetadata accessControlMetadata) {
- super(metastore, hdfsEnvironment, partitionManager, timeZone, allowCorruptWritesForTesting,
- true, createsOfNonManagedTablesEnabled, typeManager,
- locationService, partitionUpdateCodec, typeTranslator, prestoVersion,
- hiveStatisticsProvider, accessControlMetadata);
+ JsonCodec<PartitionUpdate> partitionUpdateCodec,
+ TypeTranslator typeTranslator,
+ String prestoVersion,
+ HiveStatisticsProvider hiveStatisticsProvider,
+ AccessControlMetadata accessControlMetadata) {
+ super(catalogName, metastore, hdfsEnvironment, partitionManager, timeZone,
+ allowCorruptWritesForTesting, writesToNonManagedTablesEnabled,
Review comment:
please keep `writesToNonManagedTablesEnabled` hard-coded to `true` like before,
as insert scenarios were failing without it when the write feature was
implemented. Or has the default value changed now?
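If the old behaviour is still needed, a minimal sketch of the change would be:
```java
// sketch only: keep the old hard-coded value instead of forwarding the flag,
// assuming the pre-upgrade insert behaviour is still required
allowCorruptWritesForTesting, true,  // was: writesToNonManagedTablesEnabled
```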
##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/InternalCarbonDataConnectorFactory.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Module;
+import com.google.inject.TypeLiteral;
+import io.airlift.bootstrap.Bootstrap;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.event.client.EventModule;
+import io.airlift.json.JsonModule;
+import io.prestosql.plugin.base.CatalogName;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorAccessControl;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSinkProvider;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProvider;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeEventListener;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeNodePartitioningProvider;
+import io.prestosql.plugin.base.jmx.ConnectorObjectNameGeneratorModule;
+import io.prestosql.plugin.base.jmx.MBeanServerModule;
+import io.prestosql.plugin.hive.HiveAnalyzeProperties;
+import io.prestosql.plugin.hive.HiveMetadataFactory;
+import io.prestosql.plugin.hive.HiveSchemaProperties;
+import io.prestosql.plugin.hive.HiveSessionProperties;
+import io.prestosql.plugin.hive.HiveTableProperties;
+import io.prestosql.plugin.hive.HiveTransactionManager;
+import io.prestosql.plugin.hive.NodeVersion;
+import io.prestosql.plugin.hive.authentication.HiveAuthenticationModule;
+import io.prestosql.plugin.hive.azure.HiveAzureModule;
+import io.prestosql.plugin.hive.gcs.HiveGcsModule;
+import io.prestosql.plugin.hive.metastore.HiveMetastore;
+import io.prestosql.plugin.hive.metastore.HiveMetastoreModule;
+import io.prestosql.plugin.hive.procedure.HiveProcedureModule;
+import io.prestosql.plugin.hive.rubix.RubixEnabledConfig;
+import io.prestosql.plugin.hive.rubix.RubixInitializer;
+import io.prestosql.plugin.hive.rubix.RubixModule;
+import io.prestosql.plugin.hive.s3.HiveS3Module;
+import io.prestosql.plugin.hive.security.HiveSecurityModule;
+import io.prestosql.plugin.hive.security.SystemTableAwareAccessControl;
+import io.prestosql.spi.NodeManager;
+import io.prestosql.spi.PageIndexerFactory;
+import io.prestosql.spi.PageSorter;
+import io.prestosql.spi.VersionEmbedder;
+import io.prestosql.spi.classloader.ThreadContextClassLoader;
+import io.prestosql.spi.connector.Connector;
+import io.prestosql.spi.connector.ConnectorAccessControl;
+import io.prestosql.spi.connector.ConnectorContext;
+import io.prestosql.spi.connector.ConnectorNodePartitioningProvider;
+import io.prestosql.spi.connector.ConnectorPageSinkProvider;
+import io.prestosql.spi.connector.ConnectorPageSourceProvider;
+import io.prestosql.spi.connector.ConnectorSplitManager;
+import io.prestosql.spi.connector.SystemTable;
+import io.prestosql.spi.eventlistener.EventListener;
+import io.prestosql.spi.procedure.Procedure;
+import io.prestosql.spi.type.TypeManager;
+import org.weakref.jmx.guice.MBeanModule;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static com.google.inject.multibindings.Multibinder.newSetBinder;
+import static io.airlift.configuration.ConditionalModule.installModuleIf;
+import static io.airlift.configuration.ConfigBinder.configBinder;
+import static java.util.Objects.requireNonNull;
+
+public final class InternalCarbonDataConnectorFactory {
Review comment:
please add a class-level comment describing the purpose of this factory.
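For example, something along these lines (the wording is only a sketch based on what the class appears to do, adjust as needed):
```java
/**
 * Bootstraps the Guice modules (event, JMX, JSON, CarbonData, Hive metastore,
 * security, etc.) and builds the CarbonData connector with class-loader-safe
 * wrappers around the connector services.
 */
public final class InternalCarbonDataConnectorFactory {
```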
##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
##########
@@ -58,37 +60,40 @@
private HdfsEnvironment hdfsEnvironment;
@Inject public CarbondataPageSourceProvider(
+ TypeManager typeManager,
HiveConfig hiveConfig,
HdfsEnvironment hdfsEnvironment,
- Set<HiveRecordCursorProvider> cursorProviders,
Set<HivePageSourceFactory> pageSourceFactories,
- TypeManager typeManager,
+ Set<HiveRecordCursorProvider> cursorProviders,
+ GenericHiveRecordCursorProvider genericCursorProvider,
CarbonTableReader carbonTableReader) {
- super(hiveConfig, hdfsEnvironment, cursorProviders, pageSourceFactories, typeManager);
+ super(typeManager, hiveConfig, hdfsEnvironment, pageSourceFactories, cursorProviders,
+ genericCursorProvider);
this.carbonTableReader = requireNonNull(carbonTableReader, "carbonTableReader is null");
this.hdfsEnvironment = hdfsEnvironment;
}
@Override
- public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle,
+ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transaction,
ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table,
- List<ColumnHandle> columns) {
- HiveSplit carbonSplit =
- checkType(split, HiveSplit.class, "split is not class HiveSplit");
+ List<ColumnHandle> columns, TupleDomain<ColumnHandle> dynamicFilter) {
+ HiveSplit carbonSplit = checkType(split, HiveSplit.class, "split is not class HiveSplit");
this.queryId = carbonSplit.getSchema().getProperty("queryId");
if (this.queryId == null) {
// Fall back to hive pagesource.
- return super.createPageSource(transactionHandle, session, split, table, columns);
+ return super.createPageSource(transaction, session, split, table, columns, dynamicFilter);
}
+ // TODO: check and use dynamicFilter in CarbondataPageSource
Review comment:
if possible, can you create a JIRA for this and reference the JIRA id in the TODO?
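Something like the following once the ticket exists (the id below is just a placeholder):
```java
// TODO(CARBONDATA-xxxx): evaluate and use dynamicFilter in CarbondataPageSource
```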
##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataConnectorFactory.java
##########
@@ -101,127 +52,109 @@
}
}
- public CarbondataConnectorFactory(String connectorName, ClassLoader classLoader) {
- super(connectorName, classLoader, Optional.empty());
- this.classLoader = requireNonNull(classLoader, "classLoader is null");
+ public CarbondataConnectorFactory(String name) {
+ this(name, EmptyModule.class);
+ }
+
+ public CarbondataConnectorFactory(String connectorName, Class<? extends Module> module) {
+ super(connectorName, module);
+ this.module = module;
}
+
@Override
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context) {
- requireNonNull(config, "config is null");
-
- try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
- Bootstrap app = new Bootstrap(
- new EventModule(),
- new MBeanModule(),
- new ConnectorObjectNameGeneratorModule(catalogName),
- new JsonModule(),
- new CarbondataModule(catalogName),
- new HiveS3Module(),
- new HiveGcsModule(),
- new HiveMetastoreModule(Optional.ofNullable(null)),
- new HiveSecurityModule(),
- new HiveAuthenticationModule(),
- new HiveProcedureModule(),
- new MBeanServerModule(),
- binder -> {
- binder.bind(NodeVersion.class).toInstance(
- new NodeVersion(context.getNodeManager().getCurrentNode().getVersion()));
- binder.bind(NodeManager.class).toInstance(context.getNodeManager());
- binder.bind(VersionEmbedder.class).toInstance(context.getVersionEmbedder());
- binder.bind(TypeManager.class).toInstance(context.getTypeManager());
- binder.bind(PageIndexerFactory.class).toInstance(context.getPageIndexerFactory());
- binder.bind(PageSorter.class).toInstance(context.getPageSorter());
- binder.bind(HiveCatalogName.class).toInstance(new HiveCatalogName(catalogName));
- configBinder(binder).bindConfig(CarbonTableConfig.class);
- });
-
- Injector injector = app
- .strictConfig()
- .doNotInitializeLogging()
- .setRequiredConfigurationProperties(config)
- .initialize();
-
- LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
- HiveMetadataFactory metadataFactory = injector.getInstance(HiveMetadataFactory.class);
- HiveTransactionManager transactionManager =
- injector.getInstance(HiveTransactionManager.class);
- ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
- ConnectorPageSourceProvider connectorPageSource =
- injector.getInstance(ConnectorPageSourceProvider.class);
- ConnectorPageSinkProvider pageSinkProvider =
- injector.getInstance(ConnectorPageSinkProvider.class);
- ConnectorNodePartitioningProvider connectorDistributionProvider =
- injector.getInstance(ConnectorNodePartitioningProvider.class);
- HiveSessionProperties hiveSessionProperties =
- injector.getInstance(HiveSessionProperties.class);
- HiveTableProperties hiveTableProperties = injector.getInstance(HiveTableProperties.class);
- HiveAnalyzeProperties hiveAnalyzeProperties =
- injector.getInstance(HiveAnalyzeProperties.class);
- ConnectorAccessControl accessControl =
- new SystemTableAwareAccessControl(injector.getInstance(ConnectorAccessControl.class));
- Set<Procedure> procedures = injector.getInstance(Key.get(new TypeLiteral<Set<Procedure>>() {
- }));
-
- return new HiveConnector(lifeCycleManager, metadataFactory, transactionManager,
- new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader),
- new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader),
- new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, classLoader),
- new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader),
- ImmutableSet.of(), procedures, hiveSessionProperties.getSessionProperties(),
- HiveSchemaProperties.SCHEMA_PROPERTIES, hiveTableProperties.getTableProperties(),
- hiveAnalyzeProperties.getAnalyzeProperties(), accessControl, classLoader);
- } catch (Exception e) {
- throwIfUnchecked(e);
+ ClassLoader classLoader = context.duplicatePluginClassLoader();
+ try {
+ Object moduleInstance = classLoader.loadClass(this.module.getName()).getConstructor().newInstance();
+ Class<?> moduleClass = classLoader.loadClass(Module.class.getName());
+ return (Connector) classLoader.loadClass(InternalCarbonDataConnectorFactory.class.getName())
+ .getMethod("createConnector", String.class, Map.class, ConnectorContext.class, moduleClass)
+ .invoke(null, catalogName, config, context, moduleInstance);
+ }
+ catch (InvocationTargetException e) {
+ Throwable targetException = e.getTargetException();
+ throwIfUnchecked(targetException);
+ throw new RuntimeException(targetException);
+ }
+ catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}
/**
* Set the Carbon format enum to HiveStorageFormat, its a hack but for time being it is best
* choice to avoid lot of code change.
+ *
+ * @throws Exception
Review comment:
you can revert this `@throws Exception` tag, as it does not add any specific information.
##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonMetadataFactory.java
##########
@@ -18,37 +18,43 @@
package org.apache.carbondata.presto;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import com.google.inject.Inject;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
-import io.prestosql.plugin.hive.ForHive;
+import io.airlift.units.Duration;
+import io.prestosql.plugin.base.CatalogName;
+import io.prestosql.plugin.hive.ForHiveTransactionHeartbeats;
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HiveConfig;
-import io.prestosql.plugin.hive.HiveMetadata;
import io.prestosql.plugin.hive.HiveMetadataFactory;
+import io.prestosql.plugin.hive.HiveMetastoreClosure;
import io.prestosql.plugin.hive.HivePartitionManager;
import io.prestosql.plugin.hive.LocationService;
import io.prestosql.plugin.hive.NodeVersion;
import io.prestosql.plugin.hive.PartitionUpdate;
+import io.prestosql.plugin.hive.TransactionalMetadata;
import io.prestosql.plugin.hive.TypeTranslator;
-import io.prestosql.plugin.hive.metastore.CachingHiveMetastore;
import io.prestosql.plugin.hive.metastore.HiveMetastore;
import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
import io.prestosql.plugin.hive.security.AccessControlMetadataFactory;
import io.prestosql.plugin.hive.statistics.MetastoreHiveStatisticsProvider;
import io.prestosql.spi.type.TypeManager;
import org.joda.time.DateTimeZone;
+import static io.prestosql.plugin.hive.metastore.cache.CachingHiveMetastore.memoizeMetastore;
+
public class CarbonMetadataFactory extends HiveMetadataFactory {
private static final Logger log = Logger.get(HiveMetadataFactory.class);
private final boolean allowCorruptWritesForTesting;
private final boolean skipDeletionForAlter;
private final boolean skipTargetCleanupOnRollback;
- private final boolean writesToNonManagedTablesEnabled = true;
+ private final boolean writesToNonManagedTablesEnabled;
Review comment:
could you please confirm that, after this change, insert still works fine in
the cluster test?
##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataPageSinkProvider.java
##########
@@ -150,8 +148,11 @@ private ConnectorPageSink createPageSink(HiveWritableTableHandle handle, Connect
handle.getLocationHandle(),
locationService,
session.getQueryId(),
- new HivePageSinkMetadataProvider(handle.getPageSinkMetadata(),
- memoizeMetastore(metastore, perTransactionMetastoreCacheMaximumSize)),
+ new HivePageSinkMetadataProvider(
+ handle.getPageSinkMetadata(),
+ new HiveMetastoreClosure(
+ memoizeMetastore(metastore, perTransactionMetastoreCacheMaximumSize)),
+ new HiveIdentity(session)),
Review comment:
please reformat the code from line 151 to 155.
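Roughly like this (only indentation and line breaks, no behaviour change intended):
```java
new HivePageSinkMetadataProvider(
    handle.getPageSinkMetadata(),
    new HiveMetastoreClosure(
        memoizeMetastore(metastore, perTransactionMetastoreCacheMaximumSize)),
    new HiveIdentity(session)),
```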
##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataConnectorFactory.java
##########
@@ -101,127 +52,109 @@
}
}
- public CarbondataConnectorFactory(String connectorName, ClassLoader
classLoader) {
- super(connectorName, classLoader, Optional.empty());
- this.classLoader = requireNonNull(classLoader, "classLoader is null");
+ public CarbondataConnectorFactory(String name) {
Review comment:
```suggestion
public CarbondataConnectorFactory(String connectorName) {
```
##########
File path: pom.xml
##########
@@ -780,9 +780,9 @@
<activeByDefault>true</activeByDefault>
</activation>
<properties>
- <presto.version>316</presto.version>
+ <presto.version>333</presto.version>
Review comment:
similar to one of the above comments, can we keep this as a single property and
reference it everywhere the Presto version is needed?
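For example, dependency declarations could reference the property instead of a hard-coded version (the artifact below is only illustrative):
```xml
<dependency>
  <groupId>io.prestosql</groupId>
  <artifactId>presto-hive</artifactId>
  <version>${presto.version}</version>
</dependency>
```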
##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataModule.java
##########
@@ -145,31 +133,59 @@ public void configure(Binder binder) {
jsonCodecBinder(binder).bindJsonCodec(PartitionUpdate.class);
binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);
- newExporter(binder).export(FileFormatDataSourceStats.class)
- .as(generatedNameOf(FileFormatDataSourceStats.class, connectorId));
+ newExporter(binder).export(FileFormatDataSourceStats.class).withGeneratedName();
Multibinder<HivePageSourceFactory> pageSourceFactoryBinder =
newSetBinder(binder, HivePageSourceFactory.class);
pageSourceFactoryBinder.addBinding().to(OrcPageSourceFactory.class).in(Scopes.SINGLETON);
pageSourceFactoryBinder.addBinding().to(ParquetPageSourceFactory.class).in(Scopes.SINGLETON);
pageSourceFactoryBinder.addBinding().to(RcFilePageSourceFactory.class).in(Scopes.SINGLETON);
+ Multibinder<HiveRecordCursorProvider> recordCursorProviderBinder =
+ newSetBinder(binder, HiveRecordCursorProvider.class);
+ recordCursorProviderBinder.addBinding().to(S3SelectRecordCursorProvider.class)
+ .in(Scopes.SINGLETON);
+
+ binder.bind(GenericHiveRecordCursorProvider.class).in(Scopes.SINGLETON);
+
Multibinder<HiveFileWriterFactory> fileWriterFactoryBinder =
newSetBinder(binder, HiveFileWriterFactory.class);
binder.bind(OrcFileWriterFactory.class).in(Scopes.SINGLETON);
- newExporter(binder).export(OrcFileWriterFactory.class)
- .as(generatedNameOf(OrcFileWriterFactory.class, connectorId));
- configBinder(binder).bindConfig(OrcFileWriterConfig.class);
+ newExporter(binder).export(OrcFileWriterFactory.class).withGeneratedName();
+ configBinder(binder).bindConfig(OrcReaderConfig.class);
+ configBinder(binder).bindConfig(OrcWriterConfig.class);
fileWriterFactoryBinder.addBinding().to(OrcFileWriterFactory.class).in(Scopes.SINGLETON);
fileWriterFactoryBinder.addBinding().to(RcFileFileWriterFactory.class).in(Scopes.SINGLETON);
+
+ configBinder(binder).bindConfig(ParquetReaderConfig.class);
+ configBinder(binder).bindConfig(ParquetWriterConfig.class);
fileWriterFactoryBinder.addBinding().to(CarbonDataFileWriterFactory.class).in(Scopes.SINGLETON);
binder.bind(CarbonTableReader.class).in(Scopes.SINGLETON);
- configBinder(binder).bindConfig(ParquetFileWriterConfig.class);
+
jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);
+
+ newSetBinder(binder, SystemTable.class);
// configure carbon properties
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.IS_QUERY_FROM_PRESTO, "true");
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.IS_QUERY_FROM_PRESTO, "true");
}
+ public static final class TypeDeserializer
Review comment:
do we need this? It is already present in the HiveModule class and accessible
from there, right?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]