Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3019#discussion_r244172111
--- Diff:
integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
---
@@ -17,69 +17,177 @@
package org.apache.carbondata.presto;
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.*;
import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import static java.util.Objects.requireNonNull;
-import com.facebook.presto.spi.ConnectorHandleResolver;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.facebook.presto.hive.HiveConnector;
+import com.facebook.presto.hive.HiveConnectorFactory;
+import com.facebook.presto.hive.HiveMetadataFactory;
+import com.facebook.presto.hive.HiveProcedureModule;
+import com.facebook.presto.hive.HiveSchemaProperties;
+import com.facebook.presto.hive.HiveSessionProperties;
+import com.facebook.presto.hive.HiveStorageFormat;
+import com.facebook.presto.hive.HiveTableProperties;
+import com.facebook.presto.hive.HiveTransactionManager;
+import com.facebook.presto.hive.NodeVersion;
+import com.facebook.presto.hive.RebindSafeMBeanServer;
+import com.facebook.presto.hive.authentication.HiveAuthenticationModule;
+import com.facebook.presto.hive.metastore.HiveMetastoreModule;
+import com.facebook.presto.hive.s3.HiveS3Module;
+import com.facebook.presto.hive.security.HiveSecurityModule;
+import com.facebook.presto.hive.security.PartitionsAwareAccessControl;
+import com.facebook.presto.spi.NodeManager;
+import com.facebook.presto.spi.PageIndexerFactory;
+import com.facebook.presto.spi.PageSorter;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
-import com.facebook.presto.spi.connector.*;
-import
com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata;
+import com.facebook.presto.spi.connector.Connector;
+import com.facebook.presto.spi.connector.ConnectorAccessControl;
+import com.facebook.presto.spi.connector.ConnectorContext;
+import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider;
+import com.facebook.presto.spi.connector.ConnectorPageSinkProvider;
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import
com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSinkProvider;
import
com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider;
import
com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
-import com.google.common.base.Throwables;
+import
com.facebook.presto.spi.connector.classloader.ClassLoaderSafeNodePartitioningProvider;
+import com.facebook.presto.spi.procedure.Procedure;
+import com.facebook.presto.spi.type.TypeManager;
+import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.TypeLiteral;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.event.client.EventModule;
import io.airlift.json.JsonModule;
+import io.airlift.units.DataSize;
+import org.weakref.jmx.guice.MBeanModule;
+import sun.reflect.ConstructorAccessor;
+import static com.google.common.base.Throwables.throwIfUnchecked;
+import static io.airlift.configuration.ConfigBinder.configBinder;
/**
* Build Carbondata Connector
* It will be called by CarbondataPlugin
*/
-public class CarbondataConnectorFactory implements ConnectorFactory {
+public class CarbondataConnectorFactory extends HiveConnectorFactory {
- private final String name;
private final ClassLoader classLoader;
public CarbondataConnectorFactory(String connectorName, ClassLoader
classLoader) {
- this.name = connectorName;
+ super(connectorName, classLoader, null);
this.classLoader = requireNonNull(classLoader, "classLoader is null");
}
- @Override public String getName() {
- return name;
- }
-
- @Override public ConnectorHandleResolver getHandleResolver() {
- return new CarbondataHandleResolver();
- }
-
- @Override public Connector create(String connectorId, Map<String,
String> config,
+ @Override public Connector create(String catalogName, Map<String,
String> config,
ConnectorContext context) {
requireNonNull(config, "config is null");
try (ThreadContextClassLoader ignored = new
ThreadContextClassLoader(classLoader)) {
- Bootstrap app = new Bootstrap(new JsonModule(),
- new CarbondataModule(connectorId, context.getTypeManager()));
+ Bootstrap app = new Bootstrap(new EventModule(), new MBeanModule(),
new JsonModule(),
+ new CarbondataModule(catalogName), new HiveS3Module(catalogName),
+ new HiveMetastoreModule(catalogName, Optional.ofNullable(null)),
new HiveSecurityModule(),
+ new HiveAuthenticationModule(), new HiveProcedureModule(),
binder -> {
+ javax.management.MBeanServer platformMBeanServer =
+ ManagementFactory.getPlatformMBeanServer();
+ binder.bind(javax.management.MBeanServer.class)
+ .toInstance(new RebindSafeMBeanServer(platformMBeanServer));
+ binder.bind(NodeVersion.class)
+ .toInstance(new
NodeVersion(context.getNodeManager().getCurrentNode().getVersion()));
+
binder.bind(NodeManager.class).toInstance(context.getNodeManager());
+
binder.bind(TypeManager.class).toInstance(context.getTypeManager());
+
binder.bind(PageIndexerFactory.class).toInstance(context.getPageIndexerFactory());
+ binder.bind(PageSorter.class).toInstance(context.getPageSorter());
+ configBinder(binder).bindConfig(CarbonTableConfig.class);
+ });
Injector injector =
app.strictConfig().doNotInitializeLogging().setRequiredConfigurationProperties(config)
.initialize();
+ setCarbonEnum();
+
LifeCycleManager lifeCycleManager =
injector.getInstance(LifeCycleManager.class);
- ConnectorMetadata metadata =
injector.getInstance(CarbondataMetadata.class);
+ HiveMetadataFactory metadataFactory =
injector.getInstance(HiveMetadataFactory.class);
+ HiveTransactionManager transactionManager =
+ injector.getInstance(HiveTransactionManager.class);
ConnectorSplitManager splitManager =
injector.getInstance(ConnectorSplitManager.class);
ConnectorPageSourceProvider connectorPageSource =
injector.getInstance(ConnectorPageSourceProvider.class);
+ ConnectorPageSinkProvider pageSinkProvider =
+ injector.getInstance(ConnectorPageSinkProvider.class);
+ ConnectorNodePartitioningProvider connectorDistributionProvider =
+ injector.getInstance(ConnectorNodePartitioningProvider.class);
+ HiveSessionProperties hiveSessionProperties =
+ injector.getInstance(HiveSessionProperties.class);
+ HiveTableProperties hiveTableProperties =
injector.getInstance(HiveTableProperties.class);
+ ConnectorAccessControl accessControl =
+ new
PartitionsAwareAccessControl(injector.getInstance(ConnectorAccessControl.class));
+ Set<Procedure> procedures = injector.getInstance(Key.get(new
TypeLiteral<Set<Procedure>>() {
+ }));
- return new CarbondataConnector(lifeCycleManager,
- new ClassLoaderSafeConnectorMetadata(metadata, classLoader),
- new ClassLoaderSafeConnectorSplitManager(splitManager,
classLoader), classLoader,
- new
ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader));
+ return new HiveConnector(lifeCycleManager, metadataFactory,
transactionManager,
+ new ClassLoaderSafeConnectorSplitManager(splitManager,
classLoader),
+ new
ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader),
+ new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider,
classLoader),
+ new
ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider,
classLoader),
+ ImmutableSet.of(), procedures,
hiveSessionProperties.getSessionProperties(),
+ HiveSchemaProperties.SCHEMA_PROPERTIES,
hiveTableProperties.getTableProperties(),
+ accessControl, classLoader);
} catch (Exception e) {
- throw Throwables.propagate(e);
+ throwIfUnchecked(e);
+ throw new RuntimeException(e);
}
}
+
+ /**
+ * Set the Carbon format enum to HiveStorageFormat, its a hack but for
time being it is best
+ * choice to avoid lot of code change.
+ *
+ * @throws Exception
+ */
+ private void setCarbonEnum() throws Exception {
+ for (HiveStorageFormat format : HiveStorageFormat.values()) {
+ if (format.name().equals("CARBON")) {
+ return;
+ }
+ }
+ Constructor<?>[] declaredConstructors =
HiveStorageFormat.class.getDeclaredConstructors();
+ declaredConstructors[0].setAccessible(true);
+ Field constructorAccessorField =
Constructor.class.getDeclaredField("constructorAccessor");
+ constructorAccessorField.setAccessible(true);
+ ConstructorAccessor ca =
+ (ConstructorAccessor)
constructorAccessorField.get(declaredConstructors[0]);
+ if (ca == null) {
+ Method acquireConstructorAccessorMethod =
+
Constructor.class.getDeclaredMethod("acquireConstructorAccessor");
+ acquireConstructorAccessorMethod.setAccessible(true);
+ ca = (ConstructorAccessor)
acquireConstructorAccessorMethod.invoke(declaredConstructors[0]);
+ }
+ Object instance = ca.newInstance(new Object[] { "CARBON",
HiveStorageFormat.values().length, "",
+ "org.apache.carbondata.hadoop.api.CarbonFileInputFormat",
--- End diff --
please add assertion for serdes and outputformat class also
---