http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/service/JniCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/service/JniCatalog.java b/fe/src/main/java/com/cloudera/impala/service/JniCatalog.java deleted file mode 100644 index bc71a49..0000000 --- a/fe/src/main/java/com/cloudera/impala/service/JniCatalog.java +++ /dev/null @@ -1,255 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.service; - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - -import org.apache.thrift.TException; -import org.apache.thrift.TSerializer; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.authorization.SentryConfig; -import com.cloudera.impala.authorization.User; -import com.cloudera.impala.catalog.CatalogException; -import com.cloudera.impala.catalog.CatalogServiceCatalog; -import com.cloudera.impala.catalog.Db; -import com.cloudera.impala.catalog.Function; -import com.cloudera.impala.common.ImpalaException; -import com.cloudera.impala.common.InternalException; -import com.cloudera.impala.common.JniUtil; -import com.cloudera.impala.service.BackendConfig; -import com.cloudera.impala.thrift.TCatalogObject; -import com.cloudera.impala.thrift.TDatabase; -import com.cloudera.impala.thrift.TDdlExecRequest; -import com.cloudera.impala.thrift.TFunction; -import com.cloudera.impala.thrift.TGetAllCatalogObjectsResponse; -import com.cloudera.impala.thrift.TGetDbsParams; -import com.cloudera.impala.thrift.TGetDbsResult; -import com.cloudera.impala.thrift.TGetFunctionsRequest; -import com.cloudera.impala.thrift.TGetFunctionsResponse; -import com.cloudera.impala.thrift.TGetTablesParams; -import com.cloudera.impala.thrift.TGetTablesResult; -import com.cloudera.impala.thrift.TLogLevel; -import com.cloudera.impala.thrift.TPrioritizeLoadRequest; -import com.cloudera.impala.thrift.TResetMetadataRequest; -import com.cloudera.impala.thrift.TSentryAdminCheckRequest; -import com.cloudera.impala.thrift.TUniqueId; -import com.cloudera.impala.thrift.TUpdateCatalogRequest; -import com.cloudera.impala.util.GlogAppender; -import com.cloudera.impala.util.PatternMatcher; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; - -/** - * JNI-callable interface for the CatalogService. The main point is to serialize - * and de-serialize thrift structures between C and Java parts of the CatalogService. 
- */ -public class JniCatalog { - private final static Logger LOG = LoggerFactory.getLogger(JniCatalog.class); - private final static TBinaryProtocol.Factory protocolFactory_ = - new TBinaryProtocol.Factory(); - private final CatalogServiceCatalog catalog_; - private final CatalogOpExecutor catalogOpExecutor_; - - // A unique identifier for this instance of the Catalog Service. - private static final TUniqueId catalogServiceId_ = generateId(); - - private static TUniqueId generateId() { - UUID uuid = UUID.randomUUID(); - return new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); - } - - public JniCatalog(boolean loadInBackground, int numMetadataLoadingThreads, - String sentryServiceConfig, int impalaLogLevel, int otherLogLevel, - boolean allowAuthToLocal, String kerberosPrincipal) throws InternalException { - BackendConfig.setAuthToLocal(allowAuthToLocal); - Preconditions.checkArgument(numMetadataLoadingThreads > 0); - // This trick saves having to pass a TLogLevel enum, which is an object and more - // complex to pass through JNI. - GlogAppender.Install(TLogLevel.values()[impalaLogLevel], - TLogLevel.values()[otherLogLevel]); - - // Check if the Sentry Service is configured. If so, create a configuration object. - SentryConfig sentryConfig = null; - if (!Strings.isNullOrEmpty(sentryServiceConfig)) { - sentryConfig = new SentryConfig(sentryServiceConfig); - sentryConfig.loadConfig(); - } - LOG.info(JniUtil.getJavaVersion()); - - catalog_ = new CatalogServiceCatalog(loadInBackground, - numMetadataLoadingThreads, sentryConfig, getServiceId(), kerberosPrincipal); - try { - catalog_.reset(); - } catch (CatalogException e) { - LOG.error("Error initializing Catalog. Please run 'invalidate metadata'", e); - } - catalogOpExecutor_ = new CatalogOpExecutor(catalog_); - } - - public static TUniqueId getServiceId() { return catalogServiceId_; } - - /** - * Gets all catalog objects. - */ - public byte[] getCatalogObjects(long from_version) throws ImpalaException, TException { - TGetAllCatalogObjectsResponse resp = - catalog_.getCatalogObjects(from_version); - TSerializer serializer = new TSerializer(protocolFactory_); - return serializer.serialize(resp); - } - - /** - * Gets the current catalog version. - */ - public long getCatalogVersion() { - return catalog_.getCatalogVersion(); - } - - /** - * Executes the given DDL request and returns the result. - */ - public byte[] execDdl(byte[] thriftDdlExecReq) throws ImpalaException { - TDdlExecRequest params = new TDdlExecRequest(); - JniUtil.deserializeThrift(protocolFactory_, params, thriftDdlExecReq); - TSerializer serializer = new TSerializer(protocolFactory_); - try { - return serializer.serialize(catalogOpExecutor_.execDdlRequest(params)); - } catch (TException e) { - throw new InternalException(e.getMessage()); - } - } - - /** - * Executes a reset metadata statement. See comment in CatalogOpExecutor.java. - */ - public byte[] resetMetadata(byte[] thriftResetMetadataReq) - throws ImpalaException, TException { - TResetMetadataRequest req = new TResetMetadataRequest(); - JniUtil.deserializeThrift(protocolFactory_, req, thriftResetMetadataReq); - TSerializer serializer = new TSerializer(protocolFactory_); - return serializer.serialize(catalogOpExecutor_.execResetMetadata(req)); - } - - /** - * Returns a list of databases matching an optional pattern. - * The argument is a serialized TGetDbsParams object. - * The return type is a serialized TGetDbsResult object.
- */ - public byte[] getDbs(byte[] thriftGetTablesParams) throws ImpalaException, - TException { - TGetDbsParams params = new TGetDbsParams(); - JniUtil.deserializeThrift(protocolFactory_, params, thriftGetTablesParams); - List<Db> dbs = catalog_.getDbs(PatternMatcher.MATCHER_MATCH_ALL); - TGetDbsResult result = new TGetDbsResult(); - List<TDatabase> tDbs = Lists.newArrayListWithCapacity(dbs.size()); - for (Db db: dbs) tDbs.add(db.toThrift()); - result.setDbs(tDbs); - TSerializer serializer = new TSerializer(protocolFactory_); - return serializer.serialize(result); - } - - /** - * Returns a list of table names matching an optional pattern. - * The argument is a serialized TGetTablesParams object. - * The return type is a serialized TGetTablesResult object. - */ - public byte[] getTableNames(byte[] thriftGetTablesParams) throws ImpalaException, - TException { - TGetTablesParams params = new TGetTablesParams(); - JniUtil.deserializeThrift(protocolFactory_, params, thriftGetTablesParams); - List<String> tables = catalog_.getTableNames(params.db, - PatternMatcher.createHivePatternMatcher(params.pattern)); - TGetTablesResult result = new TGetTablesResult(); - result.setTables(tables); - TSerializer serializer = new TSerializer(protocolFactory_); - return serializer.serialize(result); - } - - /** - * Gets the thrift representation of a catalog object. - */ - public byte[] getCatalogObject(byte[] thriftParams) throws ImpalaException, - TException { - TCatalogObject objectDescription = new TCatalogObject(); - JniUtil.deserializeThrift(protocolFactory_, objectDescription, thriftParams); - TSerializer serializer = new TSerializer(protocolFactory_); - return serializer.serialize(catalog_.getTCatalogObject(objectDescription)); - } - - /** - * See comment in CatalogServiceCatalog. - */ - public byte[] getFunctions(byte[] thriftParams) throws ImpalaException, - TException { - TGetFunctionsRequest request = new TGetFunctionsRequest(); - JniUtil.deserializeThrift(protocolFactory_, request, thriftParams); - TSerializer serializer = new TSerializer(protocolFactory_); - if (!request.isSetDb_name()) { - throw new InternalException("Database name must be set in call to " + - "getFunctions()"); - } - - // Get all the functions and convert them to their Thrift representation. - List<Function> fns = catalog_.getFunctions(request.getDb_name()); - TGetFunctionsResponse response = new TGetFunctionsResponse(); - response.setFunctions(new ArrayList<TFunction>(fns.size())); - for (Function fn: fns) { - response.addToFunctions(fn.toThrift()); - } - - return serializer.serialize(response); - } - - public void prioritizeLoad(byte[] thriftLoadReq) throws ImpalaException, - TException { - TPrioritizeLoadRequest request = new TPrioritizeLoadRequest(); - JniUtil.deserializeThrift(protocolFactory_, request, thriftLoadReq); - catalog_.prioritizeLoad(request.getObject_descs()); - } - - /** - * Verifies whether the user is configured as an admin on the Sentry Service. Throws - * an AuthorizationException if the user does not have admin privileges or if there - * were errors communicating with the Sentry Service. - */ - public void checkUserSentryAdmin(byte[] thriftReq) throws ImpalaException, - TException { - TSentryAdminCheckRequest request = new TSentryAdminCheckRequest(); - JniUtil.deserializeThrift(protocolFactory_, request, thriftReq); - catalog_.getSentryProxy().checkUserSentryAdmin( - new User(request.getHeader().getRequesting_user())); - } - - /** - * Process any updates to the metastore required after a query executes. 
- * The argument is a serialized TUpdateCatalogRequest. - */ - public byte[] updateCatalog(byte[] thriftUpdateCatalog) throws ImpalaException, - TException { - TUpdateCatalogRequest request = new TUpdateCatalogRequest(); - JniUtil.deserializeThrift(protocolFactory_, request, thriftUpdateCatalog); - TSerializer serializer = new TSerializer(protocolFactory_); - return serializer.serialize(catalogOpExecutor_.updateCatalog(request)); - } -}
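
Every JNI entry point in the deleted JniCatalog above follows the same byte[]-in/byte[]-out discipline: deserialize a Thrift request struct from the bytes handed across JNI, run the catalog operation, then serialize the Thrift response back to bytes for the C++ caller. Below is a minimal self-contained sketch of that round trip; the helper class name is hypothetical, but TSerializer, TDeserializer and TBinaryProtocol are the libthrift classes the file itself uses, and JniUtil.deserializeThrift wraps the equivalent of the deserialize step in the real code.

import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;

public class JniRoundTripSketch {
  // One shared binary-protocol factory, mirroring the protocolFactory_ field above.
  private static final TBinaryProtocol.Factory PROTOCOL_FACTORY =
      new TBinaryProtocol.Factory();

  // Deserialize request bytes received over JNI into the given Thrift struct.
  public static <T extends TBase<?, ?>> T deserialize(T request, byte[] bytes)
      throws TException {
    new TDeserializer(PROTOCOL_FACTORY).deserialize(request, bytes);
    return request;
  }

  // Serialize a Thrift response struct back into bytes for the C++ caller.
  public static byte[] serialize(TBase<?, ?> response) throws TException {
    return new TSerializer(PROTOCOL_FACTORY).serialize(response);
  }
}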
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/service/JniFrontend.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/service/JniFrontend.java b/fe/src/main/java/com/cloudera/impala/service/JniFrontend.java deleted file mode 100644 index af9c00c..0000000 --- a/fe/src/main/java/com/cloudera/impala/service/JniFrontend.java +++ /dev/null @@ -1,743 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.service; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Enumeration; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.log4j.Appender; -import org.apache.hadoop.fs.s3a.S3AFileSystem; -import org.apache.log4j.FileAppender; -import org.apache.thrift.TException; -import org.apache.thrift.TSerializer; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.ToSqlUtils; -import com.cloudera.impala.authorization.AuthorizationConfig; -import com.cloudera.impala.authorization.ImpalaInternalAdminUser; -import com.cloudera.impala.authorization.User; -import com.cloudera.impala.catalog.DataSource; -import com.cloudera.impala.catalog.Db; -import com.cloudera.impala.catalog.Function; -import com.cloudera.impala.catalog.Role; -import com.cloudera.impala.common.FileSystemUtil; -import com.cloudera.impala.common.ImpalaException; -import com.cloudera.impala.common.InternalException; -import com.cloudera.impala.common.JniUtil; -import com.cloudera.impala.service.BackendConfig; -import com.cloudera.impala.thrift.TCatalogObject; -import com.cloudera.impala.thrift.TDatabase; -import com.cloudera.impala.thrift.TDescribeDbParams; -import com.cloudera.impala.thrift.TDescribeResult; -import com.cloudera.impala.thrift.TDescribeTableParams; -import com.cloudera.impala.thrift.TExecRequest; -import com.cloudera.impala.thrift.TFunctionCategory; -import com.cloudera.impala.thrift.TGetAllHadoopConfigsResponse; -import com.cloudera.impala.thrift.TGetDataSrcsParams; -import com.cloudera.impala.thrift.TGetDataSrcsResult; -import com.cloudera.impala.thrift.TGetDbsParams; -import com.cloudera.impala.thrift.TGetDbsResult; -import com.cloudera.impala.thrift.TGetFunctionsParams; -import 
com.cloudera.impala.thrift.TGetFunctionsResult; -import com.cloudera.impala.thrift.TGetHadoopConfigRequest; -import com.cloudera.impala.thrift.TGetHadoopConfigResponse; -import com.cloudera.impala.thrift.TGetTablesParams; -import com.cloudera.impala.thrift.TGetTablesResult; -import com.cloudera.impala.thrift.TLoadDataReq; -import com.cloudera.impala.thrift.TLoadDataResp; -import com.cloudera.impala.thrift.TLogLevel; -import com.cloudera.impala.thrift.TMetadataOpRequest; -import com.cloudera.impala.thrift.TQueryCtx; -import com.cloudera.impala.thrift.TResultSet; -import com.cloudera.impala.thrift.TShowFilesParams; -import com.cloudera.impala.thrift.TShowGrantRoleParams; -import com.cloudera.impala.thrift.TShowRolesParams; -import com.cloudera.impala.thrift.TShowRolesResult; -import com.cloudera.impala.thrift.TShowStatsParams; -import com.cloudera.impala.thrift.TTableName; -import com.cloudera.impala.thrift.TUniqueId; -import com.cloudera.impala.thrift.TUpdateCatalogCacheRequest; -import com.cloudera.impala.thrift.TUpdateMembershipRequest; -import com.cloudera.impala.util.GlogAppender; -import com.cloudera.impala.util.PatternMatcher; -import com.cloudera.impala.util.TSessionStateUtil; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -/** - * JNI-callable interface onto a wrapped Frontend instance. The main point is to serialise - * and deserialise thrift structures between C and Java. - */ -public class JniFrontend { - private final static Logger LOG = LoggerFactory.getLogger(JniFrontend.class); - private final static TBinaryProtocol.Factory protocolFactory_ = - new TBinaryProtocol.Factory(); - private final Frontend frontend_; - - // Required minimum value (in milliseconds) for the HDFS config - // 'dfs.client.file-block-storage-locations.timeout.millis' - private static final long MIN_DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS = - 10 * 1000; - - /** - * Create a new instance of the JniFrontend. - */ - public JniFrontend(boolean lazy, String serverName, String authorizationPolicyFile, - String sentryConfigFile, String authPolicyProviderClass, int impalaLogLevel, - int otherLogLevel, boolean allowAuthToLocal) throws InternalException { - BackendConfig.setAuthToLocal(allowAuthToLocal); - GlogAppender.Install(TLogLevel.values()[impalaLogLevel], - TLogLevel.values()[otherLogLevel]); - - // Validate the authorization configuration before initializing the Frontend. - // If there are any configuration problems Impala startup will fail. - AuthorizationConfig authConfig = new AuthorizationConfig(serverName, - authorizationPolicyFile, sentryConfigFile, authPolicyProviderClass); - authConfig.validateConfig(); - if (authConfig.isEnabled()) { - LOG.info(String.format("Authorization is 'ENABLED' using %s", - authConfig.isFileBasedPolicy() ? "file-based policy from: " + - authConfig.getPolicyFile() : "the Sentry Policy Service.")); - } else { - LOG.info("Authorization is 'DISABLED'."); - } - LOG.info(JniUtil.getJavaVersion()); - - frontend_ = new Frontend(authConfig); - } - - /** - * Jni wrapper for Frontend.createExecRequest(). Accepts a serialized - * TQueryCtx; returns a serialized TExecRequest.
- */ - public byte[] createExecRequest(byte[] thriftQueryContext) - throws ImpalaException { - TQueryCtx queryCtx = new TQueryCtx(); - JniUtil.deserializeThrift(protocolFactory_, queryCtx, thriftQueryContext); - - StringBuilder explainString = new StringBuilder(); - TExecRequest result = frontend_.createExecRequest(queryCtx, explainString); - if (explainString.length() > 0) LOG.debug(explainString.toString()); - - // TODO: avoid creating serializer for each query? - TSerializer serializer = new TSerializer(protocolFactory_); - try { - return serializer.serialize(result); - } catch (TException e) { - throw new InternalException(e.getMessage()); - } - } - - // Deserialize and merge each thrift catalog update into a single merged update - public byte[] updateCatalogCache(byte[][] thriftCatalogUpdates) throws ImpalaException { - TUniqueId defaultCatalogServiceId = new TUniqueId(0L, 0L); - TUpdateCatalogCacheRequest mergedUpdateRequest = new TUpdateCatalogCacheRequest( - false, defaultCatalogServiceId, new ArrayList<TCatalogObject>(), - new ArrayList<TCatalogObject>()); - for (byte[] catalogUpdate: thriftCatalogUpdates) { - TUpdateCatalogCacheRequest incrementalRequest = new TUpdateCatalogCacheRequest(); - JniUtil.deserializeThrift(protocolFactory_, incrementalRequest, catalogUpdate); - mergedUpdateRequest.is_delta |= incrementalRequest.is_delta; - if (!incrementalRequest.getCatalog_service_id().equals(defaultCatalogServiceId)) { - mergedUpdateRequest.setCatalog_service_id( - incrementalRequest.getCatalog_service_id()); - } - mergedUpdateRequest.getUpdated_objects().addAll( - incrementalRequest.getUpdated_objects()); - mergedUpdateRequest.getRemoved_objects().addAll( - incrementalRequest.getRemoved_objects()); - } - TSerializer serializer = new TSerializer(protocolFactory_); - try { - return serializer.serialize(frontend_.updateCatalogCache(mergedUpdateRequest)); - } catch (TException e) { - throw new InternalException(e.getMessage()); - } - } - - /** - * Jni wrapper for Frontend.updateMembership(). Accepts a serialized - * TUpdateMembershipRequest. - */ - public void updateMembership(byte[] thriftMembershipUpdate) throws ImpalaException { - TUpdateMembershipRequest req = new TUpdateMembershipRequest(); - JniUtil.deserializeThrift(protocolFactory_, req, thriftMembershipUpdate); - frontend_.updateMembership(req); - } - - /** - * Loads a table or partition with one or more data files. If the "overwrite" flag - * in the request is true, all existing data in the table/partition will be replaced. - * If the "overwrite" flag is false, the files will be added alongside any existing - * data files. - */ - public byte[] loadTableData(byte[] thriftLoadTableDataParams) - throws ImpalaException, IOException { - TLoadDataReq request = new TLoadDataReq(); - JniUtil.deserializeThrift(protocolFactory_, request, thriftLoadTableDataParams); - TLoadDataResp response = frontend_.loadTableData(request); - TSerializer serializer = new TSerializer(protocolFactory_); - try { - return serializer.serialize(response); - } catch (TException e) { - throw new InternalException(e.getMessage()); - } - } - - /** - * Return an explain plan based on thriftQueryContext, a serialized TQueryContext. - * This call is thread-safe. 
- */ - public String getExplainPlan(byte[] thriftQueryContext) throws ImpalaException { - TQueryCtx queryCtx = new TQueryCtx(); - JniUtil.deserializeThrift(protocolFactory_, queryCtx, thriftQueryContext); - String plan = frontend_.getExplainString(queryCtx); - LOG.debug("Explain plan: " + plan); - return plan; - } - - /** - * Implement Hive's pattern-matching semantics for "SHOW TABLE [[LIKE] 'pattern']", and - * return a list of table names matching an optional pattern. - * The only metacharacters are '*' which matches any string of characters, and '|' - * which denotes choice. Doing the work here saves loading tables or databases from the - * metastore (which Hive would do if we passed the call through to the metastore - * client). If the pattern is null, all strings are considered to match. If it is an - * empty string, no strings match. - * - * The argument is a serialized TGetTablesParams object. - * The return type is a serialised TGetTablesResult object. - * @see Frontend#getTableNames - */ - public byte[] getTableNames(byte[] thriftGetTablesParams) throws ImpalaException { - TGetTablesParams params = new TGetTablesParams(); - JniUtil.deserializeThrift(protocolFactory_, params, thriftGetTablesParams); - // If the session was not set it indicates this is an internal Impala call. - User user = params.isSetSession() ? - new User(TSessionStateUtil.getEffectiveUser(params.getSession())) : - ImpalaInternalAdminUser.getInstance(); - - Preconditions.checkState(!params.isSetSession() || user != null ); - List<String> tables = frontend_.getTableNames(params.db, - PatternMatcher.createHivePatternMatcher(params.pattern), user); - - TGetTablesResult result = new TGetTablesResult(); - result.setTables(tables); - - TSerializer serializer = new TSerializer(protocolFactory_); - try { - return serializer.serialize(result); - } catch (TException e) { - throw new InternalException(e.getMessage()); - } - } - - /** - * Returns files info of a table or partition. - * The argument is a serialized TShowFilesParams object. - * The return type is a serialised TResultSet object. - * @see Frontend#getTableFiles - */ - public byte[] getTableFiles(byte[] thriftShowFilesParams) throws ImpalaException { - TShowFilesParams params = new TShowFilesParams(); - JniUtil.deserializeThrift(protocolFactory_, params, thriftShowFilesParams); - TResultSet result = frontend_.getTableFiles(params); - - TSerializer serializer = new TSerializer(protocolFactory_); - try { - return serializer.serialize(result); - } catch (TException e) { - throw new InternalException(e.getMessage()); - } - } - - /** - * Implement Hive's pattern-matching semantics for "SHOW DATABASES [[LIKE] 'pattern']", - * and return a list of databases matching an optional pattern. - * @see JniFrontend#getTableNames(byte[]) for more detail. - * - * The argument is a serialized TGetDbParams object. - * The return type is a serialised TGetDbResult object. - * @see Frontend#getDbs - */ - public byte[] getDbs(byte[] thriftGetTablesParams) throws ImpalaException { - TGetDbsParams params = new TGetDbsParams(); - JniUtil.deserializeThrift(protocolFactory_, params, thriftGetTablesParams); - // If the session was not set it indicates this is an internal Impala call. - User user = params.isSetSession() ? 
- new User(TSessionStateUtil.getEffectiveUser(params.getSession())) : - ImpalaInternalAdminUser.getInstance(); - List<Db> dbs = frontend_.getDbs( - PatternMatcher.createHivePatternMatcher(params.pattern), user); - TGetDbsResult result = new TGetDbsResult(); - List<TDatabase> tDbs = Lists.newArrayListWithCapacity(dbs.size()); - for (Db db: dbs) tDbs.add(db.toThrift()); - result.setDbs(tDbs); - TSerializer serializer = new TSerializer(protocolFactory_); - try { - return serializer.serialize(result); - } catch (TException e) { - throw new InternalException(e.getMessage()); - } - } - - /** - * Returns a list of data sources matching an optional pattern. - * The argument is a serialized TGetDataSrcsParams object. - * The return type is a serialised TGetDataSrcsResult object. - * @see Frontend#getDataSrcs - */ - public byte[] getDataSrcMetadata(byte[] thriftParams) throws ImpalaException { - TGetDataSrcsParams params = new TGetDataSrcsParams(); - JniUtil.deserializeThrift(protocolFactory_, params, thriftParams); - - TGetDataSrcsResult result = new TGetDataSrcsResult(); - List<DataSource> dataSources = frontend_.getDataSrcs(params.pattern); - result.setData_src_names(Lists.<String>newArrayListWithCapacity(dataSources.size())); - result.setLocations(Lists.<String>newArrayListWithCapacity(dataSources.size())); - result.setClass_names(Lists.<String>newArrayListWithCapacity(dataSources.size())); - result.setApi_versions(Lists.<String>newArrayListWithCapacity(dataSources.size())); - for (DataSource dataSource: dataSources) { - result.addToData_src_names(dataSource.getName()); - result.addToLocations(dataSource.getLocation()); - result.addToClass_names(dataSource.getClassName()); - result.addToApi_versions(dataSource.getApiVersion()); - } - TSerializer serializer = new TSerializer(protocolFactory_); - try { - return serializer.serialize(result); - } catch (TException e) { - throw new InternalException(e.getMessage()); - } - } - - public byte[] getStats(byte[] thriftShowStatsParams) throws ImpalaException { - TShowStatsParams params = new TShowStatsParams(); - JniUtil.deserializeThrift(protocolFactory_, params, thriftShowStatsParams); - Preconditions.checkState(params.isSetTable_name()); - TResultSet result; - if (params.isIs_show_col_stats()) { - result = frontend_.getColumnStats(params.getTable_name().getDb_name(), - params.getTable_name().getTable_name()); - } else { - result = frontend_.getTableStats(params.getTable_name().getDb_name(), - params.getTable_name().getTable_name()); - } - TSerializer serializer = new TSerializer(protocolFactory_); - try { - return serializer.serialize(result); - } catch (TException e) { - throw new InternalException(e.getMessage()); - } - } - - /** - * Returns a list of function names matching an optional pattern. - * The argument is a serialized TGetFunctionsParams object. - * The return type is a serialised TGetFunctionsResult object.
- * @see Frontend#getTableNames - */ - public byte[] getFunctions(byte[] thriftGetFunctionsParams) throws ImpalaException { - TGetFunctionsParams params = new TGetFunctionsParams(); - JniUtil.deserializeThrift(protocolFactory_, params, thriftGetFunctionsParams); - - TGetFunctionsResult result = new TGetFunctionsResult(); - List<String> signatures = Lists.newArrayList(); - List<String> retTypes = Lists.newArrayList(); - List<String> fnBinaryTypes = Lists.newArrayList(); - List<String> fnIsPersistent = Lists.newArrayList(); - List<Function> fns = frontend_.getFunctions(params.category, params.db, - params.pattern, false); - for (Function fn: fns) { - signatures.add(fn.signatureString()); - retTypes.add(fn.getReturnType().toString()); - fnBinaryTypes.add(fn.getBinaryType().name()); - fnIsPersistent.add(String.valueOf(fn.isPersistent())); - } - result.setFn_signatures(signatures); - result.setFn_ret_types(retTypes); - result.setFn_binary_types(fnBinaryTypes); - result.setFn_persistence(fnIsPersistent); - TSerializer serializer = new TSerializer(protocolFactory_); - try { - return serializer.serialize(result); - } catch (TException e) { - throw new InternalException(e.getMessage()); - } - } - - /** - * Gets the thrift representation of a catalog object. - */ - public byte[] getCatalogObject(byte[] thriftParams) throws ImpalaException, - TException { - TCatalogObject objectDescription = new TCatalogObject(); - JniUtil.deserializeThrift(protocolFactory_, objectDescription, thriftParams); - TSerializer serializer = new TSerializer(protocolFactory_); - return serializer.serialize( - frontend_.getCatalog().getTCatalogObject(objectDescription)); - } - - /** - * Returns a database's properties such as its location and comment. - * The argument is a serialized TDescribeDbParams object. - * The return type is a serialised TDescribeDbResult object. - * @see Frontend#describeDb - */ - public byte[] describeDb(byte[] thriftDescribeDbParams) throws ImpalaException { - TDescribeDbParams params = new TDescribeDbParams(); - JniUtil.deserializeThrift(protocolFactory_, params, thriftDescribeDbParams); - - TDescribeResult result = frontend_.describeDb( - params.getDb(), params.getOutput_style()); - - TSerializer serializer = new TSerializer(protocolFactory_); - try { - return serializer.serialize(result); - } catch (TException e) { - throw new InternalException(e.getMessage()); - } - } - - /** - * Returns a list of the columns making up a table. - * The argument is a serialized TDescribeParams object. - * The return type is a serialised TDescribeResult object. - * @see Frontend#describeTable - */ - public byte[] describeTable(byte[] thriftDescribeTableParams) throws ImpalaException { - TDescribeTableParams params = new TDescribeTableParams(); - JniUtil.deserializeThrift(protocolFactory_, params, thriftDescribeTableParams); - - TDescribeResult result = frontend_.describeTable( - params.getDb(), params.getTable_name(), params.getOutput_style(), - params.getResult_struct()); - - TSerializer serializer = new TSerializer(protocolFactory_); - try { - return serializer.serialize(result); - } catch (TException e) { - throw new InternalException(e.getMessage()); - } - } - - /** - * Returns a SQL DDL string for creating the specified table. 
- */ - public String showCreateTable(byte[] thriftTableName) - throws ImpalaException { - TTableName params = new TTableName(); - JniUtil.deserializeThrift(protocolFactory_, params, thriftTableName); - return ToSqlUtils.getCreateTableSql(frontend_.getCatalog().getTable( - params.getDb_name(), params.getTable_name())); - } - - /** - * Returns a SQL DDL string for creating the specified function. - */ - public String showCreateFunction(byte[] thriftShowCreateFunctionParams) - throws ImpalaException { - TGetFunctionsParams params = new TGetFunctionsParams(); - JniUtil.deserializeThrift(protocolFactory_, params, thriftShowCreateFunctionParams); - Preconditions.checkArgument(params.category == TFunctionCategory.SCALAR || - params.category == TFunctionCategory.AGGREGATE); - return ToSqlUtils.getCreateFunctionSql(frontend_.getFunctions( - params.category, params.db, params.pattern, true)); - } - - /** - * Gets all roles - */ - public byte[] getRoles(byte[] showRolesParams) throws ImpalaException { - TShowRolesParams params = new TShowRolesParams(); - JniUtil.deserializeThrift(protocolFactory_, params, showRolesParams); - TShowRolesResult result = new TShowRolesResult(); - - List<Role> roles = Lists.newArrayList(); - if (params.isIs_show_current_roles() || params.isSetGrant_group()) { - User user = new User(params.getRequesting_user()); - Set<String> groupNames; - if (params.isIs_show_current_roles()) { - groupNames = frontend_.getAuthzChecker().getUserGroups(user); - } else { - Preconditions.checkState(params.isSetGrant_group()); - groupNames = Sets.newHashSet(params.getGrant_group()); - } - for (String groupName: groupNames) { - roles.addAll(frontend_.getCatalog().getAuthPolicy().getGrantedRoles(groupName)); - } - } else { - Preconditions.checkState(!params.isIs_show_current_roles()); - roles = frontend_.getCatalog().getAuthPolicy().getAllRoles(); - } - - result.setRole_names(Lists.<String>newArrayListWithExpectedSize(roles.size())); - for (Role role: roles) { - result.getRole_names().add(role.getName()); - } - - Collections.sort(result.getRole_names()); - TSerializer serializer = new TSerializer(protocolFactory_); - try { - return serializer.serialize(result); - } catch (TException e) { - throw new InternalException(e.getMessage()); - } - } - - public byte[] getRolePrivileges(byte[] showGrantRolesParams) throws ImpalaException { - TShowGrantRoleParams params = new TShowGrantRoleParams(); - JniUtil.deserializeThrift(protocolFactory_, params, showGrantRolesParams); - TResultSet result = frontend_.getCatalog().getAuthPolicy().getRolePrivileges( - params.getRole_name(), params.getPrivilege()); - TSerializer serializer = new TSerializer(protocolFactory_); - try { - return serializer.serialize(result); - } catch (TException e) { - throw new InternalException(e.getMessage()); - } - } - - /** - * Executes a HiveServer2 metadata operation and returns a TResultSet - */ - public byte[] execHiveServer2MetadataOp(byte[] metadataOpsParams) - throws ImpalaException { - TMetadataOpRequest params = new TMetadataOpRequest(); - JniUtil.deserializeThrift(protocolFactory_, params, metadataOpsParams); - TResultSet result = frontend_.execHiveServer2MetadataOp(params); - - TSerializer serializer = new TSerializer(protocolFactory_); - try { - return serializer.serialize(result); - } catch (TException e) { - throw new InternalException(e.getMessage()); - } - } - - public void setCatalogInitialized() { - frontend_.getCatalog().setIsReady(true); - } - - // Caching this saves ~50ms per call to getHadoopConfigAsHtml - 
private static final Configuration CONF = new Configuration(); - - /** - * Returns all loaded Hadoop configuration parameters as a serialized - * TGetAllHadoopConfigsResponse of key/value pairs. - */ - public byte[] getAllHadoopConfigs() throws ImpalaException { - Map<String, String> configs = Maps.newHashMap(); - for (Map.Entry<String, String> e: CONF) { - configs.put(e.getKey(), e.getValue()); - } - TGetAllHadoopConfigsResponse result = new TGetAllHadoopConfigsResponse(); - result.setConfigs(configs); - TSerializer serializer = new TSerializer(protocolFactory_); - try { - return serializer.serialize(result); - } catch (TException e) { - throw new InternalException(e.getMessage()); - } - } - - /** - * Returns the corresponding config value for the given key as a serialized - * TGetHadoopConfigResponse. If the config value is null, the 'value' field in the - * thrift response object will not be set. - */ - public byte[] getHadoopConfig(byte[] serializedRequest) throws ImpalaException { - TGetHadoopConfigRequest request = new TGetHadoopConfigRequest(); - JniUtil.deserializeThrift(protocolFactory_, request, serializedRequest); - TGetHadoopConfigResponse result = new TGetHadoopConfigResponse(); - result.setValue(CONF.get(request.getName())); - TSerializer serializer = new TSerializer(protocolFactory_); - try { - return serializer.serialize(result); - } catch (TException e) { - throw new InternalException(e.getMessage()); - } - } - - /** - * Returns an error string describing all configuration issues. If no config issues are - * found, returns an empty string. - */ - public String checkConfiguration() { - StringBuilder output = new StringBuilder(); - output.append(checkLogFilePermission()); - output.append(checkFileSystem(CONF)); - output.append(checkShortCircuitRead(CONF)); - output.append(checkBlockLocationTracking(CONF)); - return output.toString(); - } - - /** - * Returns an empty string if Impala has permission to write to FE log files. If not, - * returns an error string describing the issues. - */ - private String checkLogFilePermission() { - org.apache.log4j.Logger l4jRootLogger = org.apache.log4j.Logger.getRootLogger(); - Enumeration appenders = l4jRootLogger.getAllAppenders(); - while (appenders.hasMoreElements()) { - Appender appender = (Appender) appenders.nextElement(); - if (appender instanceof FileAppender) { - if (((FileAppender) appender).getFile() == null) { - // If Impala does not have permission to write to the log file, the - // FileAppender will fail to initialize and logFile will be null. - // Unfortunately, we can't get the log file name here. - return "Impala does not have permission to write to the log file specified " + - "in log4j.properties."; - } - } - } - return ""; - } - - /** - * Returns an error message if short circuit reads are enabled but misconfigured.
- * Otherwise, returns an empty string. - */ - private String checkShortCircuitRead(Configuration conf) { - if (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, - DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) { - LOG.info("Short-circuit reads are not enabled."); - return ""; - } - - StringBuilder output = new StringBuilder(); - String errorMessage = "Invalid short-circuit reads configuration:\n"; - String prefix = " - "; - StringBuilder errorCause = new StringBuilder(); - - // dfs.domain.socket.path must be set properly - String domainSocketPath = conf.getTrimmed(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, - DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT); - if (domainSocketPath.isEmpty()) { - errorCause.append(prefix); - errorCause.append(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY); - errorCause.append(" is not configured.\n"); - } else { - // The socket path parent directory must be readable and executable. - File socketFile = new File(domainSocketPath); - File socketDir = socketFile.getParentFile(); - if (socketDir == null || !socketDir.canRead() || !socketDir.canExecute()) { - errorCause.append(prefix); - errorCause.append("Impala cannot read or execute the parent directory of "); - errorCause.append(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY); - errorCause.append("\n"); - } - } - - // dfs.client.use.legacy.blockreader.local must be set to false - if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, - DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT)) { - errorCause.append(prefix); - errorCause.append(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL); - errorCause.append(" should not be enabled.\n"); - } - - if (errorCause.length() > 0) { - output.append(errorMessage); - output.append(errorCause); - } - - return output.toString(); - } - - /** - * Return an empty string if block location tracking is properly enabled. If not, - * return an error string describing the issues. - */ - private String checkBlockLocationTracking(Configuration conf) { - StringBuilder output = new StringBuilder(); - String errorMessage = "ERROR: block location tracking is not properly enabled " + - "because\n"; - String prefix = " - "; - StringBuilder errorCause = new StringBuilder(); - if (!conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, - DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT)) { - errorCause.append(prefix); - errorCause.append(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED); - errorCause.append(" is not enabled.\n"); - } - - // dfs.client.file-block-storage-locations.timeout.millis should be >= 10 seconds - int dfsClientFileBlockStorageLocationsTimeoutMs = conf.getInt( - DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS, - DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT); - if (dfsClientFileBlockStorageLocationsTimeoutMs < - MIN_DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS) { - errorCause.append(prefix); - errorCause.append(DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS); - errorCause.append(" is too low. It should be at least 10 seconds.\n"); - } - - if (errorCause.length() > 0) { - output.append(errorMessage); - output.append(errorCause); - } - - return output.toString(); - } - - /** - * Return an empty string if the default FileSystem configured in CONF refers to a - * DistributedFileSystem or S3AFileSystem and Impala can list the root directory "/". - * Otherwise, return an error string describing the issues.
- */ - private String checkFileSystem(Configuration conf) { - try { - FileSystem fs = FileSystem.get(CONF); - if (!(fs instanceof DistributedFileSystem || fs instanceof S3AFileSystem)) { - return "Currently configured default filesystem: " + - fs.getClass().getSimpleName() + ". " + - CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY + - " (" + CONF.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY) + ")" + - " is not supported."; - } - } catch (IOException e) { - return "Couldn't retrieve the FileSystem:\n" + e.getMessage(); - } - - try { - FileSystemUtil.getTotalNumVisibleFiles(new Path("/")); - } catch (IOException e) { - return "Could not read the root directory at " + - CONF.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY) + - ". Error was: \n" + e.getMessage(); - } - return ""; - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/service/MetadataOp.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/service/MetadataOp.java b/fe/src/main/java/com/cloudera/impala/service/MetadataOp.java deleted file mode 100644 index ebfd984..0000000 --- a/fe/src/main/java/com/cloudera/impala/service/MetadataOp.java +++ /dev/null @@ -1,641 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.service; - -import java.sql.DatabaseMetaData; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.TableName; -import com.cloudera.impala.authorization.User; -import com.cloudera.impala.catalog.Column; -import com.cloudera.impala.catalog.Db; -import com.cloudera.impala.catalog.Function; -import com.cloudera.impala.catalog.ImpaladCatalog; -import com.cloudera.impala.catalog.PrimitiveType; -import com.cloudera.impala.catalog.ScalarType; -import com.cloudera.impala.catalog.Table; -import com.cloudera.impala.catalog.TableLoadingException; -import com.cloudera.impala.catalog.Type; -import com.cloudera.impala.common.ImpalaException; -import com.cloudera.impala.thrift.TColumn; -import com.cloudera.impala.thrift.TColumnValue; -import com.cloudera.impala.thrift.TResultRow; -import com.cloudera.impala.thrift.TResultSet; -import com.cloudera.impala.thrift.TResultSetMetadata; -import com.cloudera.impala.util.PatternMatcher; -import com.google.common.collect.Lists; - -/** - * Metadata operation. It contains static methods to execute HiveServer2 metadata - * operations and return the results, result schema and a unique request id in - * TResultSet.
- */ -public class MetadataOp { - private static final Logger LOG = LoggerFactory.getLogger(MetadataOp.class); - - // Static column values - private static final TColumnValue NULL_COL_VAL = new TColumnValue(); - private static final TColumnValue EMPTY_COL_VAL = createTColumnValue(""); - private static final TColumnValue TABLE_TYPE_COL_VAL = createTColumnValue("TABLE"); - - // Result set schema for each of the metadata operations. - private final static TResultSetMetadata GET_CATALOGS_MD = new TResultSetMetadata(); - private final static TResultSetMetadata GET_COLUMNS_MD = new TResultSetMetadata(); - private final static TResultSetMetadata GET_SCHEMAS_MD = new TResultSetMetadata(); - private final static TResultSetMetadata GET_TABLES_MD = new TResultSetMetadata(); - private static final TResultSetMetadata GET_TYPEINFO_MD = new TResultSetMetadata(); - private static final TResultSetMetadata GET_TABLE_TYPES_MD = new TResultSetMetadata(); - private static final TResultSetMetadata GET_FUNCTIONS_MD = new TResultSetMetadata(); - - // GetTypeInfo contains all primitive types supported by Impala. - private static final List<TResultRow> GET_TYPEINFO_RESULTS = Lists.newArrayList(); - - // GetTableTypes only returns a single value: "TABLE". - private static final List<TResultRow> GET_TABLE_TYPES_RESULTS = Lists.newArrayList(); - - // Initialize result set schemas and static result sets. - static { - initializeResultSetSchemas(); - createGetTypeInfoResults(); - createGetTableTypesResults(); - } - - /** - * Initializes the result set schema for each of the HiveServer2 operations. - */ - private static void initializeResultSetSchemas() { - GET_CATALOGS_MD.addToColumns(new TColumn("TABLE_CAT", Type.STRING.toThrift())); - - GET_COLUMNS_MD.addToColumns( - new TColumn("TABLE_CAT", Type.STRING.toThrift())); - GET_COLUMNS_MD.addToColumns( - new TColumn("TABLE_SCHEM", Type.STRING.toThrift())); - GET_COLUMNS_MD.addToColumns( - new TColumn("TABLE_NAME", Type.STRING.toThrift())); - GET_COLUMNS_MD.addToColumns( - new TColumn("COLUMN_NAME", Type.STRING.toThrift())); - GET_COLUMNS_MD.addToColumns( - new TColumn("DATA_TYPE", Type.INT.toThrift())); - GET_COLUMNS_MD.addToColumns( - new TColumn("TYPE_NAME", Type.STRING.toThrift())); - GET_COLUMNS_MD.addToColumns( - new TColumn("COLUMN_SIZE", Type.INT.toThrift())); - GET_COLUMNS_MD.addToColumns( - new TColumn("BUFFER_LENGTH", Type.INT.toThrift())); - GET_COLUMNS_MD.addToColumns( - new TColumn("DECIMAL_DIGITS", Type.INT.toThrift())); - GET_COLUMNS_MD.addToColumns( - new TColumn("NUM_PREC_RADIX", Type.INT.toThrift())); - GET_COLUMNS_MD.addToColumns( - new TColumn("NULLABLE", Type.INT.toThrift())); - GET_COLUMNS_MD.addToColumns( - new TColumn("REMARKS", Type.STRING.toThrift())); - GET_COLUMNS_MD.addToColumns( - new TColumn("COLUMN_DEF", Type.STRING.toThrift())); - GET_COLUMNS_MD.addToColumns( - new TColumn("SQL_DATA_TYPE", Type.INT.toThrift())); - GET_COLUMNS_MD.addToColumns( - new TColumn("SQL_DATETIME_SUB", Type.INT.toThrift())); - GET_COLUMNS_MD.addToColumns( - new TColumn("CHAR_OCTET_LENGTH", Type.INT.toThrift())); - GET_COLUMNS_MD.addToColumns( - new TColumn("ORDINAL_POSITION", Type.INT.toThrift())); - GET_COLUMNS_MD.addToColumns( - new TColumn("IS_NULLABLE", Type.STRING.toThrift())); - GET_COLUMNS_MD.addToColumns( - new TColumn("SCOPE_CATALOG", Type.STRING.toThrift())); - GET_COLUMNS_MD.addToColumns( - new TColumn("SCOPE_SCHEMA", Type.STRING.toThrift())); - GET_COLUMNS_MD.addToColumns( - new TColumn("SCOPE_TABLE", Type.STRING.toThrift())); - GET_COLUMNS_MD.addToColumns( - new
TColumn("SOURCE_DATA_TYPE", Type.SMALLINT.toThrift())); - GET_COLUMNS_MD.addToColumns( - new TColumn("IS_AUTO_INCREMENT", Type.STRING.toThrift())); - - GET_SCHEMAS_MD.addToColumns( - new TColumn("TABLE_SCHEM", Type.STRING.toThrift())); - GET_SCHEMAS_MD.addToColumns( - new TColumn("TABLE_CATALOG", Type.STRING.toThrift())); - - GET_TABLES_MD.addToColumns( - new TColumn("TABLE_CAT", Type.STRING.toThrift())); - GET_TABLES_MD.addToColumns( - new TColumn("TABLE_SCHEM", Type.STRING.toThrift())); - GET_TABLES_MD.addToColumns( - new TColumn("TABLE_NAME", Type.STRING.toThrift())); - GET_TABLES_MD.addToColumns( - new TColumn("TABLE_TYPE", Type.STRING.toThrift())); - GET_TABLES_MD.addToColumns( - new TColumn("REMARKS", Type.STRING.toThrift())); - - GET_TYPEINFO_MD.addToColumns( - new TColumn("TYPE_NAME", Type.STRING.toThrift())); - GET_TYPEINFO_MD.addToColumns( - new TColumn("DATA_TYPE", Type.INT.toThrift())); - GET_TYPEINFO_MD.addToColumns( - new TColumn("PRECISION", Type.INT.toThrift())); - GET_TYPEINFO_MD.addToColumns( - new TColumn("LITERAL_PREFIX", Type.STRING.toThrift())); - GET_TYPEINFO_MD.addToColumns( - new TColumn("LITERAL_SUFFIX", Type.STRING.toThrift())); - GET_TYPEINFO_MD.addToColumns( - new TColumn("CREATE_PARAMS", Type.STRING.toThrift())); - GET_TYPEINFO_MD.addToColumns( - new TColumn("NULLABLE", Type.INT.toThrift())); - GET_TYPEINFO_MD.addToColumns( - new TColumn("CASE_SENSITIVE", Type.BOOLEAN.toThrift())); - GET_TYPEINFO_MD.addToColumns( - new TColumn("SEARCHABLE", Type.SMALLINT.toThrift())); - GET_TYPEINFO_MD.addToColumns( - new TColumn("UNSIGNED_ATTRIBUTE", Type.BOOLEAN.toThrift())); - GET_TYPEINFO_MD.addToColumns( - new TColumn("FIXED_PREC_SCALE", Type.BOOLEAN.toThrift())); - GET_TYPEINFO_MD.addToColumns( - new TColumn("AUTO_INCREMENT", Type.BOOLEAN.toThrift())); - GET_TYPEINFO_MD.addToColumns( - new TColumn("LOCAL_TYPE_NAME", Type.STRING.toThrift())); - GET_TYPEINFO_MD.addToColumns( - new TColumn("MINIMUM_SCALE", Type.SMALLINT.toThrift())); - GET_TYPEINFO_MD.addToColumns( - new TColumn("MAXIMUM_SCALE", Type.SMALLINT.toThrift())); - GET_TYPEINFO_MD.addToColumns( - new TColumn("SQL_DATA_TYPE", Type.INT.toThrift())); - GET_TYPEINFO_MD.addToColumns( - new TColumn("SQL_DATETIME_SUB", Type.INT.toThrift())); - GET_TYPEINFO_MD.addToColumns( - new TColumn("NUM_PREC_RADIX", Type.INT.toThrift())); - - GET_TABLE_TYPES_MD.addToColumns( - new TColumn("TABLE_TYPE", Type.STRING.toThrift())); - - GET_FUNCTIONS_MD.addToColumns( - new TColumn("FUNCTION_CAT", Type.STRING.toThrift())); - GET_FUNCTIONS_MD.addToColumns( - new TColumn("FUNCTION_SCHEM", Type.STRING.toThrift())); - GET_FUNCTIONS_MD.addToColumns( - new TColumn("FUNCTION_NAME", Type.STRING.toThrift())); - GET_FUNCTIONS_MD.addToColumns( - new TColumn("REMARKS", Type.STRING.toThrift())); - GET_FUNCTIONS_MD.addToColumns( - new TColumn("FUNCTION_TYPE", Type.INT.toThrift())); - GET_FUNCTIONS_MD.addToColumns( - new TColumn("SPECIFIC_NAME", Type.STRING.toThrift())); - } - - /** - * Contains lists of databases, lists of tables belonging to the dbs, lists of columns - * belonging to the tables, and lists of user functions. - */ - private static class DbsMetadata { - // the list of databases - public List<String> dbs = Lists.newArrayList(); - - // tableNames[i] are the tables within dbs[i] - public List<List<String>> tableNames = Lists.newArrayList(); - - // columns[i][j] are the columns of tableNames[j] in dbs[i]. - // If the table is missing (not yet loaded), its column list will be empty.
- public List<List<List<Column>>> columns = Lists.newArrayList(); - - // functions[i] are the functions within dbs[i] - public List<List<Function>> functions = Lists.newArrayList(); - - // Set of tables that are missing (not yet loaded). - public Set<TableName> missingTbls = new HashSet<TableName>(); - } - - /** - * Returns the list of schemas, tables, columns and user functions that match the - * corresponding matchers. - * - * The return value 'result.dbs' contains the list of databases that match - * 'schemaPatternMatcher'. - * 'result.tableNames[i]' contains the list of tables inside dbs[i] that match - * 'tablePatternMatcher'. - * 'result.columns[i][j]' contains the list of columns of table[j] in dbs[i] - * that match 'columnPatternMatcher'. - * result.functions[i] contains the list of functions inside dbs[i] that - * match 'fnPatternMatcher'. - * - * If 'fnPatternMatcher' is not PatternMatcher.MATCHER_MATCH_NONE, then only function - * metadata will be returned. - * If 'tablePatternMatcher' is PatternMatcher.MATCHER_MATCH_NONE, then - * 'result.tableNames' and 'result.columns' will not be populated. - * If columns is null, then 'result.columns' will not be populated. - */ - private static DbsMetadata getDbsMetadata(Frontend fe, String catalogName, - PatternMatcher schemaPatternMatcher, PatternMatcher tablePatternMatcher, - PatternMatcher columnPatternMatcher, PatternMatcher fnPatternMatcher, User user) - throws ImpalaException { - DbsMetadata result = new DbsMetadata(); - - // Hive does not have a catalog concept. Returns nothing if the request specifies a - // non-empty catalog pattern. - if (!isEmptyPattern(catalogName)) { - return result; - } - - ImpaladCatalog catalog = fe.getCatalog(); - for (Db db: fe.getDbs(schemaPatternMatcher, user)) { - if (fnPatternMatcher != PatternMatcher.MATCHER_MATCH_NONE) { - // Get function metadata - List<Function> fns = db.getFunctions(null, fnPatternMatcher); - result.functions.add(fns); - } else { - // Get table metadata - List<String> tableList = Lists.newArrayList(); - List<List<Column>> tablesColumnsList = Lists.newArrayList(); - for (String tabName: fe.getTableNames(db.getName(), tablePatternMatcher, user)) { - tableList.add(tabName); - Table table = null; - try { - table = catalog.getTable(db.getName(), tabName); - } catch (TableLoadingException e) { - // Ignore exception (this table will be skipped). - } - if (table == null) continue; - - List<Column> columns = Lists.newArrayList(); - // If the table is not yet loaded, the columns will be unknown. Add it - // to the set of missing tables. - if (!table.isLoaded()) { - result.missingTbls.add(new TableName(db.getName(), tabName)); - } else { - columns.addAll(fe.getColumns(table, columnPatternMatcher, user)); - } - tablesColumnsList.add(columns); - } - result.dbs.add(db.getName()); - result.tableNames.add(tableList); - result.columns.add(tablesColumnsList); - } - } - return result; - } - - /** - * Executes the GetCatalogs HiveServer2 operation and returns TResultSet. - * Hive does not have a catalog concept. It always returns an empty result set. - */ - public static TResultSet getCatalogs() { - return createEmptyResultSet(GET_CATALOGS_MD); - } - - /** - * Executes the GetColumns HiveServer2 operation and returns TResultSet. - * Queries the Impala catalog to return the list of table columns that fit the - * search patterns.
Matching columns requires loading the table metadata, so if - * any missing tables are found an RPC to the CatalogServer will be executed - * to request loading these tables. The matching process will be restarted - * once the required tables have been loaded in the local Impalad Catalog or - * the wait timeout has been reached. - * - * The parameters catalogName, schemaName, tableName and columnName are JDBC search - * patterns. - */ - public static TResultSet getColumns(Frontend fe, - String catalogName, String schemaName, String tableName, String columnName, - User user) - throws ImpalaException { - TResultSet result = createEmptyResultSet(GET_COLUMNS_MD); - - // Get the list of schemas, tables, and columns that satisfy the search conditions. - DbsMetadata dbsMetadata = null; - PatternMatcher schemaMatcher = PatternMatcher.createJdbcPatternMatcher(schemaName); - PatternMatcher tableMatcher = PatternMatcher.createJdbcPatternMatcher(tableName); - PatternMatcher columnMatcher = PatternMatcher.createJdbcPatternMatcher(columnName); - while (dbsMetadata == null || !dbsMetadata.missingTbls.isEmpty()) { - dbsMetadata = getDbsMetadata(fe, catalogName, schemaMatcher, tableMatcher, - columnMatcher, PatternMatcher.MATCHER_MATCH_NONE, user); - if (!fe.requestTblLoadAndWait(dbsMetadata.missingTbls)) { - LOG.info("Timed out waiting for missing tables. Load request will be retried."); - } - } - - for (int i = 0; i < dbsMetadata.dbs.size(); ++i) { - String dbName = dbsMetadata.dbs.get(i); - for (int j = 0; j < dbsMetadata.tableNames.get(i).size(); ++j) { - String tabName = dbsMetadata.tableNames.get(i).get(j); - for (int k = 0; k < dbsMetadata.columns.get(i).get(j).size(); ++k) { - Column column = dbsMetadata.columns.get(i).get(j).get(k); - Type colType = column.getType(); - String colTypeName = getHs2MetadataTypeName(colType); - - TResultRow row = new TResultRow(); - row.colVals = Lists.newArrayList(); - row.colVals.add(NULL_COL_VAL); // TABLE_CAT - row.colVals.add(createTColumnValue(dbName)); // TABLE_SCHEM - row.colVals.add(createTColumnValue(tabName)); // TABLE_NAME - row.colVals.add(createTColumnValue(column.getName())); // COLUMN_NAME - row.colVals.add(createTColumnValue(colType.getJavaSqlType())); // DATA_TYPE - row.colVals.add(createTColumnValue(colTypeName)); // TYPE_NAME - row.colVals.add(createTColumnValue(colType.getColumnSize())); // COLUMN_SIZE - row.colVals.add(NULL_COL_VAL); // BUFFER_LENGTH, unused - // DECIMAL_DIGITS - row.colVals.add(createTColumnValue(colType.getDecimalDigits())); - // NUM_PREC_RADIX - row.colVals.add(createTColumnValue(colType.getNumPrecRadix())); - // NULLABLE - row.colVals.add(createTColumnValue(DatabaseMetaData.columnNullable)); - row.colVals.add(NULL_COL_VAL); // REMARKS - row.colVals.add(NULL_COL_VAL); // COLUMN_DEF - row.colVals.add(NULL_COL_VAL); // SQL_DATA_TYPE - row.colVals.add(NULL_COL_VAL); // SQL_DATETIME_SUB - row.colVals.add(NULL_COL_VAL); // CHAR_OCTET_LENGTH - // ORDINAL_POSITION starts from 1 - row.colVals.add(createTColumnValue(column.getPosition() + 1)); - row.colVals.add(createTColumnValue("YES")); // IS_NULLABLE - row.colVals.add(NULL_COL_VAL); // SCOPE_CATALOG - row.colVals.add(NULL_COL_VAL); // SCOPE_SCHEMA - row.colVals.add(NULL_COL_VAL); // SCOPE_TABLE - row.colVals.add(NULL_COL_VAL); // SOURCE_DATA_TYPE - row.colVals.add(createTColumnValue("NO")); // IS_AUTO_INCREMENT - result.rows.add(row); - } - } - } - LOG.debug("Returning " + result.rows.size() + " table columns"); - return result; - } - - /** - * Returns the string representation of 
- /**
-  * Returns the string representation of the given Impala column type to populate the
-  * TYPE_NAME column of the result set returned by a HiveServer2 GetColumns() request.
-  *
-  * To be consistent with Hive's behavior, the TYPE_NAME field is populated with the
-  * primitive type name for scalar types, and with the full toSql() for complex types.
-  * The resulting type names are somewhat inconsistent, because nested types are
-  * printed differently than top-level types, e.g.:
-  * toSql()                    TYPE_NAME
-  * DECIMAL(10,10)         --> DECIMAL
-  * CHAR(10)               --> CHAR
-  * VARCHAR(10)            --> VARCHAR
-  * ARRAY<DECIMAL(10,10)>  --> ARRAY<DECIMAL(10,10)>
-  * ARRAY<CHAR(10)>        --> ARRAY<CHAR(10)>
-  * ARRAY<VARCHAR(10)>     --> ARRAY<VARCHAR(10)>
-  */
- private static String getHs2MetadataTypeName(Type colType) {
-   if (colType.isScalarType()) return colType.getPrimitiveType().toString();
-   return colType.toSql();
- }
-
- /**
-  * Executes the GetSchemas HiveServer2 operation and returns a TResultSet.
-  * It queries the Impala catalog to return the list of schemas that fit the search
-  * pattern.
-  * catalogName and schemaName are JDBC search patterns.
-  */
- public static TResultSet getSchemas(Frontend fe,
-     String catalogName, String schemaName, User user) throws ImpalaException {
-   TResultSet result = createEmptyResultSet(GET_SCHEMAS_MD);
-
-   // Get the list of schemas that satisfy the search condition.
-   DbsMetadata dbsMetadata = getDbsMetadata(fe, catalogName,
-       PatternMatcher.createJdbcPatternMatcher(schemaName),
-       PatternMatcher.MATCHER_MATCH_NONE,
-       PatternMatcher.MATCHER_MATCH_NONE,
-       PatternMatcher.MATCHER_MATCH_NONE, user);
-
-   for (int i = 0; i < dbsMetadata.dbs.size(); ++i) {
-     String dbName = dbsMetadata.dbs.get(i);
-     TResultRow row = new TResultRow();
-     row.colVals = Lists.newArrayList();
-     row.colVals.add(createTColumnValue(dbName)); // TABLE_SCHEM
-     row.colVals.add(EMPTY_COL_VAL); // The default Hive catalog is an empty string.
-     result.rows.add(row);
-   }
-
-   LOG.debug("Returning " + result.rows.size() + " schemas");
-   return result;
- }
-
- /**
-  * Executes the GetTables HiveServer2 operation and returns a TResultSet.
-  * It queries the Impala catalog to return the list of tables that fit the search
-  * patterns.
-  * catalogName, schemaName and tableName are JDBC search patterns.
-  * tableTypes specifies which table types to search for (TABLE, VIEW, etc).
-  */
- public static TResultSet getTables(Frontend fe, String catalogName,
-     String schemaName, String tableName, List<String> tableTypes, User user)
-     throws ImpalaException {
-   TResultSet result = createEmptyResultSet(GET_TABLES_MD);
-
-   // The Impala catalog only contains TABLE. Return an empty result set if the
-   // search does not include TABLE.
-   if (tableTypes != null && !tableTypes.isEmpty()) {
-     boolean hasTableType = false;
-     for (String tableType: tableTypes) {
-       if (tableType.equalsIgnoreCase("table")) {
-         hasTableType = true;
-         break;
-       }
-     }
-     if (!hasTableType) {
-       return result;
-     }
-   }
-
-   // Get the list of schemas and tables that satisfy the search conditions.
-   DbsMetadata dbsMetadata = getDbsMetadata(fe, catalogName,
-       PatternMatcher.createJdbcPatternMatcher(schemaName),
-       PatternMatcher.createJdbcPatternMatcher(tableName),
-       PatternMatcher.MATCHER_MATCH_NONE,
-       PatternMatcher.MATCHER_MATCH_NONE, user);
-
-   for (int i = 0; i < dbsMetadata.dbs.size(); ++i) {
-     String dbName = dbsMetadata.dbs.get(i);
-     for (int j = 0; j < dbsMetadata.tableNames.get(i).size(); ++j) {
-       String tabName = dbsMetadata.tableNames.get(i).get(j);
-       TResultRow row = new TResultRow();
-       row.colVals = Lists.newArrayList();
-       row.colVals.add(EMPTY_COL_VAL); // TABLE_CAT
-       row.colVals.add(createTColumnValue(dbName)); // TABLE_SCHEM
-       row.colVals.add(createTColumnValue(tabName)); // TABLE_NAME
-       row.colVals.add(TABLE_TYPE_COL_VAL); // TABLE_TYPE
-       // TODO: Return table comments when they are available in the Impala catalog.
-       row.colVals.add(EMPTY_COL_VAL); // REMARKS
-       result.rows.add(row);
-     }
-   }
-   LOG.debug("Returning " + result.rows.size() + " tables");
-   return result;
- }
-
- /**
-  * Executes the GetTypeInfo HiveServer2 operation and returns the types that Impala
-  * supports.
-  */
- public static TResultSet getTypeInfo() {
-   TResultSet result = createEmptyResultSet(GET_TYPEINFO_MD);
-   result.rows = GET_TYPEINFO_RESULTS;
-   return result;
- }
-
- /**
-  * Executes the GetTableTypes HiveServer2 operation.
-  */
- public static TResultSet getTableTypes() {
-   TResultSet result = createEmptyResultSet(GET_TABLE_TYPES_MD);
-   result.rows = GET_TABLE_TYPES_RESULTS;
-   return result;
- }
-
- /**
-  * Creates a function result row in the JDBC format.
-  */
- private static TResultRow createFunctionResultRow(Function fn) {
-   TResultRow row = new TResultRow();
-   row.colVals = Lists.newArrayList();
-   row.colVals.add(NULL_COL_VAL); // FUNCTION_CAT
-   row.colVals.add(createTColumnValue(fn.dbName())); // FUNCTION_SCHEM
-   row.colVals.add(createTColumnValue(fn.functionName())); // FUNCTION_NAME
-   row.colVals.add(EMPTY_COL_VAL); // REMARKS
-   // FUNCTION_TYPE
-   row.colVals.add(createTColumnValue(DatabaseMetaData.functionNoTable));
-   row.colVals.add(createTColumnValue(fn.signatureString())); // SPECIFIC_NAME
-   return row;
- }
-
- /**
-  * Executes the GetFunctions HiveServer2 operation and returns a TResultSet.
-  * Returns the list of functions that fit the search patterns.
-  * catalogName, schemaName and functionName are JDBC search patterns.
-  * @throws ImpalaException
-  */
- public static TResultSet getFunctions(Frontend fe,
-     String catalogName, String schemaName, String functionName,
-     User user) throws ImpalaException {
-   TResultSet result = createEmptyResultSet(GET_FUNCTIONS_MD);
-
-   // Impala's built-in functions do not have a catalog name or schema name.
-   if (!isEmptyPattern(catalogName) || !isEmptyPattern(schemaName)) {
-     return result;
-   }
-
-   DbsMetadata dbsMetadata = getDbsMetadata(fe, catalogName,
-       PatternMatcher.createJdbcPatternMatcher(schemaName),
-       PatternMatcher.MATCHER_MATCH_NONE,
-       PatternMatcher.MATCHER_MATCH_NONE,
-       PatternMatcher.createJdbcPatternMatcher(functionName), user);
-   for (List<Function> fns: dbsMetadata.functions) {
-     for (Function fn: fns) {
-       result.rows.add(createFunctionResultRow(fn));
-     }
-   }
-
-   return result;
- }
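As with GetColumns, these operations map directly onto the standard java.sql.DatabaseMetaData calls, so a plain JDBC client can exercise them. Another hedged sketch: the URL is the same assumed impalad endpoint as above, and the "functional" database and "min%" function pattern are arbitrary examples.

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;

public class MetadataOpsClientSketch {
  public static void main(String[] args) throws Exception {
    String url = "jdbc:hive2://localhost:21050/;auth=noSasl";
    try (Connection conn = DriverManager.getConnection(url)) {
      DatabaseMetaData md = conn.getMetaData();

      // GetSchemas: "%" matches every database.
      try (ResultSet rs = md.getSchemas(null, "%")) {
        while (rs.next()) System.out.println("schema: " + rs.getString("TABLE_SCHEM"));
      }

      // GetTables: the Impala catalog only contains the TABLE type, so asking
      // for VIEW (for example) would come back empty.
      try (ResultSet rs = md.getTables(null, "functional", "%", new String[] {"TABLE"})) {
        while (rs.next()) System.out.println("table: " + rs.getString("TABLE_NAME"));
      }

      // GetFunctions: "%" counts as an empty pattern (see isEmptyPattern() below),
      // so built-in and user functions in all databases are searched.
      try (ResultSet rs = md.getFunctions(null, "%", "min%")) {
        while (rs.next()) System.out.println("fn: " + rs.getString("FUNCTION_NAME"));
      }
    }
  }
}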
- /**
-  * Fills GET_TYPEINFO_RESULTS with the supported primitive types.
-  */
- private static void createGetTypeInfoResults() {
-   for (PrimitiveType ptype: PrimitiveType.values()) {
-     if (ptype.equals(PrimitiveType.INVALID_TYPE) ||
-         ptype.equals(PrimitiveType.DATE) ||
-         ptype.equals(PrimitiveType.DATETIME) ||
-         ptype.equals(PrimitiveType.DECIMAL) ||
-         ptype.equals(PrimitiveType.CHAR) ||
-         ptype.equals(PrimitiveType.VARCHAR)) {
-       continue;
-     }
-     Type type = ScalarType.createType(ptype);
-     TResultRow row = new TResultRow();
-     row.colVals = Lists.newArrayList();
-     row.colVals.add(createTColumnValue(ptype.name())); // TYPE_NAME
-     row.colVals.add(createTColumnValue(type.getJavaSqlType())); // DATA_TYPE
-     row.colVals.add(createTColumnValue(type.getPrecision())); // PRECISION
-     row.colVals.add(NULL_COL_VAL); // LITERAL_PREFIX
-     row.colVals.add(NULL_COL_VAL); // LITERAL_SUFFIX
-     row.colVals.add(NULL_COL_VAL); // CREATE_PARAMS
-     row.colVals.add(createTColumnValue(DatabaseMetaData.typeNullable)); // NULLABLE
-     row.colVals.add(createTColumnValue(type.isStringType())); // CASE_SENSITIVE
-     row.colVals.add(createTColumnValue(DatabaseMetaData.typeSearchable)); // SEARCHABLE
-     row.colVals.add(createTColumnValue(!type.isNumericType())); // UNSIGNED_ATTRIBUTE
-     row.colVals.add(createTColumnValue(false)); // FIXED_PREC_SCALE
-     row.colVals.add(createTColumnValue(false)); // AUTO_INCREMENT
-     row.colVals.add(NULL_COL_VAL); // LOCAL_TYPE_NAME
-     row.colVals.add(createTColumnValue(0)); // MINIMUM_SCALE
-     row.colVals.add(createTColumnValue(0)); // MAXIMUM_SCALE
-     row.colVals.add(NULL_COL_VAL); // SQL_DATA_TYPE
-     row.colVals.add(NULL_COL_VAL); // SQL_DATETIME_SUB
-     row.colVals.add(createTColumnValue(type.getNumPrecRadix())); // NUM_PREC_RADIX
-     GET_TYPEINFO_RESULTS.add(row);
-   }
- }
-
- /**
-  * Fills GET_TABLE_TYPES_RESULTS with the single "TABLE" row.
-  */
- private static void createGetTableTypesResults() {
-   TResultRow row = new TResultRow();
-   row.colVals = Lists.newArrayList();
-   row.colVals.add(createTColumnValue("TABLE"));
-   GET_TABLE_TYPES_RESULTS.add(row);
- }
-
- /**
-  * Returns a TResultSet with the specified schema. The result set will be empty.
-  */
- private static TResultSet createEmptyResultSet(TResultSetMetadata metadata) {
-   TResultSet result = new TResultSet();
-   result.rows = Lists.newArrayList();
-   result.schema = metadata;
-   return result;
- }
-
- // Helper methods to create TColumnValue.
- public static TColumnValue createTColumnValue(String val) {
-   TColumnValue colVal = new TColumnValue();
-   if (val != null) {
-     colVal.setString_val(val);
-   }
-   return colVal;
- }
-
- public static TColumnValue createTColumnValue(Integer val) {
-   TColumnValue colVal = new TColumnValue();
-   if (val != null) {
-     colVal.setInt_val(val.intValue());
-   }
-   return colVal;
- }
-
- public static TColumnValue createTColumnValue(Boolean val) {
-   TColumnValue colVal = new TColumnValue();
-   if (val != null) {
-     colVal.setBool_val(val);
-   }
-   return colVal;
- }
-
- /**
-  * Returns true if the JDBC search pattern is empty: either null, the empty string,
-  * or "%".
-  */
- public static boolean isEmptyPattern(final String pattern) {
-   return (pattern == null) || pattern.isEmpty() || pattern.equals("%");
- }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/AvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/util/AvroSchemaConverter.java b/fe/src/main/java/com/cloudera/impala/util/AvroSchemaConverter.java
deleted file mode 100644
index 0feb1c1..0000000
--- a/fe/src/main/java/com/cloudera/impala/util/AvroSchemaConverter.java
+++ /dev/null
@@ -1,209 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloudera.impala.util;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.node.IntNode;
-import org.codehaus.jackson.node.JsonNodeFactory;
-
-import com.cloudera.impala.analysis.ColumnDef;
-import com.cloudera.impala.catalog.ArrayType;
-import com.cloudera.impala.catalog.Column;
-import com.cloudera.impala.catalog.MapType;
-import com.cloudera.impala.catalog.ScalarType;
-import com.cloudera.impala.catalog.StructField;
-import com.cloudera.impala.catalog.StructType;
-import com.cloudera.impala.catalog.Table;
-import com.cloudera.impala.catalog.Type;
-import com.google.common.collect.Lists;
-
-/**
- * Utility class to generate an Impala-compatible Avro Schema from other schemas, e.g.,
- * an Impala table, a list of Impala columns, a list of Hive field schemas, etc.
- *
- * Error behavior: These functions throw an UnsupportedOperationException when failing
- * to generate an Impala-compatible Avro schema, e.g., because of an unknown type or a
- * type not supported by Impala.
- *
- * Behavior for TIMESTAMP:
- * A TIMESTAMP column definition maps to an Avro STRING and is created as a STRING
- * column, because Avro has no binary TIMESTAMP representation. As a result, no Avro
- * table may have a TIMESTAMP column.
- */
-public class AvroSchemaConverter {
-  // Arbitrarily chosen schema name and record prefix. Note that
-  // record names must be unique within an Avro schema.
-  private static final String DEFAULT_SCHEMA_NAME = "baseRecord";
-  private static final String RECORD_NAME_PREFIX = "record_";
-
-  // Constants for Avro logical types, in particular, for DECIMAL.
-  private static final String AVRO_LOGICAL_TYPE = "logicalType";
-  private static final String PRECISION_PROP_NAME = "precision";
-  private static final String SCALE_PROP_NAME = "scale";
-  private static final String AVRO_DECIMAL_TYPE = "decimal";
-
-  // Used to generate unique record names as required by Avro.
-  private int recordCounter_ = 0;
-
-  public static Schema convertColumns(
-      List<Column> columns, String schemaName) {
-    AvroSchemaConverter converter = new AvroSchemaConverter();
-    return converter.convertColumnsImpl(columns, schemaName);
-  }
-
-  public static Schema convertColumnDefs(
-      List<ColumnDef> colDefs, String schemaName) {
-    AvroSchemaConverter converter = new AvroSchemaConverter();
-    return converter.convertColumnDefsImpl(colDefs, schemaName);
-  }
-
-  public static Schema convertFieldSchemas(
-      List<FieldSchema> fieldSchemas, String schemaName) {
-    AvroSchemaConverter converter = new AvroSchemaConverter();
-    return converter.convertFieldSchemasImpl(fieldSchemas, schemaName);
-  }
-
-  public static Schema convertTable(Table impalaTable) {
-    return convertColumns(impalaTable.getColumns(), impalaTable.getFullName());
-  }
-
-  private Schema convertColumnsImpl(List<Column> columns, String schemaName) {
-    List<Schema.Field> avroFields = Lists.newArrayList();
-    for (Column column: columns) {
-      final Schema.Field avroField = new Schema.Field(column.getName(),
-          createAvroSchema(column.getType()), column.getComment(), null);
-      avroFields.add(avroField);
-    }
-    return createAvroRecord(avroFields, schemaName);
-  }
-
-  private Schema convertColumnDefsImpl(List<ColumnDef> colDefs, String schemaName) {
-    List<Schema.Field> avroFields = Lists.newArrayList();
-    for (ColumnDef colDef: colDefs) {
-      final Schema.Field avroField = new Schema.Field(colDef.getColName(),
-          createAvroSchema(colDef.getType()), colDef.getComment(), null);
-      avroFields.add(avroField);
-    }
-    return createAvroRecord(avroFields, schemaName);
-  }
-
-  private Schema convertFieldSchemasImpl(
-      List<FieldSchema> fieldSchemas, String schemaName) {
-    List<Schema.Field> avroFields = Lists.newArrayList();
-    JsonNode nullDefault = JsonNodeFactory.instance.nullNode();
-    for (FieldSchema fs: fieldSchemas) {
-      Type impalaType = Type.parseColumnType(fs.getType());
-      if (impalaType == null) {
-        throw new UnsupportedOperationException(
-            fs.getType() + " is not a supported Impala type");
-      }
-      final Schema.Field avroField = new Schema.Field(fs.getName(),
-          createAvroSchema(impalaType), fs.getComment(), nullDefault);
-      avroFields.add(avroField);
-    }
-    return createAvroRecord(avroFields, schemaName);
-  }
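A detail worth noting before createAvroRecord(): createAvroSchema() (shown below) wraps every converted type in a union with NULL, the standard Avro idiom for a nullable field. The stand-alone sketch below shows the shape of that union using only the public Avro API; it assumes just the org.apache.avro:avro dependency.

import java.util.Arrays;

import org.apache.avro.Schema;

public class NullableUnionSketch {
  public static void main(String[] args) {
    // Mirror of the converter's nullable-wrapping step, here for an INT column:
    Schema intSchema = Schema.create(Schema.Type.INT);
    Schema nullSchema = Schema.create(Schema.Type.NULL);
    Schema nullableInt = Schema.createUnion(Arrays.asList(nullSchema, intSchema));
    // Prints: ["null","int"]
    System.out.println(nullableInt.toString());
  }
}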
-  private Schema createAvroRecord(List<Schema.Field> avroFields, String schemaName) {
-    // Name is a required property for an Avro Record.
-    if (schemaName == null || schemaName.isEmpty()) schemaName = DEFAULT_SCHEMA_NAME;
-    Schema schema = Schema.createRecord(schemaName, null, null, false);
-    schema.setFields(avroFields);
-    return schema;
-  }
-
-  private Schema createAvroSchema(Type impalaType) {
-    Schema schema = null;
-    if (impalaType.isScalarType()) {
-      schema = createScalarSchema((ScalarType) impalaType);
-    } else if (impalaType.isArrayType()) {
-      schema = createArraySchema((ArrayType) impalaType);
-    } else if (impalaType.isMapType()) {
-      schema = createMapSchema((MapType) impalaType);
-    } else if (impalaType.isStructType()) {
-      schema = createRecordSchema((StructType) impalaType);
-    } else {
-      throw new UnsupportedOperationException(
-          impalaType.toSql() + " cannot be converted to an Avro type");
-    }
-    // Make the Avro schema nullable.
-    Schema nullSchema = Schema.create(Schema.Type.NULL);
-    return Schema.createUnion(Arrays.asList(nullSchema, schema));
-  }
-
-  private Schema createScalarSchema(ScalarType impalaScalarType) {
-    switch (impalaScalarType.getPrimitiveType()) {
-      case STRING: return Schema.create(Schema.Type.STRING);
-      case CHAR: return Schema.create(Schema.Type.STRING);
-      case VARCHAR: return Schema.create(Schema.Type.STRING);
-      case TINYINT: return Schema.create(Schema.Type.INT);
-      case SMALLINT: return Schema.create(Schema.Type.INT);
-      case INT: return Schema.create(Schema.Type.INT);
-      case BIGINT: return Schema.create(Schema.Type.LONG);
-      case BOOLEAN: return Schema.create(Schema.Type.BOOLEAN);
-      case FLOAT: return Schema.create(Schema.Type.FLOAT);
-      case DOUBLE: return Schema.create(Schema.Type.DOUBLE);
-      case TIMESTAMP: return Schema.create(Schema.Type.STRING);
-      case DECIMAL: return createDecimalSchema(impalaScalarType);
-      default:
-        throw new UnsupportedOperationException(
-            impalaScalarType.toSql() + " cannot be converted to an Avro type");
-    }
-  }
-
-  private Schema createDecimalSchema(ScalarType impalaDecimalType) {
-    Schema decimalSchema = Schema.create(Schema.Type.BYTES);
-    decimalSchema.addProp(AVRO_LOGICAL_TYPE, AVRO_DECIMAL_TYPE);
-    // Precision and scale must be integer values.
-    decimalSchema.addProp(PRECISION_PROP_NAME,
-        new IntNode(impalaDecimalType.decimalPrecision()));
-    decimalSchema.addProp(SCALE_PROP_NAME,
-        new IntNode(impalaDecimalType.decimalScale()));
-    return decimalSchema;
-  }
-
-  private Schema createArraySchema(ArrayType impalaArrayType) {
-    Schema elementSchema = createAvroSchema(impalaArrayType.getItemType());
-    return Schema.createArray(elementSchema);
-  }
-
-  private Schema createMapSchema(MapType impalaMapType) {
-    // Map keys are always STRING according to the Avro spec.
-    Schema valueSchema = createAvroSchema(impalaMapType.getValueType());
-    return Schema.createMap(valueSchema);
-  }
-
-  private Schema createRecordSchema(StructType impalaStructType) {
-    List<Schema.Field> schemaFields = Lists.newArrayList();
-    for (StructField structField : impalaStructType.getFields()) {
-      Schema.Field avroField = new Schema.Field(structField.getName(),
-          createAvroSchema(structField.getType()), structField.getComment(), null);
-      schemaFields.add(avroField);
-    }
-    // All Avro records in a table must have the name property.
-    Schema structSchema = Schema.createRecord(
-        RECORD_NAME_PREFIX + recordCounter_, null, null, false);
-    ++recordCounter_;
-    structSchema.setFields(schemaFields);
-    return structSchema;
-  }
-}
\ No newline at end of file
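The decimal annotation that createDecimalSchema() attaches is stored as plain JSON properties on the schema, so it round-trips through the standard Avro parser. A small sketch, again assuming only the org.apache.avro:avro dependency; the precision and scale values are arbitrary examples standing in for an Impala DECIMAL(10,2) column.

import org.apache.avro.Schema;

public class DecimalSchemaSketch {
  public static void main(String[] args) {
    // The JSON shape produced for a DECIMAL(10,2) column: BYTES plus the
    // "decimal" logicalType and integer precision/scale properties.
    String json = "{\"type\": \"bytes\", \"logicalType\": \"decimal\","
        + " \"precision\": 10, \"scale\": 2}";
    Schema schema = new Schema.Parser().parse(json);
    System.out.println(schema.getType());               // BYTES
    System.out.println(schema.getProp("logicalType"));  // decimal
  }
}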
