This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 3333af5d9 IMPALA-12043: Fix "MaxMessageSize reached" exception in
Thrift JNI
3333af5d9 is described below
commit 3333af5d97c9367474fca6177a6dd415d747793f
Author: Riza Suminto <[email protected]>
AuthorDate: Thu Apr 6 13:17:14 2023 -0700
IMPALA-12043: Fix "MaxMessageSize reached" exception in Thrift JNI
Large results from something like COMPUTE INCREMENTAL STATS can result
in "TTransportException: MaxMessageSize reached". This happens when
CatalogdMetaProvider.updateCatalogCache() receives a buffer through the
JNI from NativeGetNextCatalogObjectUpdate that exceeds 100MB.
TByteBuffer inherits from TEndpointTransport, which in Thrift 0.16.0
adds a MaxMessageSize limit. TMemoryBuffer provides a constructor that
accepts a TConfiguration object to customize the limit, which we made
use of in IMPALA-11669. TByteBuffer does not provide a similar
constructor (THRIFT-5696), so it was overlooked.
This patch fixes the issue by copying
org.apache.thrift.transport.TByteBuffer from thrift-0.16.0 to
org.apache.impala.util.TByteBuffer and patching it with the fix from
THRIFT-5696. It then passes the thrift_rpc_max_message_size value as a
TConfiguration to the TByteBuffer constructor.
Testing:
- Pass CatalogdMetaProviderTest tests.
Change-Id: I105db332cd312d80bac0313090576bc47064ee16
Reviewed-on: http://gerrit.cloudera.org:8080/19712
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/util/backend-gflag-util.cc | 2 +
common/thrift/BackendGflags.thrift | 2 +
.../java/org/apache/thrift/TConfiguration.java | 91 ++++++++++++++
.../thrift/transport/TEndpointTransport.java | 97 ++++++++++++++
.../org/apache/impala/catalog/ImpaladCatalog.java | 9 +-
.../impala/catalog/local/CatalogdMetaProvider.java | 8 +-
.../org/apache/impala/service/BackendConfig.java | 4 +
.../java/org/apache/impala/util/TByteBuffer.java | 140 +++++++++++++++++++++
.../catalog/local/CatalogdMetaProviderTest.java | 17 +++
9 files changed, 366 insertions(+), 4 deletions(-)
diff --git a/be/src/util/backend-gflag-util.cc
b/be/src/util/backend-gflag-util.cc
index cd8f1dc01..e4449575a 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -104,6 +104,7 @@ DECLARE_bool(enable_sync_to_latest_event_on_ddls);
DECLARE_bool(pull_table_types_and_comments);
DECLARE_bool(enable_reload_events);
DECLARE_string(geospatial_library);
+DECLARE_int32(thrift_rpc_max_message_size);
// HS2 SAML2.0 configuration
// Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -407,6 +408,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
cfg.__set_min_processing_per_thread(FLAGS_min_processing_per_thread);
cfg.__set_skip_resource_checking_on_last_executor_group_set(
FLAGS_skip_resource_checking_on_last_executor_group_set);
+ cfg.__set_thrift_rpc_max_message_size(FLAGS_thrift_rpc_max_message_size);
return Status::OK();
}
diff --git a/common/thrift/BackendGflags.thrift
b/common/thrift/BackendGflags.thrift
index 3f1f03749..ba92d239e 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -250,4 +250,6 @@ struct TBackendGflags {
109: required i64 min_processing_per_thread
110: required bool skip_resource_checking_on_last_executor_group_set
+
+ 111: required i32 thrift_rpc_max_message_size
}
diff --git
a/fe/src/compat-apache-hive-3/java/org/apache/thrift/TConfiguration.java
b/fe/src/compat-apache-hive-3/java/org/apache/thrift/TConfiguration.java
new file mode 100644
index 000000000..9dd039f06
--- /dev/null
+++ b/fe/src/compat-apache-hive-3/java/org/apache/thrift/TConfiguration.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift;
+
+/**
+ * Limits applied by Thrift transports and protocols: maximum message size, maximum
+ * frame size and recursion limit.
+ * <p>
+ * NOTE(review): this file lives under compat-apache-hive-3 and presumably stands in for
+ * org.apache.thrift.TConfiguration from newer Thrift releases; keep its public API
+ * identical to upstream.
+ */
+public class TConfiguration {
+  public static final int DEFAULT_MAX_MESSAGE_SIZE = 100 * 1024 * 1024;
+  // this value is used consistently across all Thrift libraries
+  public static final int DEFAULT_MAX_FRAME_SIZE = 16384000;
+  public static final int DEFAULT_RECURSION_DEPTH = 64;
+
+  /**
+   * Shared configuration with every limit at its default value. Instances are mutable
+   * through the setters below, so callers should not modify this shared one.
+   */
+  public static final TConfiguration DEFAULT = new Builder().build();
+
+  private int maxMessageSize;
+  private int maxFrameSize;
+  private int recursionLimit;
+
+  /** Creates a configuration that uses all default limits. */
+  public TConfiguration() {
+    this(DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_FRAME_SIZE, DEFAULT_RECURSION_DEPTH);
+  }
+
+  /**
+   * Creates a configuration with explicit limits. Values are stored as given; no
+   * validation is performed.
+   *
+   * @param maxMessageSize maximum message size, in bytes
+   * @param maxFrameSize maximum frame size
+   * @param recursionLimit recursion limit
+   */
+  public TConfiguration(int maxMessageSize, int maxFrameSize, int recursionLimit) {
+    this.maxMessageSize = maxMessageSize;
+    this.maxFrameSize = maxFrameSize;
+    this.recursionLimit = recursionLimit;
+  }
+
+  public int getMaxMessageSize() {
+    return maxMessageSize;
+  }
+
+  public int getMaxFrameSize() {
+    return maxFrameSize;
+  }
+
+  public int getRecursionLimit() {
+    return recursionLimit;
+  }
+
+  public void setMaxMessageSize(int maxMessageSize) {
+    this.maxMessageSize = maxMessageSize;
+  }
+
+  public void setMaxFrameSize(int maxFrameSize) {
+    this.maxFrameSize = maxFrameSize;
+  }
+
+  public void setRecursionLimit(int recursionLimit) {
+    this.recursionLimit = recursionLimit;
+  }
+
+  /** Returns a new {@link Builder} whose limits all start at their defaults. */
+  public static TConfiguration.Builder custom() {
+    return new Builder();
+  }
+
+  /** Fluent builder for {@link TConfiguration}; every limit starts at its default. */
+  public static class Builder {
+    private int maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
+    private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
+    private int recursionLimit = DEFAULT_RECURSION_DEPTH;
+
+    Builder() {
+    }
+
+    public Builder setMaxMessageSize(int maxMessageSize) {
+      this.maxMessageSize = maxMessageSize;
+      return this;
+    }
+
+    public Builder setMaxFrameSize(int maxFrameSize) {
+      this.maxFrameSize = maxFrameSize;
+      return this;
+    }
+
+    public Builder setRecursionLimit(int recursionLimit) {
+      this.recursionLimit = recursionLimit;
+      return this;
+    }
+
+    /** Creates a new, independent {@link TConfiguration} from the current settings. */
+    public TConfiguration build() {
+      return new TConfiguration(maxMessageSize, maxFrameSize, recursionLimit);
+    }
+  }
+}
diff --git
a/fe/src/compat-apache-hive-3/java/org/apache/thrift/transport/TEndpointTransport.java
b/fe/src/compat-apache-hive-3/java/org/apache/thrift/transport/TEndpointTransport.java
new file mode 100644
index 000000000..c4e6454a4
--- /dev/null
+++
b/fe/src/compat-apache-hive-3/java/org/apache/thrift/transport/TEndpointTransport.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+
+import org.apache.thrift.TConfiguration;
+
+import java.util.Objects;
+
+/**
+ * Base class for transports that enforce a maximum message size taken from a
+ * {@link TConfiguration}.
+ * <p>
+ * Two counters track the limit: {@code knownMessageSize} is the current upper bound on
+ * the message size, and {@code remainingMessageSize} is how many bytes may still be
+ * consumed under that bound. Every violation raises a {@link TTransportException} of
+ * type END_OF_FILE with the message "MaxMessageSize reached".
+ * <p>
+ * NOTE(review): this file lives under compat-apache-hive-3 and appears to mirror
+ * org.apache.thrift.transport.TEndpointTransport from newer Thrift releases; keep the
+ * code in sync with upstream rather than diverging from it.
+ */
+public abstract class TEndpointTransport extends TTransport {
+  /** Returns the configured maximum message size, in bytes. */
+  protected long getMaxMessageSize() { return getConfiguration().getMaxMessageSize(); }
+
+  /** Current upper bound on the size of the message being processed. */
+  protected long knownMessageSize;
+  /** Bytes that may still be consumed before {@code knownMessageSize} is exceeded. */
+  protected long remainingMessageSize;
+
+  private TConfiguration _configuration;
+  /** Returns the configuration given at construction, or the default substituted for null. */
+  public TConfiguration getConfiguration() { return _configuration; }
+
+  /**
+   * Creates a transport and sets both counters to the configured maximum.
+   * <p>
+   * NOTE: this calls the overridable resetConsumedMessageSize()/getMaxMessageSize()
+   * from the constructor, before any subclass fields are initialized.
+   *
+   * @param config transport limits; null is replaced by {@code new TConfiguration()}
+   * @throws TTransportException declared for subclasses; the base-class full reset
+   *     performed here does not itself throw
+   */
+  public TEndpointTransport(TConfiguration config) throws TTransportException {
+    _configuration = Objects.isNull(config) ? new TConfiguration() : config;
+
+    resetConsumedMessageSize(-1);
+  }
+
+  /**
+   * Resets RemainingMessageSize to the configured maximum (when {@code newSize} is
+   * negative) or shrinks both counters to {@code newSize}.
+   * @param newSize negative for a full reset; otherwise the new bound, which must not
+   *     exceed the current {@code knownMessageSize}
+   * @throws TTransportException if {@code newSize} is larger than {@code knownMessageSize}
+   */
+  protected void resetConsumedMessageSize(long newSize) throws TTransportException {
+    // full reset
+    if (newSize < 0) {
+      knownMessageSize = getMaxMessageSize();
+      remainingMessageSize = getMaxMessageSize();
+      return;
+    }
+
+    // update only: message size can shrink, but not grow
+    if (newSize > knownMessageSize)
+      throw new TTransportException(
+          TTransportException.END_OF_FILE, "MaxMessageSize reached");
+
+    knownMessageSize = newSize;
+    remainingMessageSize = newSize;
+  }
+
+  /**
+   * Updates RemainingMessageSize to reflect the known real message size (e.g. framed
+   * transport). Will throw if we already consumed too many bytes or if the new size is
+   * larger than allowed.
+   * <p>
+   * A {@code size} of 0 triggers a full reset to the configured maximum. Bytes consumed
+   * before the call are charged again against the new bound.
+   * @param size the known message size in bytes, or 0 to fall back to the maximum
+   * @throws TTransportException if {@code size} exceeds the current known size, or if
+   *     the bytes already consumed exceed the new bound
+   */
+  public void updateKnownMessageSize(long size) throws TTransportException {
+    // Capture consumption before the reset overwrites both counters.
+    long consumed = knownMessageSize - remainingMessageSize;
+    resetConsumedMessageSize(size == 0 ? -1 : size);
+    countConsumedMessageBytes(consumed);
+  }
+
+  /**
+   * Throws if there are not enough bytes in the input stream to satisfy a read of
+   * numBytes bytes of data. This only checks; it does not consume from the allowance.
+   * @param numBytes number of bytes the caller intends to read
+   * @throws TTransportException if {@code numBytes} exceeds {@code remainingMessageSize}
+   */
+  public void checkReadBytesAvailable(long numBytes) throws TTransportException {
+    if (remainingMessageSize < numBytes)
+      throw new TTransportException(
+          TTransportException.END_OF_FILE, "MaxMessageSize reached");
+  }
+
+  /**
+   * Consumes numBytes from the RemainingMessageSize.
+   * @param numBytes number of bytes consumed
+   * @throws TTransportException if {@code numBytes} exceeds the remaining allowance; the
+   *     allowance is set to 0 before throwing
+   */
+  protected void countConsumedMessageBytes(long numBytes) throws TTransportException {
+    if (remainingMessageSize >= numBytes) {
+      remainingMessageSize -= numBytes;
+    } else {
+      remainingMessageSize = 0;
+      throw new TTransportException(
+          TTransportException.END_OF_FILE, "MaxMessageSize reached");
+    }
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index bed23effa..965039948 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -33,6 +33,7 @@ import org.apache.impala.authorization.AuthorizationChecker;
import org.apache.impala.authorization.AuthorizationPolicy;
import org.apache.impala.common.InternalException;
import org.apache.impala.common.Pair;
+import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.FeSupport;
import org.apache.impala.thrift.TAuthzCacheInvalidation;
import org.apache.impala.thrift.TCatalogObject;
@@ -48,10 +49,11 @@ import org.apache.impala.thrift.TUniqueId;
import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
import org.apache.impala.util.PatternMatcher;
+import org.apache.impala.util.TByteBuffer;
import org.apache.impala.util.TUniqueIdUtil;
+import org.apache.thrift.TConfiguration;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -208,11 +210,14 @@ public class ImpaladCatalog extends Catalog implements
FeCatalog {
Map<TableName, PartitionMetaSummary> partUpdates = new HashMap<>();
long newCatalogVersion = lastSyncedCatalogVersion_.get();
Pair<Boolean, ByteBuffer> update;
+ int maxMessageSize = BackendConfig.INSTANCE.getThriftRpcMaxMessageSize();
+ final TConfiguration config = new TConfiguration(maxMessageSize,
+ TConfiguration.DEFAULT_MAX_FRAME_SIZE,
TConfiguration.DEFAULT_RECURSION_DEPTH);
while ((update =
FeSupport.NativeGetNextCatalogObjectUpdate(req.native_iterator_ptr))
!= null) {
boolean isDelete = update.first;
TCatalogObject obj = new TCatalogObject();
- obj.read(new TBinaryProtocol(new TByteBuffer(update.second)));
+ obj.read(new TBinaryProtocol(new TByteBuffer(config, update.second)));
String key = Catalog.toCatalogObjectKey(obj);
int len = update.second.capacity();
if (len > 100 * 1024 * 1024 /* 100MB */) {
diff --git
a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index fee9311a0..de0f7d653 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -99,12 +99,13 @@ import org.apache.impala.thrift.TValidWriteIdList;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.IcebergUtil;
import org.apache.impala.util.ListMap;
+import org.apache.impala.util.TByteBuffer;
import org.apache.impala.util.ThreadNameAnnotator;
+import org.apache.thrift.TConfiguration;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TByteBuffer;
import org.ehcache.sizeof.SizeOf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1223,12 +1224,15 @@ public class CatalogdMetaProvider implements
MetaProvider {
ObjectUpdateSequencer hdfsCachePoolSequencer = new ObjectUpdateSequencer();
Pair<Boolean, ByteBuffer> update;
+ int maxMessageSize = BackendConfig.INSTANCE.getThriftRpcMaxMessageSize();
+ final TConfiguration config = new TConfiguration(maxMessageSize,
+ TConfiguration.DEFAULT_MAX_FRAME_SIZE,
TConfiguration.DEFAULT_RECURSION_DEPTH);
while ((update =
FeSupport.NativeGetNextCatalogObjectUpdate(req.native_iterator_ptr))
!= null) {
boolean isDelete = update.first;
TCatalogObject obj = new TCatalogObject();
try {
- obj.read(new TBinaryProtocol(new TByteBuffer(update.second)));
+ obj.read(new TBinaryProtocol(new TByteBuffer(config, update.second)));
} catch (TException e) {
// TODO(todd) include the bad key here! currently the JNI bridge
doesn't expose
// the key in any way.
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 820d60a17..4031e8483 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -394,4 +394,8 @@ public class BackendConfig {
public boolean isSkipResourceCheckingOnLastExecutorGroupSet() {
return backendCfg_.skip_resource_checking_on_last_executor_group_set;
}
+
+  /**
+   * Returns the maximum Thrift message size in bytes, as populated from the backend flag
+   * --thrift_rpc_max_message_size. Used to build the TConfiguration applied when
+   * deserializing catalog object updates received through JNI.
+   */
+  public int getThriftRpcMaxMessageSize() {
+    return backendCfg_.thrift_rpc_max_message_size;
+  }
}
diff --git a/fe/src/main/java/org/apache/impala/util/TByteBuffer.java
b/fe/src/main/java/org/apache/impala/util/TByteBuffer.java
new file mode 100644
index 000000000..3295085f5
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/util/TByteBuffer.java
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import org.apache.thrift.TConfiguration;
+import org.apache.thrift.transport.TEndpointTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.nio.BufferOverflowException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * ByteBuffer-backed implementation of TTransport that honors a caller-supplied
+ * TConfiguration.
+ * <p>
+ * This is a copy of org.apache.thrift.transport.TByteBuffer (thrift-0.16.0) patched with
+ * THRIFT-5696, which adds the {@link #TByteBuffer(TConfiguration, ByteBuffer)}
+ * constructor so the max message size can be raised above
+ * TConfiguration.DEFAULT_MAX_MESSAGE_SIZE. Once Impala upgrades its
+ * IMPALA_THRIFT_POM_VERSION to thrift-0.19.0, this class can be replaced with the
+ * patched TByteBuffer from the Thrift Java library.
+ */
+public final class TByteBuffer extends TEndpointTransport {
+  private final ByteBuffer byteBuffer;
+
+  /**
+   * Creates a new TByteBuffer wrapping a given NIO ByteBuffer and custom TConfiguration.
+   * <p>
+   * The buffer's capacity becomes the known message size, so construction fails when
+   * the capacity exceeds the configuration's max message size. A zero-capacity buffer
+   * leaves the limit at the configured maximum.
+   *
+   * @param configuration the custom TConfiguration.
+   * @param byteBuffer the NIO ByteBuffer to wrap.
+   * @throws TTransportException if the buffer capacity exceeds the configured max
+   *     message size ("MaxMessageSize reached").
+   */
+  public TByteBuffer(TConfiguration configuration, ByteBuffer byteBuffer)
+      throws TTransportException {
+    super(configuration);
+    this.byteBuffer = byteBuffer;
+    updateKnownMessageSize(byteBuffer.capacity());
+  }
+
+  /**
+   * Creates a new TByteBuffer wrapping a given NIO ByteBuffer, using a default
+   * {@code new TConfiguration()} (max message size
+   * TConfiguration.DEFAULT_MAX_MESSAGE_SIZE).
+   *
+   * @param byteBuffer the NIO ByteBuffer to wrap.
+   * @throws TTransportException if the buffer capacity exceeds the default max message
+   *     size.
+   */
+  public TByteBuffer(ByteBuffer byteBuffer) throws TTransportException {
+    this(new TConfiguration(), byteBuffer);
+  }
+
+  /** Always open: the transport is just a view over an in-memory buffer. */
+  @Override
+  public boolean isOpen() {
+    return true;
+  }
+
+  /** No-op. */
+  @Override
+  public void open() {
+  }
+
+  /** No-op; the wrapped ByteBuffer is left untouched. */
+  @Override
+  public void close() {
+  }
+
+  /**
+   * Reads up to {@code len} bytes into {@code buf} starting at {@code off}.
+   * <p>
+   * NOTE: {@code len} is checked against the remaining message-size allowance, but this
+   * method never consumes from it (countConsumedMessageBytes is not called), so each
+   * call is checked independently against the bound set at construction.
+   *
+   * @return the number of bytes read; 0 if no bytes remain or {@code len} is 0.
+   * @throws TTransportException if {@code len} exceeds the allowance
+   *     ("MaxMessageSize reached").
+   */
+  @Override
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    checkReadBytesAvailable(len);
+
+    final int n = Math.min(byteBuffer.remaining(), len);
+    if (n > 0) {
+      try {
+        byteBuffer.get(buf, off, n);
+      } catch (BufferUnderflowException e) {
+        throw new TTransportException("Unexpected end of input buffer", e);
+      }
+    }
+    return n;
+  }
+
+  /**
+   * Writes {@code len} bytes from {@code buf} starting at {@code off}. Writes are not
+   * checked against the max message size.
+   *
+   * @throws TTransportException if the buffer lacks room for all {@code len} bytes.
+   */
+  @Override
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    try {
+      byteBuffer.put(buf, off, len);
+    } catch (BufferOverflowException e) {
+      throw new TTransportException("Not enough room in output buffer", e);
+    }
+  }
+
+  /**
+   * Gets the underlying NIO ByteBuffer.
+   *
+   * @return the underlying NIO ByteBuffer.
+   */
+  public ByteBuffer getByteBuffer() {
+    return byteBuffer;
+  }
+
+  /**
+   * Convenience method to call clear() on the underlying NIO ByteBuffer.
+   *
+   * @return this instance.
+   */
+  public TByteBuffer clear() {
+    byteBuffer.clear();
+    return this;
+  }
+
+  /**
+   * Convenience method to call flip() on the underlying NIO ByteBuffer.
+   *
+   * @return this instance.
+   */
+  public TByteBuffer flip() {
+    byteBuffer.flip();
+    return this;
+  }
+
+  /**
+   * Copies the bytes between the underlying buffer's position and limit into a new
+   * array. The buffer's own position is not modified, because the copy is taken through
+   * a slice.
+   *
+   * @return a newly allocated array holding the remaining bytes. This is a copy, not the
+   *     buffer's backing array.
+   */
+  public byte[] toByteArray() {
+    final byte[] data = new byte[byteBuffer.remaining()];
+    byteBuffer.slice().get(data);
+    return data;
+  }
+}
diff --git
a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
index 7eb6f1fbf..dbfb59639 100644
---
a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
+++
b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
@@ -19,6 +19,7 @@ package org.apache.impala.catalog.local;
import static org.junit.Assert.*;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -57,6 +58,9 @@ import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TRuntimeProfileNode;
import org.apache.impala.thrift.TTable;
import org.apache.impala.util.ListMap;
+import org.apache.impala.util.TByteBuffer;
+import org.apache.thrift.TConfiguration;
+import org.apache.thrift.transport.TTransportException;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
@@ -641,4 +645,17 @@ public class CatalogdMetaProviderTest {
assertEquals("Actual paths: " + paths, 1, paths.size());
}
}
+
+  @Test
+  public void testLargeTConfiguration() throws Exception {
+    // THRIFT-5696: a TByteBuffer must accept a message larger than
+    // TConfiguration.DEFAULT_MAX_MESSAGE_SIZE when it is constructed with a
+    // TConfiguration whose max message size is large enough.
+    int maxSize = BackendConfig.INSTANCE.getThriftRpcMaxMessageSize();
+    assertEquals(1024 * 1024 * 1024, maxSize);
+    // 100.5 MB: just above the default limit, well below maxSize.
+    int bufferSize = (100 * 1024 + 512) * 1024;
+    assertTrue(bufferSize > TConfiguration.DEFAULT_MAX_MESSAGE_SIZE);
+    assertTrue(bufferSize <= maxSize);
+    // Allocate once and reuse the buffer for both checks to keep heap usage low.
+    ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+
+    final TConfiguration configLarge = new TConfiguration(maxSize,
+        TConfiguration.DEFAULT_MAX_FRAME_SIZE, TConfiguration.DEFAULT_RECURSION_DEPTH);
+    TByteBuffer validTByteBuffer = new TByteBuffer(configLarge, buffer);
+    validTByteBuffer.close();
+
+    // The same buffer must be rejected under the default configuration: this is the
+    // "MaxMessageSize reached" failure that IMPALA-12043 fixed.
+    try {
+      new TByteBuffer(buffer).close();
+      fail("Expected TTransportException for a " + bufferSize + "-byte buffer under the "
+          + "default TConfiguration");
+    } catch (TTransportException e) {
+      // Expected.
+    }
+  }
}