http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java index 4bb6b65..79960e4 100644 --- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java @@ -17,13 +17,15 @@ package org.apache.impala.catalog; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; import org.apache.impala.common.ImpalaException; +import org.apache.impala.common.Pair; +import org.apache.impala.service.FeSupport; import org.apache.impala.thrift.TCatalogObject; import org.apache.impala.thrift.TCatalogObjectType; import org.apache.impala.thrift.TDataSource; @@ -36,8 +38,10 @@ import org.apache.impala.thrift.TUniqueId; import org.apache.impala.thrift.TUpdateCatalogCacheRequest; import org.apache.impala.thrift.TUpdateCatalogCacheResponse; import org.apache.impala.util.PatternMatcher; +import org.apache.impala.util.TByteBuffer; import org.apache.log4j.Logger; import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; import com.google.common.base.Preconditions; @@ -114,12 +118,29 @@ public class ImpaladCatalog extends Catalog { } /** + * Update the catalog service Id. Trigger a full update if the service ID changes. + */ + private void setCatalogServiceId(TUniqueId catalog_service_id) throws CatalogException { + // Check for changes in the catalog service ID. 
+ if (!catalogServiceId_.equals(catalog_service_id)) { + boolean firstRun = catalogServiceId_.equals(INITIAL_CATALOG_SERVICE_ID); + catalogServiceId_ = catalog_service_id; + if (!firstRun) { + // Throw an exception which will trigger a full topic update request. + throw new CatalogException("Detected catalog service ID change. Aborting " + + "updateCatalog()"); + } + } + } + + /** * Updates the internal Catalog based on the given TCatalogUpdateReq. * This method: - * 1) Updates all top level objects (such as databases and roles). - * 2) Updates all objects that depend on top level objects (such as functions, tables, + * 1) Calls NativeGetNextCatalogObjectUpdate() to get all the updates from the backend. + * 2) Updates all top level objects (such as databases and roles). + * 3) Updates all objects that depend on top level objects (such as functions, tables, * privileges). - * 3) Removes all dropped catalog objects. + * 4) Removes all dropped catalog objects. * * This method is called once per statestore heartbeat and is guaranteed the same * object will not be in both the "updated" list and the "removed" list (it is @@ -132,60 +153,58 @@ public class ImpaladCatalog extends Catalog { * protected. */ public synchronized TUpdateCatalogCacheResponse updateCatalog( - TUpdateCatalogCacheRequest req) throws CatalogException { - // Check for changes in the catalog service ID. - if (!catalogServiceId_.equals(req.getCatalog_service_id())) { - boolean firstRun = catalogServiceId_.equals(INITIAL_CATALOG_SERVICE_ID); - catalogServiceId_ = req.getCatalog_service_id(); - if (!firstRun) { - // Throw an exception which will trigger a full topic update request. - throw new CatalogException("Detected catalog service ID change. Aborting " + - "updateCatalog()"); + TUpdateCatalogCacheRequest req) throws CatalogException, TException { + // For updates from catalog op results, the service ID is set in the request. 
+ if (req.isSetCatalog_service_id()) setCatalogServiceId(req.catalog_service_id); + ArrayDeque<TCatalogObject> updatedObjects = new ArrayDeque<>(); + ArrayDeque<TCatalogObject> deletedObjects = new ArrayDeque<>(); + long newCatalogVersion = lastSyncedCatalogVersion_; + Pair<Boolean, ByteBuffer> update; + while ((update = FeSupport.NativeGetNextCatalogObjectUpdate(req.native_iterator_ptr)) + != null) { + TCatalogObject obj = new TCatalogObject(); + obj.read(new TBinaryProtocol(new TByteBuffer(update.second))); + String key = Catalog.toCatalogObjectKey(obj); + int len = update.second.capacity(); + if (len > 100 * 1024 * 1024 /* 100MB */) { + LOG.info("Received large catalog object(>100mb): " + key + " is " + len + + "bytes"); } - } - - // Process updates to top level objects first because they don't depend on any other - // objects already existing in the catalog. - for (TCatalogObject catalogObject: req.getUpdated_objects()) { - if (isTopLevelCatalogObject(catalogObject)) { - Preconditions.checkState(catalogObject.getType() != TCatalogObjectType.CATALOG); - try { - addCatalogObject(catalogObject); - } catch (Exception e) { - LOG.error("Error adding catalog object: " + e.getMessage(), e); - } + if (LOG.isTraceEnabled()) { + LOG.trace((update.first ? "Deleting " : "Adding ") + "item: " + key + " version: " + + obj.catalog_version + " of size: " + len); } - } - - // Process updates to dependent objects next. Since the top level objects were already - // processed, we are guaranteed that the top level objects that the dependent objects - // depend on exist in the catalog. 
- long newCatalogVersion = lastSyncedCatalogVersion_; - for (TCatalogObject catalogObject: req.getUpdated_objects()) { - if (catalogObject.getType() == TCatalogObjectType.CATALOG) { - newCatalogVersion = catalogObject.getCatalog_version(); - } else if (!isTopLevelCatalogObject(catalogObject)) { - try { - addCatalogObject(catalogObject); - } catch (Exception e) { - LOG.error("Error adding catalog object: " + e.getMessage(), e); + // For statestore updates, the service ID and updated version is wrapped in a + // CATALOG catalog object. + if (obj.type == TCatalogObjectType.CATALOG) { + setCatalogServiceId(obj.catalog.catalog_service_id); + newCatalogVersion = obj.catalog_version; + } else if (!update.first) { + // Update top-level objects first. + if (isTopLevelCatalogObject(obj)) { + updatedObjects.addFirst(obj); + } else { + updatedObjects.addLast(obj); + } + } else { + // Remove low-level objects first. + if (isTopLevelCatalogObject(obj)) { + deletedObjects.addLast(obj); + } else { + deletedObjects.addFirst(obj); } } } - // Now remove all objects from the catalog. First remove low-level objects (tables, - // functions and privileges) and then the top-level objects (databases and roles). - for (TCatalogObject catalogObject: req.getRemoved_objects()) { - if (!isTopLevelCatalogObject(catalogObject)) { - removeCatalogObject(catalogObject); - } - } - for (TCatalogObject catalogObject: req.getRemoved_objects()) { - if (isTopLevelCatalogObject(catalogObject)) { - removeCatalogObject(catalogObject); + for (TCatalogObject catalogObject: updatedObjects) { + try { + addCatalogObject(catalogObject); + } catch (Exception e) { + LOG.error("Error adding catalog object: " + e.getMessage(), e); } } + for (TCatalogObject catalogObject: deletedObjects) removeCatalogObject(catalogObject); lastSyncedCatalogVersion_ = newCatalogVersion; // Cleanup old entries in the log. 
@@ -195,11 +214,11 @@ public class ImpaladCatalog extends Catalog { synchronized (catalogUpdateEventNotifier_) { catalogUpdateEventNotifier_.notifyAll(); } - return new TUpdateCatalogCacheResponse(catalogServiceId_, - CatalogObjectVersionQueue.INSTANCE.getMinimumVersion()); + CatalogObjectVersionQueue.INSTANCE.getMinimumVersion(), newCatalogVersion); } + /** * Causes the calling thread to wait until a catalog update notification has been sent * or the given timeout has been reached. A timeout value of 0 indicates an indefinite @@ -248,7 +267,7 @@ public class ImpaladCatalog extends Catalog { * This method handles both of these cases. */ public Path getTablePath(org.apache.hadoop.hive.metastore.api.Table msTbl) - throws NoSuchObjectException, MetaException, TException { + throws TException { try (MetaStoreClient msClient = getMetaStoreClient()) { // If the table did not have its path set, build the path based on the the // location property of the parent database. @@ -271,7 +290,7 @@ public class ImpaladCatalog extends Catalog { * > than the given TCatalogObject's version. */ private void addCatalogObject(TCatalogObject catalogObject) - throws TableLoadingException, DatabaseNotFoundException { + throws TableLoadingException { // This item is out of date and should not be applied to the catalog. 
if (catalogDeltaLog_.wasObjectRemovedAfter(catalogObject)) { if (LOG.isTraceEnabled()) { @@ -404,6 +423,7 @@ public class ImpaladCatalog extends Catalog { } private void addFunction(TFunction fn, long catalogVersion) { + LibCacheSetNeedsRefresh(fn.hdfs_location); Function function = Function.fromThrift(fn); function.setCatalogVersion(catalogVersion); Db db = getDb(function.getFunctionName().getDb()); @@ -427,12 +447,17 @@ public class ImpaladCatalog extends Catalog { } private void addDataSource(TDataSource thrift, long catalogVersion) { + LibCacheSetNeedsRefresh(thrift.hdfs_location); DataSource dataSource = DataSource.fromThrift(thrift); dataSource.setCatalogVersion(catalogVersion); addDataSource(dataSource); } private void removeDataSource(TDataSource thrift, long dropCatalogVersion) { + DataSource src = dataSources_.get(thrift.name); + if (src != null && src.getCatalogVersion() < dropCatalogVersion) { + LibCacheRemoveEntry(src.getLocation()); + } removeDataSource(thrift.getName()); } @@ -468,6 +493,7 @@ public class ImpaladCatalog extends Catalog { // version of the drop, remove the function. 
Function fn = db.getFunction(thriftFn.getSignature()); if (fn != null && fn.getCatalogVersion() < dropCatalogVersion) { + LibCacheRemoveEntry(fn.getLocation().getLocation()); db.removeFunction(thriftFn.getSignature()); CatalogObjectVersionQueue.INSTANCE.removeVersion( fn.getCatalogVersion()); @@ -506,4 +532,15 @@ public class ImpaladCatalog extends Catalog { public void setIsReady(boolean isReady) { isReady_.set(isReady); } public AuthorizationPolicy getAuthPolicy() { return authPolicy_; } public String getDefaultKuduMasterHosts() { return defaultKuduMasterHosts_; } + + private void LibCacheSetNeedsRefresh(String hdfsLocation) { + if (!FeSupport.NativeLibCacheSetNeedsRefresh(hdfsLocation)) { + LOG.error("NativeLibCacheSetNeedsRefresh(" + hdfsLocation + ") failed."); + } + } + private void LibCacheRemoveEntry(String hdfsLibFile) { + if (!FeSupport.NativeLibCacheRemoveEntry(hdfsLibFile)) { + LOG.error("LibCacheRemoveEntry(" + hdfsLibFile + ") failed."); + } + } }
http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/fe/src/main/java/org/apache/impala/service/FeSupport.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java index 48349c2..b471448 100644 --- a/fe/src/main/java/org/apache/impala/service/FeSupport.java +++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java @@ -17,23 +17,18 @@ package org.apache.impala.service; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Set; -import org.apache.thrift.TDeserializer; -import org.apache.thrift.TException; -import org.apache.thrift.TSerializer; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.impala.analysis.BoolLiteral; import org.apache.impala.analysis.Expr; import org.apache.impala.analysis.NullLiteral; import org.apache.impala.analysis.SlotRef; import org.apache.impala.analysis.TableName; import org.apache.impala.common.InternalException; +import org.apache.impala.common.Pair; import org.apache.impala.thrift.TCacheJarParams; import org.apache.impala.thrift.TCacheJarResult; import org.apache.impala.thrift.TCatalogObject; @@ -51,6 +46,13 @@ import org.apache.impala.thrift.TSymbolLookupParams; import org.apache.impala.thrift.TSymbolLookupResult; import org.apache.impala.thrift.TTable; import org.apache.impala.util.NativeLibUtil; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.base.Preconditions; /** @@ -79,6 +81,21 @@ public class FeSupport { // Returns a serialized TCacheJarResult public native static byte[] NativeCacheJar(byte[] thriftCacheJar); + // Adds a topic item to the 
backend's pending metadata-topic update. + // 'serializationBuffer' is a serialized TCatalogObject. + // The return value is true if the operation succeeds and false otherwise. + public native static boolean NativeAddPendingTopicItem(long nativeCatalogServerPtr, + String key, byte[] serializationBuffer, boolean deleted); + + // Get a catalog object update from the backend. A pair of isDeletion flag and + // serialized TCatalogObject is returned. + public native static Pair<Boolean, ByteBuffer> NativeGetNextCatalogObjectUpdate( + long nativeIteratorPtr); + + // The return value is true if the operation succeeds and false otherwise. + public native static boolean NativeLibCacheSetNeedsRefresh(String hdfsLocation); + public native static boolean NativeLibCacheRemoveEntry(String hdfsLibFile); + // Does an RPCs to the Catalog Server to prioritize the metadata loading of a // one or more catalog objects. To keep our kerberos configuration consolidated, // we make make all RPCs in the BE layer instead of calling the Catalog Server http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/fe/src/main/java/org/apache/impala/service/Frontend.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index d0936d5..318b248 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -131,6 +131,7 @@ import org.apache.impala.util.MembershipSnapshot; import org.apache.impala.util.PatternMatcher; import org.apache.impala.util.TResultRowBuilder; import org.apache.impala.util.TSessionStateUtil; +import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -217,7 +218,7 @@ public class Frontend { public AuthorizationChecker getAuthzChecker() { return authzChecker_.get(); } public TUpdateCatalogCacheResponse updateCatalogCache( 
- TUpdateCatalogCacheRequest req) throws CatalogException { + TUpdateCatalogCacheRequest req) throws CatalogException, TException { if (req.is_delta) return impaladCatalog_.get().updateCatalog(req); // If this is not a delta, this update should replace the current http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/fe/src/main/java/org/apache/impala/service/JniCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java index ed5a51a..1d822e4 100644 --- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java +++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java @@ -56,6 +56,7 @@ import org.apache.impala.thrift.TUpdateCatalogRequest; import org.apache.impala.thrift.TBackendGflags; import org.apache.impala.util.GlogAppender; import org.apache.impala.util.PatternMatcher; +import org.apache.sentry.hdfs.ThriftSerializer; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TBinaryProtocol; @@ -118,16 +119,13 @@ public class JniCatalog { public static TUniqueId getServiceId() { return catalogServiceId_; } - /** - * Gets all catalog objects - */ - public byte[] getCatalogDelta(byte[] thriftGetCatalogDeltaReq) - throws ImpalaException, TException { + public byte[] getCatalogDelta(byte[] thriftGetCatalogDeltaReq) throws + ImpalaException, TException { TGetCatalogDeltaRequest params = new TGetCatalogDeltaRequest(); JniUtil.deserializeThrift(protocolFactory_, params, thriftGetCatalogDeltaReq); - TGetCatalogDeltaResponse resp = catalog_.getCatalogDelta(params.getFrom_version()); - TSerializer serializer = new TSerializer(protocolFactory_); - return serializer.serialize(resp); + return new TSerializer(protocolFactory_).serialize(new TGetCatalogDeltaResponse( + catalog_.getCatalogDelta(params.getNative_catalog_server_ptr(), + 
params.getFrom_version()))); } /** http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/fe/src/main/java/org/apache/impala/service/JniFrontend.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java index 688bd0e..3d99a4a 100644 --- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java +++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java @@ -168,30 +168,11 @@ public class JniFrontend { } // Deserialize and merge each thrift catalog update into a single merged update - public byte[] updateCatalogCache(byte[][] thriftCatalogUpdates) throws ImpalaException { - TUniqueId defaultCatalogServiceId = new TUniqueId(0L, 0L); - TUpdateCatalogCacheRequest mergedUpdateRequest = new TUpdateCatalogCacheRequest( - false, defaultCatalogServiceId, new ArrayList<TCatalogObject>(), - new ArrayList<TCatalogObject>()); - for (byte[] catalogUpdate: thriftCatalogUpdates) { - TUpdateCatalogCacheRequest incrementalRequest = new TUpdateCatalogCacheRequest(); - JniUtil.deserializeThrift(protocolFactory_, incrementalRequest, catalogUpdate); - mergedUpdateRequest.is_delta |= incrementalRequest.is_delta; - if (!incrementalRequest.getCatalog_service_id().equals(defaultCatalogServiceId)) { - mergedUpdateRequest.setCatalog_service_id( - incrementalRequest.getCatalog_service_id()); - } - mergedUpdateRequest.getUpdated_objects().addAll( - incrementalRequest.getUpdated_objects()); - mergedUpdateRequest.getRemoved_objects().addAll( - incrementalRequest.getRemoved_objects()); - } - TSerializer serializer = new TSerializer(protocolFactory_); - try { - return serializer.serialize(frontend_.updateCatalogCache(mergedUpdateRequest)); - } catch (TException e) { - throw new InternalException(e.getMessage()); - } + public byte[] updateCatalogCache(byte[] req) throws ImpalaException, TException { + TUpdateCatalogCacheRequest request 
= new TUpdateCatalogCacheRequest(); + JniUtil.deserializeThrift(protocolFactory_, request, req); + return new TSerializer(protocolFactory_).serialize( + frontend_.updateCatalogCache(request)); } /** http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/fe/src/main/java/org/apache/impala/util/TByteBuffer.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/util/TByteBuffer.java b/fe/src/main/java/org/apache/impala/util/TByteBuffer.java new file mode 100644 index 0000000..28d05aa --- /dev/null +++ b/fe/src/main/java/org/apache/impala/util/TByteBuffer.java @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.util; + +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; + +/** + * ByteBuffer-backed implementation of TTransport. This is copied from thrift 0.10.0. + * TODO: Upgrade thrift to 0.10.0 or higher and remove this file. 
+ */
+public final class TByteBuffer extends TTransport {
+  // Backing buffer; read() consumes from its current position up to its limit.
+  private final ByteBuffer buffer_;
+
+  /**
+   * Wraps 'byteBuffer' without copying it. Reading through this transport advances
+   * the position of the wrapped buffer.
+   */
+  public TByteBuffer(ByteBuffer byteBuffer) { buffer_ = byteBuffer; }
+
+  /** Always reports open; this transport holds no connection state. */
+  @Override
+  public boolean isOpen() { return true; }
+
+  /** No-op: nothing needs to be opened. */
+  @Override
+  public void open() {}
+
+  /** No-op: the wrapped buffer is left untouched. */
+  @Override
+  public void close() {}
+
+  /**
+   * Copies up to 'len' bytes from the wrapped buffer into 'buf' starting at 'off'.
+   * Returns the number of bytes copied, which is smaller than 'len' when fewer
+   * bytes remain in the buffer.
+   */
+  @Override
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    final int count = Math.min(buffer_.remaining(), len);
+    // Nothing to copy; report the (non-positive) count without touching the buffer.
+    if (count <= 0) return count;
+    try {
+      buffer_.get(buf, off, count);
+    } catch (BufferUnderflowException e) {
+      throw new TTransportException("Unexpected end of input buffer", e);
+    }
+    return count;
+  }
+
+  /** This transport is read-only, so every write attempt is rejected. */
+  @Override
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    throw new TTransportException("Write is not supported by TByteBuffer");
+  }
+}