This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 770499c [KYUUBI #1948] Upgrade thrift version to 0.16.0
770499c is described below
commit 770499ca544908a9755e8ac4702efb68398129c7
Author: SteNicholas <[email protected]>
AuthorDate: Wed Feb 23 20:51:59 2022 +0800
[KYUUBI #1948] Upgrade thrift version to 0.16.0
### _Why are the changes needed?_
Upgrade libthrift to 0.16.0 due to
[CVE-2020-13949](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-13949)
and the coming upstream change of Spark
https://github.com/apache/spark/pull/34362
### _What changed in this PR?_
- Upgrade libthrift to 0.16.0
- Shade and relocate `thrift` and `hive-service-rpc` classes in
`kyuubi-spark-engine`, it's necessary to avoide conflicting with old thrift
libs bundled in Spark binary releases.
- Due to thrift change the method signature, the subclasses those
interfaces in Kyuubi also need to modify to pass compile.
We rely on Hive 2.3.9 jars in some components, e.g. `kyuubi-hive-jdbc`,
`LocalMetaServer` in `kyuubi-server` test classes.
Some classes in Hive jars compiled against old thrift interfaces which are
not compatible with thrift 0.16.0, it causes runtime link error, we found the
following classes which breaks the test and copied them with necessary
modification to make it work with thrift 0.16.0.
- `TFramedTransport`
- `TFilterTransport`
- `TUGIAssumingTransport`
- `TUGIContainingTransport`
- Next Steps, I think it's worth to do in separated PRs.
- Recover the `HiveDelegationTokenProviderSuite`, one approach is use
an isolate classloader to load HMS classes and thrift 0.9.3 classes from Maven,
this approach can also be used for the planed Zoopkeeper upgrading to help us
verficating the compatibility of Zookeeper Server 3.4.x.
- Rewrite `kyuubi-hive-jdbc` to make it decouple with Hive jars,
because there maybe other places which may not work with thrift 0.16.0 but the
UTs does not cover.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before make a pull request
Closes #1953 from SteNicholas/KYUUBI-1948.
Closes #1948
de5d1ea2 [SteNicholas] [KYUUBI #1948] Upgrade thrift version to 0.16.0
898effcd [SteNicholas] [KYUUBI #1948] Upgrade thrift version to 0.16.0
803e270c [SteNicholas] [KYUUBI #1948] Upgrade thrift version to 0.16.0
Authored-by: SteNicholas <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
dev/dependencyList | 4 +-
.../apache/thrift/transport/TFramedTransport.java | 185 +++++++++++++++++++++
.../apache/thrift/transport/TFramedTransport.java | 185 +++++++++++++++++++++
.../apache/thrift/transport/TFramedTransport.java | 185 +++++++++++++++++++++
docs/deployment/settings.md | 4 -
externals/kyuubi-flink-sql-engine/pom.xml | 3 +-
externals/kyuubi-spark-sql-engine/pom.xml | 17 ++
.../apache/thrift/transport/TFramedTransport.java | 185 +++++++++++++++++++++
kyuubi-common/pom.xml | 10 ++
.../org/apache/kyuubi/config/KyuubiConf.scala | 28 ----
.../kyuubi/service/TBinaryFrontendService.scala | 5 -
.../apache/kyuubi/service/TFrontendService.scala | 4 +
.../HadoopThriftAuthBridgeServer.scala | 2 +-
.../authentication/TSetIpAddressProcessor.scala | 2 +-
kyuubi-hive-jdbc-shaded/pom.xml | 13 ++
kyuubi-hive-jdbc/pom.xml | 10 ++
.../hadoop/hive/thrift/TFilterTransport.java | 111 +++++++++++++
.../hive/thrift/TUGIContainingTransport.java | 90 ++++++++++
.../hive/thrift/client/TUGIAssumingTransport.java | 73 ++++++++
.../apache/kyuubi/jdbc/hive/KyuubiConnection.java | 2 +-
.../apache/thrift/transport/TFramedTransport.java | 185 +++++++++++++++++++++
.../kyuubi/client/KyuubiSyncThriftClient.scala | 3 +-
.../apache/thrift/transport/TFramedTransport.java | 185 +++++++++++++++++++++
.../HiveDelegationTokenProviderSuite.scala | 5 +-
.../KyuubiOperationPerConnectionSuite.scala | 6 +-
pom.xml | 43 ++---
26 files changed, 1478 insertions(+), 67 deletions(-)
diff --git a/dev/dependencyList b/dev/dependencyList
index 3bb9e6e..c42ee23 100644
--- a/dev/dependencyList
+++ b/dev/dependencyList
@@ -36,6 +36,8 @@ hk2-api/2.6.1//hk2-api-2.6.1.jar
hk2-locator/2.6.1//hk2-locator-2.6.1.jar
hk2-utils/2.6.1//hk2-utils-2.6.1.jar
htrace-core4/4.1.0-incubating//htrace-core4-4.1.0-incubating.jar
+httpclient/4.5.13//httpclient-4.5.13.jar
+httpcore/4.4.15//httpcore-4.4.15.jar
jackson-annotations/2.13.1//jackson-annotations-2.13.1.jar
jackson-core/2.13.1//jackson-core-2.13.1.jar
jackson-databind/2.13.1//jackson-databind-2.13.1.jar
@@ -70,7 +72,7 @@
jetty-util-ajax/9.4.41.v20210516//jetty-util-ajax-9.4.41.v20210516.jar
jetty-util/9.4.41.v20210516//jetty-util-9.4.41.v20210516.jar
jline/0.9.94//jline-0.9.94.jar
libfb303/0.9.3//libfb303-0.9.3.jar
-libthrift/0.9.3//libthrift-0.9.3.jar
+libthrift/0.16.0//libthrift-0.16.0.jar
log4j-1.2-api/2.17.1//log4j-1.2-api-2.17.1.jar
log4j-api/2.17.1//log4j-api-2.17.1.jar
log4j-core/2.17.1//log4j-core-2.17.1.jar
diff --git
a/dev/kyuubi-extension-spark-3-1/src/test/java/org/apache/thrift/transport/TFramedTransport.java
b/dev/kyuubi-extension-spark-3-1/src/test/java/org/apache/thrift/transport/TFramedTransport.java
new file mode 100644
index 0000000..3777218
--- /dev/null
+++
b/dev/kyuubi-extension-spark-3-1/src/test/java/org/apache/thrift/transport/TFramedTransport.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.thrift.transport;
+
+import org.apache.thrift.TByteArrayOutputStream;
+import org.apache.thrift.TConfiguration;
+
+/**
+ * This is based on libthrift-0.12.0 {@link TFramedTransport}. To fix class of
+ * org.apache.thrift.transport.TFramedTransport not found after upgrading
libthrift.
+ *
+ * <p>TFramedTransport is a buffered TTransport that ensures a fully read
message every time by
+ * preceding messages with a 4-byte frame size.
+ */
+public class TFramedTransport extends TTransport {
+
+ protected static final int DEFAULT_MAX_LENGTH = 16384000;
+
+ private int maxLength_;
+
+ /** Underlying transport */
+ private TTransport transport_ = null;
+
+ /** Buffer for output */
+ private final TByteArrayOutputStream writeBuffer_ = new
TByteArrayOutputStream(1024);
+
+ /** Buffer for input */
+ private final TMemoryInputTransport readBuffer_ = new
TMemoryInputTransport(new byte[0]);
+
+ public static class Factory extends TTransportFactory {
+ private int maxLength_;
+
+ public Factory() {
+ maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
+ }
+
+ public Factory(int maxLength) {
+ maxLength_ = maxLength;
+ }
+
+ @Override
+ public TTransport getTransport(TTransport base) throws TTransportException
{
+ return new TFramedTransport(base, maxLength_);
+ }
+ }
+
+ /** Constructor wraps around another transport */
+ public TFramedTransport(TTransport transport, int maxLength) throws
TTransportException {
+ transport_ = transport;
+ maxLength_ = maxLength;
+ }
+
+ public TFramedTransport(TTransport transport) throws TTransportException {
+ transport_ = transport;
+ maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
+ }
+
+ public void open() throws TTransportException {
+ transport_.open();
+ }
+
+ public boolean isOpen() {
+ return transport_.isOpen();
+ }
+
+ public void close() {
+ transport_.close();
+ }
+
+ public int read(byte[] buf, int off, int len) throws TTransportException {
+ int got = readBuffer_.read(buf, off, len);
+ if (got > 0) {
+ return got;
+ }
+
+ // Read another frame of data
+ readFrame();
+
+ return readBuffer_.read(buf, off, len);
+ }
+
+ @Override
+ public byte[] getBuffer() {
+ return readBuffer_.getBuffer();
+ }
+
+ @Override
+ public int getBufferPosition() {
+ return readBuffer_.getBufferPosition();
+ }
+
+ @Override
+ public int getBytesRemainingInBuffer() {
+ return readBuffer_.getBytesRemainingInBuffer();
+ }
+
+ @Override
+ public void consumeBuffer(int len) {
+ readBuffer_.consumeBuffer(len);
+ }
+
+ @Override
+ public TConfiguration getConfiguration() {
+ return null;
+ }
+
+ @Override
+ public void updateKnownMessageSize(long l) throws TTransportException {}
+
+ @Override
+ public void checkReadBytesAvailable(long l) throws TTransportException {}
+
+ public void clear() {
+ readBuffer_.clear();
+ }
+
+ private final byte[] i32buf = new byte[4];
+
+ private void readFrame() throws TTransportException {
+ transport_.readAll(i32buf, 0, 4);
+ int size = decodeFrameSize(i32buf);
+
+ if (size < 0) {
+ close();
+ throw new TTransportException(
+ TTransportException.CORRUPTED_DATA, "Read a negative frame size (" +
size + ")!");
+ }
+
+ if (size > maxLength_) {
+ close();
+ throw new TTransportException(
+ TTransportException.CORRUPTED_DATA,
+ "Frame size (" + size + ") larger than max length (" + maxLength_ +
")!");
+ }
+
+ byte[] buff = new byte[size];
+ transport_.readAll(buff, 0, size);
+ readBuffer_.reset(buff);
+ }
+
+ public void write(byte[] buf, int off, int len) throws TTransportException {
+ writeBuffer_.write(buf, off, len);
+ }
+
+ @Override
+ public void flush() throws TTransportException {
+ byte[] buf = writeBuffer_.get();
+ int len = writeBuffer_.len();
+ writeBuffer_.reset();
+
+ encodeFrameSize(len, i32buf);
+ transport_.write(i32buf, 0, 4);
+ transport_.write(buf, 0, len);
+ transport_.flush();
+ }
+
+ public static final void encodeFrameSize(final int frameSize, final byte[]
buf) {
+ buf[0] = (byte) (0xff & (frameSize >> 24));
+ buf[1] = (byte) (0xff & (frameSize >> 16));
+ buf[2] = (byte) (0xff & (frameSize >> 8));
+ buf[3] = (byte) (0xff & (frameSize));
+ }
+
+ public static final int decodeFrameSize(final byte[] buf) {
+ return ((buf[0] & 0xff) << 24)
+ | ((buf[1] & 0xff) << 16)
+ | ((buf[2] & 0xff) << 8)
+ | ((buf[3] & 0xff));
+ }
+}
diff --git
a/dev/kyuubi-extension-spark-3-2/src/test/java/org/apache/thrift/transport/TFramedTransport.java
b/dev/kyuubi-extension-spark-3-2/src/test/java/org/apache/thrift/transport/TFramedTransport.java
new file mode 100644
index 0000000..3777218
--- /dev/null
+++
b/dev/kyuubi-extension-spark-3-2/src/test/java/org/apache/thrift/transport/TFramedTransport.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.thrift.transport;
+
+import org.apache.thrift.TByteArrayOutputStream;
+import org.apache.thrift.TConfiguration;
+
+/**
+ * This is based on libthrift-0.12.0 {@link TFramedTransport}. To fix class of
+ * org.apache.thrift.transport.TFramedTransport not found after upgrading
libthrift.
+ *
+ * <p>TFramedTransport is a buffered TTransport that ensures a fully read
message every time by
+ * preceding messages with a 4-byte frame size.
+ */
+public class TFramedTransport extends TTransport {
+
+ protected static final int DEFAULT_MAX_LENGTH = 16384000;
+
+ private int maxLength_;
+
+ /** Underlying transport */
+ private TTransport transport_ = null;
+
+ /** Buffer for output */
+ private final TByteArrayOutputStream writeBuffer_ = new
TByteArrayOutputStream(1024);
+
+ /** Buffer for input */
+ private final TMemoryInputTransport readBuffer_ = new
TMemoryInputTransport(new byte[0]);
+
+ public static class Factory extends TTransportFactory {
+ private int maxLength_;
+
+ public Factory() {
+ maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
+ }
+
+ public Factory(int maxLength) {
+ maxLength_ = maxLength;
+ }
+
+ @Override
+ public TTransport getTransport(TTransport base) throws TTransportException
{
+ return new TFramedTransport(base, maxLength_);
+ }
+ }
+
+ /** Constructor wraps around another transport */
+ public TFramedTransport(TTransport transport, int maxLength) throws
TTransportException {
+ transport_ = transport;
+ maxLength_ = maxLength;
+ }
+
+ public TFramedTransport(TTransport transport) throws TTransportException {
+ transport_ = transport;
+ maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
+ }
+
+ public void open() throws TTransportException {
+ transport_.open();
+ }
+
+ public boolean isOpen() {
+ return transport_.isOpen();
+ }
+
+ public void close() {
+ transport_.close();
+ }
+
+ public int read(byte[] buf, int off, int len) throws TTransportException {
+ int got = readBuffer_.read(buf, off, len);
+ if (got > 0) {
+ return got;
+ }
+
+ // Read another frame of data
+ readFrame();
+
+ return readBuffer_.read(buf, off, len);
+ }
+
+ @Override
+ public byte[] getBuffer() {
+ return readBuffer_.getBuffer();
+ }
+
+ @Override
+ public int getBufferPosition() {
+ return readBuffer_.getBufferPosition();
+ }
+
+ @Override
+ public int getBytesRemainingInBuffer() {
+ return readBuffer_.getBytesRemainingInBuffer();
+ }
+
+ @Override
+ public void consumeBuffer(int len) {
+ readBuffer_.consumeBuffer(len);
+ }
+
+ @Override
+ public TConfiguration getConfiguration() {
+ return null;
+ }
+
+ @Override
+ public void updateKnownMessageSize(long l) throws TTransportException {}
+
+ @Override
+ public void checkReadBytesAvailable(long l) throws TTransportException {}
+
+ public void clear() {
+ readBuffer_.clear();
+ }
+
+ private final byte[] i32buf = new byte[4];
+
+ private void readFrame() throws TTransportException {
+ transport_.readAll(i32buf, 0, 4);
+ int size = decodeFrameSize(i32buf);
+
+ if (size < 0) {
+ close();
+ throw new TTransportException(
+ TTransportException.CORRUPTED_DATA, "Read a negative frame size (" +
size + ")!");
+ }
+
+ if (size > maxLength_) {
+ close();
+ throw new TTransportException(
+ TTransportException.CORRUPTED_DATA,
+ "Frame size (" + size + ") larger than max length (" + maxLength_ +
")!");
+ }
+
+ byte[] buff = new byte[size];
+ transport_.readAll(buff, 0, size);
+ readBuffer_.reset(buff);
+ }
+
+ public void write(byte[] buf, int off, int len) throws TTransportException {
+ writeBuffer_.write(buf, off, len);
+ }
+
+ @Override
+ public void flush() throws TTransportException {
+ byte[] buf = writeBuffer_.get();
+ int len = writeBuffer_.len();
+ writeBuffer_.reset();
+
+ encodeFrameSize(len, i32buf);
+ transport_.write(i32buf, 0, 4);
+ transport_.write(buf, 0, len);
+ transport_.flush();
+ }
+
+ public static final void encodeFrameSize(final int frameSize, final byte[]
buf) {
+ buf[0] = (byte) (0xff & (frameSize >> 24));
+ buf[1] = (byte) (0xff & (frameSize >> 16));
+ buf[2] = (byte) (0xff & (frameSize >> 8));
+ buf[3] = (byte) (0xff & (frameSize));
+ }
+
+ public static final int decodeFrameSize(final byte[] buf) {
+ return ((buf[0] & 0xff) << 24)
+ | ((buf[1] & 0xff) << 16)
+ | ((buf[2] & 0xff) << 8)
+ | ((buf[3] & 0xff));
+ }
+}
diff --git
a/dev/kyuubi-extension-spark-common/src/test/java/org/apache/thrift/transport/TFramedTransport.java
b/dev/kyuubi-extension-spark-common/src/test/java/org/apache/thrift/transport/TFramedTransport.java
new file mode 100644
index 0000000..3777218
--- /dev/null
+++
b/dev/kyuubi-extension-spark-common/src/test/java/org/apache/thrift/transport/TFramedTransport.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.thrift.transport;
+
+import org.apache.thrift.TByteArrayOutputStream;
+import org.apache.thrift.TConfiguration;
+
+/**
+ * This is based on libthrift-0.12.0 {@link TFramedTransport}. To fix class of
+ * org.apache.thrift.transport.TFramedTransport not found after upgrading
libthrift.
+ *
+ * <p>TFramedTransport is a buffered TTransport that ensures a fully read
message every time by
+ * preceding messages with a 4-byte frame size.
+ */
+public class TFramedTransport extends TTransport {
+
+ protected static final int DEFAULT_MAX_LENGTH = 16384000;
+
+ private int maxLength_;
+
+ /** Underlying transport */
+ private TTransport transport_ = null;
+
+ /** Buffer for output */
+ private final TByteArrayOutputStream writeBuffer_ = new
TByteArrayOutputStream(1024);
+
+ /** Buffer for input */
+ private final TMemoryInputTransport readBuffer_ = new
TMemoryInputTransport(new byte[0]);
+
+ public static class Factory extends TTransportFactory {
+ private int maxLength_;
+
+ public Factory() {
+ maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
+ }
+
+ public Factory(int maxLength) {
+ maxLength_ = maxLength;
+ }
+
+ @Override
+ public TTransport getTransport(TTransport base) throws TTransportException
{
+ return new TFramedTransport(base, maxLength_);
+ }
+ }
+
+ /** Constructor wraps around another transport */
+ public TFramedTransport(TTransport transport, int maxLength) throws
TTransportException {
+ transport_ = transport;
+ maxLength_ = maxLength;
+ }
+
+ public TFramedTransport(TTransport transport) throws TTransportException {
+ transport_ = transport;
+ maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
+ }
+
+ public void open() throws TTransportException {
+ transport_.open();
+ }
+
+ public boolean isOpen() {
+ return transport_.isOpen();
+ }
+
+ public void close() {
+ transport_.close();
+ }
+
+ public int read(byte[] buf, int off, int len) throws TTransportException {
+ int got = readBuffer_.read(buf, off, len);
+ if (got > 0) {
+ return got;
+ }
+
+ // Read another frame of data
+ readFrame();
+
+ return readBuffer_.read(buf, off, len);
+ }
+
+ @Override
+ public byte[] getBuffer() {
+ return readBuffer_.getBuffer();
+ }
+
+ @Override
+ public int getBufferPosition() {
+ return readBuffer_.getBufferPosition();
+ }
+
+ @Override
+ public int getBytesRemainingInBuffer() {
+ return readBuffer_.getBytesRemainingInBuffer();
+ }
+
+ @Override
+ public void consumeBuffer(int len) {
+ readBuffer_.consumeBuffer(len);
+ }
+
+ @Override
+ public TConfiguration getConfiguration() {
+ return null;
+ }
+
+ @Override
+ public void updateKnownMessageSize(long l) throws TTransportException {}
+
+ @Override
+ public void checkReadBytesAvailable(long l) throws TTransportException {}
+
+ public void clear() {
+ readBuffer_.clear();
+ }
+
+ private final byte[] i32buf = new byte[4];
+
+ private void readFrame() throws TTransportException {
+ transport_.readAll(i32buf, 0, 4);
+ int size = decodeFrameSize(i32buf);
+
+ if (size < 0) {
+ close();
+ throw new TTransportException(
+ TTransportException.CORRUPTED_DATA, "Read a negative frame size (" +
size + ")!");
+ }
+
+ if (size > maxLength_) {
+ close();
+ throw new TTransportException(
+ TTransportException.CORRUPTED_DATA,
+ "Frame size (" + size + ") larger than max length (" + maxLength_ +
")!");
+ }
+
+ byte[] buff = new byte[size];
+ transport_.readAll(buff, 0, size);
+ readBuffer_.reset(buff);
+ }
+
+ public void write(byte[] buf, int off, int len) throws TTransportException {
+ writeBuffer_.write(buf, off, len);
+ }
+
+ @Override
+ public void flush() throws TTransportException {
+ byte[] buf = writeBuffer_.get();
+ int len = writeBuffer_.len();
+ writeBuffer_.reset();
+
+ encodeFrameSize(len, i32buf);
+ transport_.write(i32buf, 0, 4);
+ transport_.write(buf, 0, len);
+ transport_.flush();
+ }
+
+ public static final void encodeFrameSize(final int frameSize, final byte[]
buf) {
+ buf[0] = (byte) (0xff & (frameSize >> 24));
+ buf[1] = (byte) (0xff & (frameSize >> 16));
+ buf[2] = (byte) (0xff & (frameSize >> 8));
+ buf[3] = (byte) (0xff & (frameSize));
+ }
+
+ public static final int decodeFrameSize(final byte[] buf) {
+ return ((buf[0] & 0xff) << 24)
+ | ((buf[1] & 0xff) << 16)
+ | ((buf[2] & 0xff) << 8)
+ | ((buf[3] & 0xff));
+ }
+}
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 365912a..826bbc6 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -211,11 +211,9 @@ Key | Default | Meaning | Type | Since
Key | Default | Meaning | Type | Since
--- | --- | --- | --- | ---
-<code>kyuubi.frontend.backoff.slot.length</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>PT0.1S</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>(deprecated) Time to back off
during login to the thrift frontend service.</div>|<div style='width:
30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
<code>kyuubi.frontend.bind.host</code>|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'><undefined></div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>(deprecated) Hostname or IP of
the machine on which to run the thrift frontend service via binary
protocol.</div>|<div style='width: 30pt'>string</div>|<div style='width:
20pt'>1.0.0</div>
<code>kyuubi.frontend.bind.port</code>|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>10009</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>(deprecated) Port of the machine on which to
run the thrift frontend service via binary protocol.</div>|<div style='width:
30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>
<code>kyuubi.frontend.connection.url.use.hostname</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>false</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>When true, frontend services
prefer hostname, otherwise, ip address</div>|<div style='width:
30pt'>boolean</div>|<div style='width: 20pt'>1.5.0</div>
-<code>kyuubi.frontend.login.timeout</code>|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>PT20S</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>(deprecated) Timeout for Thrift clients during
login to the thrift frontend service.</div>|<div style='width:
30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
<code>kyuubi.frontend.max.message.size</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>104857600</div>|<div
style='width: 170pt;word-wrap: break-word;white-space: normal'>(deprecated)
Maximum message size in bytes a Kyuubi server will accept.</div>|<div
style='width: 30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>
<code>kyuubi.frontend.max.worker.threads</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>999</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>(deprecated) Maximum number of
threads in the of frontend worker thread pool for the thrift frontend
service</div>|<div style='width: 30pt'>int</div>|<div style='width:
20pt'>1.0.0</div>
<code>kyuubi.frontend.min.worker.threads</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>9</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>(deprecated) Minimum number of
threads in the of frontend worker thread pool for the thrift frontend
service</div>|<div style='width: 30pt'>int</div>|<div style='width:
20pt'>1.0.0</div>
@@ -228,10 +226,8 @@ Key | Default | Meaning | Type | Since
<code>kyuubi.frontend.protocols</code>|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>THRIFT_BINARY</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>A comma separated list for all
frontend protocols <ul> <li>THRIFT_BINARY - HiveServer2 compatible thrift
binary protocol.</li> <li>REST - Kyuubi defined REST API(experimental).</li>
<li>MYSQL - MySQL compatible text protocol(experimental).</li> </ul></div>|<div
style='width: 30pt'>seq</div>|<div [...]
<code>kyuubi.frontend.rest.bind.host</code>|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'><undefined></div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Hostname or IP of the machine
on which to run the REST frontend service.</div>|<div style='width:
30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
<code>kyuubi.frontend.rest.bind.port</code>|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>10099</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>Port of the machine on which to run the REST
frontend service.</div>|<div style='width: 30pt'>int</div>|<div style='width:
20pt'>1.4.0</div>
-<code>kyuubi.frontend.thrift.backoff.slot.length</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>PT0.1S</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Time to back off during login
to the thrift frontend service.</div>|<div style='width:
30pt'>duration</div>|<div style='width: 20pt'>1.4.0</div>
<code>kyuubi.frontend.thrift.binary.bind.host</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div
style='width: 170pt;word-wrap: break-word;white-space: normal'>Hostname or IP
of the machine on which to run the thrift frontend service via binary
protocol.</div>|<div style='width: 30pt'>string</div>|<div style='width:
20pt'>1.4.0</div>
<code>kyuubi.frontend.thrift.binary.bind.port</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>10009</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Port of the machine on which
to run the thrift frontend service via binary protocol.</div>|<div
style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
-<code>kyuubi.frontend.thrift.login.timeout</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>PT20S</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Timeout for Thrift clients
during login to the thrift frontend service.</div>|<div style='width:
30pt'>duration</div>|<div style='width: 20pt'>1.4.0</div>
<code>kyuubi.frontend.thrift.max.message.size</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>104857600</div>|<div
style='width: 170pt;word-wrap: break-word;white-space: normal'>Maximum message
size in bytes a Kyuubi server will accept.</div>|<div style='width:
30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
<code>kyuubi.frontend.thrift.max.worker.threads</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>999</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Maximum number of threads in
the of frontend worker thread pool for the thrift frontend service</div>|<div
style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
<code>kyuubi.frontend.thrift.min.worker.threads</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>9</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Minimum number of threads in
the of frontend worker thread pool for the thrift frontend service</div>|<div
style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
diff --git a/externals/kyuubi-flink-sql-engine/pom.xml
b/externals/kyuubi-flink-sql-engine/pom.xml
index b9bc558..1bac868 100644
--- a/externals/kyuubi-flink-sql-engine/pom.xml
+++ b/externals/kyuubi-flink-sql-engine/pom.xml
@@ -171,7 +171,8 @@
<include>org.apache.curator:curator-framework</include>
<include>org.apache.curator:curator-recipes</include>\
<include>org.apache.hive:hive-service-rpc</include>
- <include>org.apache.thrift:*</include>
+ <include>org.apache.thrift:libfb303</include>
+ <include>org.apache.thrift:libthrift</include>
<include>org.apache.zookeeper:*</include>
</includes>
</artifactSet>
diff --git a/externals/kyuubi-spark-sql-engine/pom.xml
b/externals/kyuubi-spark-sql-engine/pom.xml
index 4f5fd94..ac75990 100644
--- a/externals/kyuubi-spark-sql-engine/pom.xml
+++ b/externals/kyuubi-spark-sql-engine/pom.xml
@@ -191,6 +191,9 @@
<include>org.apache.curator:curator-client</include>
<include>org.apache.curator:curator-framework</include>
<include>org.apache.curator:curator-recipes</include>
+ <include>org.apache.hive:hive-service-rpc</include>
+ <include>org.apache.thrift:libfb303</include>
+ <include>org.apache.thrift:libthrift</include>
</includes>
</artifactSet>
<relocations>
@@ -201,6 +204,20 @@
<include>org.apache.curator.**</include>
</includes>
</relocation>
+ <relocation>
+ <pattern>org.apache.thrift</pattern>
+
<shadedPattern>${kyuubi.shade.packageName}.org.apache.thrift</shadedPattern>
+ <includes>
+ <include>org.apache.thrift.**</include>
+ </includes>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.hive.service.rpc</pattern>
+
<shadedPattern>${kyuubi.shade.packageName}.org.apache.hive.service.rpc</shadedPattern>
+ <includes>
+
<include>org.apache.hive.service.rpc.**</include>
+ </includes>
+ </relocation>
</relocations>
</configuration>
<executions>
diff --git
a/externals/kyuubi-spark-sql-engine/src/test/java/org/apache/thrift/transport/TFramedTransport.java
b/externals/kyuubi-spark-sql-engine/src/test/java/org/apache/thrift/transport/TFramedTransport.java
new file mode 100644
index 0000000..3777218
--- /dev/null
+++
b/externals/kyuubi-spark-sql-engine/src/test/java/org/apache/thrift/transport/TFramedTransport.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.thrift.transport;
+
+import org.apache.thrift.TByteArrayOutputStream;
+import org.apache.thrift.TConfiguration;
+
+/**
+ * This is based on libthrift-0.12.0 {@link TFramedTransport}. To fix class of
+ * org.apache.thrift.transport.TFramedTransport not found after upgrading
libthrift.
+ *
+ * <p>TFramedTransport is a buffered TTransport that ensures a fully read
message every time by
+ * preceding messages with a 4-byte frame size.
+ */
+public class TFramedTransport extends TTransport {
+
+ protected static final int DEFAULT_MAX_LENGTH = 16384000;
+
+ private int maxLength_;
+
+ /** Underlying transport */
+ private TTransport transport_ = null;
+
+ /** Buffer for output */
+ private final TByteArrayOutputStream writeBuffer_ = new
TByteArrayOutputStream(1024);
+
+ /** Buffer for input */
+ private final TMemoryInputTransport readBuffer_ = new
TMemoryInputTransport(new byte[0]);
+
+ public static class Factory extends TTransportFactory {
+ private int maxLength_;
+
+ public Factory() {
+ maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
+ }
+
+ public Factory(int maxLength) {
+ maxLength_ = maxLength;
+ }
+
+ @Override
+ public TTransport getTransport(TTransport base) throws TTransportException
{
+ return new TFramedTransport(base, maxLength_);
+ }
+ }
+
+ /** Constructor wraps around another transport */
+ public TFramedTransport(TTransport transport, int maxLength) throws
TTransportException {
+ transport_ = transport;
+ maxLength_ = maxLength;
+ }
+
+ public TFramedTransport(TTransport transport) throws TTransportException {
+ transport_ = transport;
+ maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
+ }
+
+ public void open() throws TTransportException {
+ transport_.open();
+ }
+
+ public boolean isOpen() {
+ return transport_.isOpen();
+ }
+
+ public void close() {
+ transport_.close();
+ }
+
+ public int read(byte[] buf, int off, int len) throws TTransportException {
+ int got = readBuffer_.read(buf, off, len);
+ if (got > 0) {
+ return got;
+ }
+
+ // Read another frame of data
+ readFrame();
+
+ return readBuffer_.read(buf, off, len);
+ }
+
+ @Override
+ public byte[] getBuffer() {
+ return readBuffer_.getBuffer();
+ }
+
+ @Override
+ public int getBufferPosition() {
+ return readBuffer_.getBufferPosition();
+ }
+
+ @Override
+ public int getBytesRemainingInBuffer() {
+ return readBuffer_.getBytesRemainingInBuffer();
+ }
+
+ @Override
+ public void consumeBuffer(int len) {
+ readBuffer_.consumeBuffer(len);
+ }
+
+ @Override
+ public TConfiguration getConfiguration() {
+ return null;
+ }
+
+ @Override
+ public void updateKnownMessageSize(long l) throws TTransportException {}
+
+ @Override
+ public void checkReadBytesAvailable(long l) throws TTransportException {}
+
+ public void clear() {
+ readBuffer_.clear();
+ }
+
+ private final byte[] i32buf = new byte[4];
+
+ private void readFrame() throws TTransportException {
+ transport_.readAll(i32buf, 0, 4);
+ int size = decodeFrameSize(i32buf);
+
+ if (size < 0) {
+ close();
+ throw new TTransportException(
+ TTransportException.CORRUPTED_DATA, "Read a negative frame size (" +
size + ")!");
+ }
+
+ if (size > maxLength_) {
+ close();
+ throw new TTransportException(
+ TTransportException.CORRUPTED_DATA,
+ "Frame size (" + size + ") larger than max length (" + maxLength_ +
")!");
+ }
+
+ byte[] buff = new byte[size];
+ transport_.readAll(buff, 0, size);
+ readBuffer_.reset(buff);
+ }
+
+ public void write(byte[] buf, int off, int len) throws TTransportException {
+ writeBuffer_.write(buf, off, len);
+ }
+
+ @Override
+ public void flush() throws TTransportException {
+ byte[] buf = writeBuffer_.get();
+ int len = writeBuffer_.len();
+ writeBuffer_.reset();
+
+ encodeFrameSize(len, i32buf);
+ transport_.write(i32buf, 0, 4);
+ transport_.write(buf, 0, len);
+ transport_.flush();
+ }
+
+ public static final void encodeFrameSize(final int frameSize, final byte[]
buf) {
+ buf[0] = (byte) (0xff & (frameSize >> 24));
+ buf[1] = (byte) (0xff & (frameSize >> 16));
+ buf[2] = (byte) (0xff & (frameSize >> 8));
+ buf[3] = (byte) (0xff & (frameSize));
+ }
+
+ public static final int decodeFrameSize(final byte[] buf) {
+ return ((buf[0] & 0xff) << 24)
+ | ((buf[1] & 0xff) << 16)
+ | ((buf[2] & 0xff) << 8)
+ | ((buf[3] & 0xff));
+ }
+}
diff --git a/kyuubi-common/pom.xml b/kyuubi-common/pom.xml
index 816ccaf..173f1a6 100644
--- a/kyuubi-common/pom.xml
+++ b/kyuubi-common/pom.xml
@@ -95,6 +95,16 @@
</dependency>
<dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libfb303</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service-rpc</artifactId>
</dependency>
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 3ed3e4e..a5de82e 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -364,34 +364,6 @@ object KyuubiConf {
.version("1.4.0")
.fallbackConf(FRONTEND_MAX_MESSAGE_SIZE)
- @deprecated(s"using ${FRONTEND_THRIFT_LOGIN_TIMEOUT.key} instead", "1.4.0")
- val FRONTEND_LOGIN_TIMEOUT: ConfigEntry[Long] =
- buildConf("frontend.login.timeout")
- .doc("(deprecated) Timeout for Thrift clients during login to the thrift
frontend service.")
- .version("1.0.0")
- .timeConf
- .createWithDefault(Duration.ofSeconds(20).toMillis)
-
- val FRONTEND_THRIFT_LOGIN_TIMEOUT: ConfigEntry[Long] =
- buildConf("frontend.thrift.login.timeout")
- .doc("Timeout for Thrift clients during login to the thrift frontend
service.")
- .version("1.4.0")
- .fallbackConf(FRONTEND_LOGIN_TIMEOUT)
-
- @deprecated(s"using ${FRONTEND_THRIFT_LOGIN_BACKOFF_SLOT_LENGTH.key}
instead", "1.4.0")
- val FRONTEND_LOGIN_BACKOFF_SLOT_LENGTH: ConfigEntry[Long] =
- buildConf("frontend.backoff.slot.length")
- .doc("(deprecated) Time to back off during login to the thrift frontend
service.")
- .version("1.0.0")
- .timeConf
- .createWithDefault(Duration.ofMillis(100).toMillis)
-
- val FRONTEND_THRIFT_LOGIN_BACKOFF_SLOT_LENGTH: ConfigEntry[Long] =
- buildConf("frontend.thrift.backoff.slot.length")
- .doc("Time to back off during login to the thrift frontend service.")
- .version("1.4.0")
- .fallbackConf(FRONTEND_LOGIN_BACKOFF_SLOT_LENGTH)
-
val AUTHENTICATION_METHOD: ConfigEntry[Seq[String]] =
buildConf("authentication")
.doc("A comma separated list of client authentication types.<ul>" +
" <li>NOSASL: raw transport.</li>" +
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TBinaryFrontendService.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TBinaryFrontendService.scala
index 1037384..39245a3 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TBinaryFrontendService.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TBinaryFrontendService.scala
@@ -65,17 +65,12 @@ abstract class TBinaryFrontendService(name: String)
val tProcFactory = authFactory.getTProcessorFactory(this)
val tServerSocket = new TServerSocket(serverSocket)
val maxMessageSize = conf.get(FRONTEND_THRIFT_MAX_MESSAGE_SIZE)
- val requestTimeout = conf.get(FRONTEND_THRIFT_LOGIN_TIMEOUT).toInt
- val beBackoffSlotLength =
conf.get(FRONTEND_THRIFT_LOGIN_BACKOFF_SLOT_LENGTH).toInt
val args = new TThreadPoolServer.Args(tServerSocket)
.processorFactory(tProcFactory)
.transportFactory(transFactory)
.protocolFactory(new TBinaryProtocol.Factory)
.inputProtocolFactory(
new TBinaryProtocol.Factory(true, true, maxMessageSize,
maxMessageSize))
-
.requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.MILLISECONDS)
- .beBackoffSlotLength(beBackoffSlotLength)
- .beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
.executorService(executor)
// TCP Server
server = Some(new TThreadPoolServer(args))
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
index 31bdf07..957ff2b 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
@@ -510,5 +510,9 @@ private[kyuubi] object TFrontendService {
}
def getSessionHandle: SessionHandle = sessionHandle
+
+ override def unwrap[T](aClass: Class[T]): T = null.asInstanceOf[T]
+
+ override def isWrapperFor(aClass: Class[_]): Boolean = false
}
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/HadoopThriftAuthBridgeServer.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/HadoopThriftAuthBridgeServer.scala
index cd7f147..9589774 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/HadoopThriftAuthBridgeServer.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/HadoopThriftAuthBridgeServer.scala
@@ -131,7 +131,7 @@ object HadoopThriftAuthBridgeServer {
class TUGIAssumingProcessor(
wrapped: TProcessor,
secretMgr: KyuubiDelegationTokenManager) extends TProcessor with Logging
{
- override def process(in: TProtocol, out: TProtocol): Boolean = {
+ override def process(in: TProtocol, out: TProtocol): Unit = {
val transport = in.getTransport
transport match {
case saslTrans: TSaslServerTransport =>
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/TSetIpAddressProcessor.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/TSetIpAddressProcessor.scala
index ebf82f2..f5d0686 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/TSetIpAddressProcessor.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/TSetIpAddressProcessor.scala
@@ -29,7 +29,7 @@ class TSetIpAddressProcessor[I <: Iface](
import TSetIpAddressProcessor._
@throws[TException]
- override def process(in: TProtocol, out: TProtocol): Boolean = {
+ override def process(in: TProtocol, out: TProtocol): Unit = {
setIpAddress(in)
setUserName(in)
try {
diff --git a/kyuubi-hive-jdbc-shaded/pom.xml b/kyuubi-hive-jdbc-shaded/pom.xml
index 85fe401..c9ae789 100644
--- a/kyuubi-hive-jdbc-shaded/pom.xml
+++ b/kyuubi-hive-jdbc-shaded/pom.xml
@@ -56,6 +56,16 @@
</dependency>
<dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libfb303</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service-rpc</artifactId>
</dependency>
@@ -278,6 +288,9 @@
<include>**</include>
</includes>
<excludes>
+
<exclude>org/apache/hadoop/hive/thrift/client/TUGIAssumingTransport.class</exclude>
+
<exclude>org/apache/hadoop/hive/thrift/TFilterTransport.class</exclude>
+
<exclude>org/apache/hadoop/hive/thrift/TUGIContainingTransport.class</exclude>
<exclude>META-INF/MANIFEST.MF</exclude>
</excludes>
</filter>
diff --git a/kyuubi-hive-jdbc/pom.xml b/kyuubi-hive-jdbc/pom.xml
index dfd279b..7413e5a 100644
--- a/kyuubi-hive-jdbc/pom.xml
+++ b/kyuubi-hive-jdbc/pom.xml
@@ -51,6 +51,16 @@
</dependency>
<dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libfb303</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service-rpc</artifactId>
</dependency>
diff --git
a/kyuubi-hive-jdbc/src/main/java/org/apache/hadoop/hive/thrift/TFilterTransport.java
b/kyuubi-hive-jdbc/src/main/java/org/apache/hadoop/hive/thrift/TFilterTransport.java
new file mode 100644
index 0000000..9a65ee7
--- /dev/null
+++
b/kyuubi-hive-jdbc/src/main/java/org/apache/hadoop/hive/thrift/TFilterTransport.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import org.apache.thrift.TConfiguration;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * Transport that simply wraps another transport. This is the equivalent of
FilterInputStream for
+ * Thrift transports.
+ */
+public class TFilterTransport extends TTransport {
+
+ protected final TTransport wrapped;
+
+ public TFilterTransport(TTransport wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public void open() throws TTransportException {
+ wrapped.open();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return wrapped.isOpen();
+ }
+
+ @Override
+ public boolean peek() {
+ return wrapped.peek();
+ }
+
+ @Override
+ public void close() {
+ wrapped.close();
+ }
+
+ @Override
+ public int read(byte[] buf, int off, int len) throws TTransportException {
+ return wrapped.read(buf, off, len);
+ }
+
+ @Override
+ public int readAll(byte[] buf, int off, int len) throws TTransportException {
+ return wrapped.readAll(buf, off, len);
+ }
+
+ @Override
+ public void write(byte[] buf) throws TTransportException {
+ wrapped.write(buf);
+ }
+
+ @Override
+ public void write(byte[] buf, int off, int len) throws TTransportException {
+ wrapped.write(buf, off, len);
+ }
+
+ @Override
+ public void flush() throws TTransportException {
+ wrapped.flush();
+ }
+
+ @Override
+ public byte[] getBuffer() {
+ return wrapped.getBuffer();
+ }
+
+ @Override
+ public int getBufferPosition() {
+ return wrapped.getBufferPosition();
+ }
+
+ @Override
+ public int getBytesRemainingInBuffer() {
+ return wrapped.getBytesRemainingInBuffer();
+ }
+
+ @Override
+ public void consumeBuffer(int len) {
+ wrapped.consumeBuffer(len);
+ }
+
+ @Override
+ public TConfiguration getConfiguration() {
+ return null;
+ }
+
+ @Override
+ public void updateKnownMessageSize(long size) throws TTransportException {}
+
+ @Override
+ public void checkReadBytesAvailable(long numBytes) throws
TTransportException {}
+}
diff --git
a/kyuubi-hive-jdbc/src/main/java/org/apache/hadoop/hive/thrift/TUGIContainingTransport.java
b/kyuubi-hive-jdbc/src/main/java/org/apache/hadoop/hive/thrift/TUGIContainingTransport.java
new file mode 100644
index 0000000..49ca12e
--- /dev/null
+++
b/kyuubi-hive-jdbc/src/main/java/org/apache/hadoop/hive/thrift/TUGIContainingTransport.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import com.google.common.collect.MapMaker;
+import java.net.Socket;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportFactory;
+
+/**
+ * TUGIContainingTransport associates ugi information with connection
(transport). Wraps underlying
+ * <code>TSocket</code> transport and annotates it with ugi.
+ */
+public class TUGIContainingTransport extends TFilterTransport {
+
+ private UserGroupInformation ugi;
+
+ public TUGIContainingTransport(TTransport wrapped) {
+ super(wrapped);
+ }
+
+ public UserGroupInformation getClientUGI() {
+ return ugi;
+ }
+
+ public void setClientUGI(UserGroupInformation ugi) {
+ this.ugi = ugi;
+ }
+
+ /**
+ * If the underlying TTransport is an instance of TSocket, it returns the
Socket object which it
+ * contains. Otherwise it returns null.
+ */
+ public Socket getSocket() {
+ if (wrapped instanceof TSocket) {
+ return (((TSocket) wrapped).getSocket());
+ }
+
+ return null;
+ }
+
+ /** Factory to create TUGIContainingTransport. */
+ public static class Factory extends TTransportFactory {
+
+ // Need a concurrent weakhashmap. WeakKeys() so that when underlying
transport gets out of
+ // scope, it still can be GC'ed. Since value of map has a ref to key, need
weekValues as well.
+ private static final ConcurrentMap<TTransport, TUGIContainingTransport>
transMap =
+ new MapMaker().weakKeys().weakValues().makeMap();
+
+ /**
+ * Get a new <code>TUGIContainingTransport</code> instance, or reuse the
existing one if a
+ * <code>TUGIContainingTransport</code> has already been created before
using the given <code>
+ * TTransport</code> as an underlying transport. This ensures that a given
underlying transport
+ * instance receives the same <code>TUGIContainingTransport</code>.
+ */
+ @Override
+ public TUGIContainingTransport getTransport(TTransport trans) {
+
+ // UGI information is not available at connection setup time, it will be
set later
+ // via set_ugi() rpc.
+ TUGIContainingTransport tugiTrans = transMap.get(trans);
+ if (tugiTrans == null) {
+ tugiTrans = new TUGIContainingTransport(trans);
+ TUGIContainingTransport prev = transMap.putIfAbsent(trans, tugiTrans);
+ if (prev != null) {
+ return prev;
+ }
+ }
+ return tugiTrans;
+ }
+ }
+}
diff --git
a/kyuubi-hive-jdbc/src/main/java/org/apache/hadoop/hive/thrift/client/TUGIAssumingTransport.java
b/kyuubi-hive-jdbc/src/main/java/org/apache/hadoop/hive/thrift/client/TUGIAssumingTransport.java
new file mode 100644
index 0000000..95ed0cd
--- /dev/null
+++
b/kyuubi-hive-jdbc/src/main/java/org/apache/hadoop/hive/thrift/client/TUGIAssumingTransport.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift.client;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import org.apache.hadoop.hive.thrift.TFilterTransport;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * The Thrift SASL transports call Sasl.createSaslServer and
Sasl.createSaslClient inside open().
+ * So, we need to assume the correct UGI when the transport is opened so that
the SASL mechanisms
+ * have access to the right principal. This transport wraps the Sasl
transports to set up the right
+ * UGI context for open().
+ *
+ * <p>This is used on the client side, where the API explicitly opens a
transport to the server.
+ */
+public class TUGIAssumingTransport extends TFilterTransport {
+
+ protected UserGroupInformation ugi;
+
+ public TUGIAssumingTransport(TTransport wrapped, UserGroupInformation ugi) {
+ super(wrapped);
+ this.ugi = ugi;
+ }
+
+ @Override
+ public void open() throws TTransportException {
+ try {
+ ugi.doAs(
+ new PrivilegedExceptionAction<Void>() {
+ public Void run() {
+ try {
+ wrapped.open();
+ } catch (TTransportException tte) {
+ // Wrap the transport exception in an RTE, since UGI.doAs()
then goes
+ // and unwraps this for us out of the doAs block. We then
unwrap one
+ // more time in our catch clause to get back the TTE. (ugh)
+ throw new RuntimeException(tte);
+ }
+ return null;
+ }
+ });
+ } catch (IOException ioe) {
+ throw new RuntimeException("Received an ioe we never threw!", ioe);
+ } catch (InterruptedException ie) {
+ throw new RuntimeException("Received an ie we never threw!", ie);
+ } catch (RuntimeException rte) {
+ if (rte.getCause() instanceof TTransportException) {
+ throw (TTransportException) rte.getCause();
+ } else {
+ throw rte;
+ }
+ }
+ }
+}
diff --git
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
index ebf10f8..67b4ca7 100644
---
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
+++
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
@@ -656,7 +656,7 @@ public class KyuubiConnection implements
java.sql.Connection, KyuubiLoggable {
// Raw socket connection (non-sasl)
transport = socketTransport;
}
- } catch (SaslException e) {
+ } catch (SaslException | TTransportException e) {
throw new SQLException(
"Could not create secure connection to " + jdbcUriString + ": " +
e.getMessage(),
" 08S01",
diff --git
a/kyuubi-hive-jdbc/src/main/java/org/apache/thrift/transport/TFramedTransport.java
b/kyuubi-hive-jdbc/src/main/java/org/apache/thrift/transport/TFramedTransport.java
new file mode 100644
index 0000000..3777218
--- /dev/null
+++
b/kyuubi-hive-jdbc/src/main/java/org/apache/thrift/transport/TFramedTransport.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.thrift.transport;
+
+import org.apache.thrift.TByteArrayOutputStream;
+import org.apache.thrift.TConfiguration;
+
+/**
+ * This is based on libthrift-0.12.0 {@link TFramedTransport}. To fix class of
+ * org.apache.thrift.transport.TFramedTransport not found after upgrading
libthrift.
+ *
+ * <p>TFramedTransport is a buffered TTransport that ensures a fully read
message every time by
+ * preceding messages with a 4-byte frame size.
+ */
+public class TFramedTransport extends TTransport {
+
+ protected static final int DEFAULT_MAX_LENGTH = 16384000;
+
+ private int maxLength_;
+
+ /** Underlying transport */
+ private TTransport transport_ = null;
+
+ /** Buffer for output */
+ private final TByteArrayOutputStream writeBuffer_ = new
TByteArrayOutputStream(1024);
+
+ /** Buffer for input */
+ private final TMemoryInputTransport readBuffer_ = new
TMemoryInputTransport(new byte[0]);
+
+ public static class Factory extends TTransportFactory {
+ private int maxLength_;
+
+ public Factory() {
+ maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
+ }
+
+ public Factory(int maxLength) {
+ maxLength_ = maxLength;
+ }
+
+ @Override
+ public TTransport getTransport(TTransport base) throws TTransportException
{
+ return new TFramedTransport(base, maxLength_);
+ }
+ }
+
+ /** Constructor wraps around another transport */
+ public TFramedTransport(TTransport transport, int maxLength) throws
TTransportException {
+ transport_ = transport;
+ maxLength_ = maxLength;
+ }
+
+ public TFramedTransport(TTransport transport) throws TTransportException {
+ transport_ = transport;
+ maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
+ }
+
+ public void open() throws TTransportException {
+ transport_.open();
+ }
+
+ public boolean isOpen() {
+ return transport_.isOpen();
+ }
+
+ public void close() {
+ transport_.close();
+ }
+
+ public int read(byte[] buf, int off, int len) throws TTransportException {
+ int got = readBuffer_.read(buf, off, len);
+ if (got > 0) {
+ return got;
+ }
+
+ // Read another frame of data
+ readFrame();
+
+ return readBuffer_.read(buf, off, len);
+ }
+
+ @Override
+ public byte[] getBuffer() {
+ return readBuffer_.getBuffer();
+ }
+
+ @Override
+ public int getBufferPosition() {
+ return readBuffer_.getBufferPosition();
+ }
+
+ @Override
+ public int getBytesRemainingInBuffer() {
+ return readBuffer_.getBytesRemainingInBuffer();
+ }
+
+ @Override
+ public void consumeBuffer(int len) {
+ readBuffer_.consumeBuffer(len);
+ }
+
+ @Override
+ public TConfiguration getConfiguration() {
+ return null;
+ }
+
+ @Override
+ public void updateKnownMessageSize(long l) throws TTransportException {}
+
+ @Override
+ public void checkReadBytesAvailable(long l) throws TTransportException {}
+
+ public void clear() {
+ readBuffer_.clear();
+ }
+
+ private final byte[] i32buf = new byte[4];
+
+ private void readFrame() throws TTransportException {
+ transport_.readAll(i32buf, 0, 4);
+ int size = decodeFrameSize(i32buf);
+
+ if (size < 0) {
+ close();
+ throw new TTransportException(
+ TTransportException.CORRUPTED_DATA, "Read a negative frame size (" +
size + ")!");
+ }
+
+ if (size > maxLength_) {
+ close();
+ throw new TTransportException(
+ TTransportException.CORRUPTED_DATA,
+ "Frame size (" + size + ") larger than max length (" + maxLength_ +
")!");
+ }
+
+ byte[] buff = new byte[size];
+ transport_.readAll(buff, 0, size);
+ readBuffer_.reset(buff);
+ }
+
+ public void write(byte[] buf, int off, int len) throws TTransportException {
+ writeBuffer_.write(buf, off, len);
+ }
+
+ @Override
+ public void flush() throws TTransportException {
+ byte[] buf = writeBuffer_.get();
+ int len = writeBuffer_.len();
+ writeBuffer_.reset();
+
+ encodeFrameSize(len, i32buf);
+ transport_.write(i32buf, 0, 4);
+ transport_.write(buf, 0, len);
+ transport_.flush();
+ }
+
+ public static final void encodeFrameSize(final int frameSize, final byte[]
buf) {
+ buf[0] = (byte) (0xff & (frameSize >> 24));
+ buf[1] = (byte) (0xff & (frameSize >> 16));
+ buf[2] = (byte) (0xff & (frameSize >> 8));
+ buf[3] = (byte) (0xff & (frameSize));
+ }
+
+ public static final int decodeFrameSize(final byte[] buf) {
+ return ((buf[0] & 0xff) << 24)
+ | ((buf[1] & 0xff) << 16)
+ | ((buf[2] & 0xff) << 8)
+ | ((buf[3] & 0xff));
+ }
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 70c450b..e3cca55 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantLock
import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift._
+import org.apache.thrift.TConfiguration
import org.apache.thrift.protocol.{TBinaryProtocol, TProtocol}
import org.apache.thrift.transport.TSocket
@@ -257,7 +258,7 @@ private[kyuubi] object KyuubiSyncThriftClient {
val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous")
val loginTimeout = conf.get(ENGINE_LOGIN_TIMEOUT).toInt
val requestTimeout = conf.get(ENGINE_REQUEST_TIMEOUT).toInt
- val tSocket = new TSocket(host, port, requestTimeout, loginTimeout)
+ val tSocket = new TSocket(new TConfiguration, host, port, requestTimeout,
loginTimeout)
val tTransport = PlainSASLHelper.getPlainTransport(user, passwd, tSocket)
tTransport.open()
val tProtocol = new TBinaryProtocol(tTransport)
diff --git
a/kyuubi-server/src/test/java/org/apache/thrift/transport/TFramedTransport.java
b/kyuubi-server/src/test/java/org/apache/thrift/transport/TFramedTransport.java
new file mode 100644
index 0000000..3777218
--- /dev/null
+++
b/kyuubi-server/src/test/java/org/apache/thrift/transport/TFramedTransport.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.thrift.transport;
+
+import org.apache.thrift.TByteArrayOutputStream;
+import org.apache.thrift.TConfiguration;
+
+/**
+ * This is based on libthrift-0.12.0 {@link TFramedTransport}. To fix class of
+ * org.apache.thrift.transport.TFramedTransport not found after upgrading
libthrift.
+ *
+ * <p>TFramedTransport is a buffered TTransport that ensures a fully read
message every time by
+ * preceding messages with a 4-byte frame size.
+ */
+public class TFramedTransport extends TTransport {
+
+ protected static final int DEFAULT_MAX_LENGTH = 16384000;
+
+ private int maxLength_;
+
+ /** Underlying transport */
+ private TTransport transport_ = null;
+
+ /** Buffer for output */
+ private final TByteArrayOutputStream writeBuffer_ = new
TByteArrayOutputStream(1024);
+
+ /** Buffer for input */
+ private final TMemoryInputTransport readBuffer_ = new
TMemoryInputTransport(new byte[0]);
+
+ public static class Factory extends TTransportFactory {
+ private int maxLength_;
+
+ public Factory() {
+ maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
+ }
+
+ public Factory(int maxLength) {
+ maxLength_ = maxLength;
+ }
+
+ @Override
+ public TTransport getTransport(TTransport base) throws TTransportException
{
+ return new TFramedTransport(base, maxLength_);
+ }
+ }
+
+ /** Constructor wraps around another transport */
+ public TFramedTransport(TTransport transport, int maxLength) throws
TTransportException {
+ transport_ = transport;
+ maxLength_ = maxLength;
+ }
+
+ public TFramedTransport(TTransport transport) throws TTransportException {
+ transport_ = transport;
+ maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
+ }
+
+ public void open() throws TTransportException {
+ transport_.open();
+ }
+
+ public boolean isOpen() {
+ return transport_.isOpen();
+ }
+
+ public void close() {
+ transport_.close();
+ }
+
+ public int read(byte[] buf, int off, int len) throws TTransportException {
+ int got = readBuffer_.read(buf, off, len);
+ if (got > 0) {
+ return got;
+ }
+
+ // Read another frame of data
+ readFrame();
+
+ return readBuffer_.read(buf, off, len);
+ }
+
+ @Override
+ public byte[] getBuffer() {
+ return readBuffer_.getBuffer();
+ }
+
+ @Override
+ public int getBufferPosition() {
+ return readBuffer_.getBufferPosition();
+ }
+
+ @Override
+ public int getBytesRemainingInBuffer() {
+ return readBuffer_.getBytesRemainingInBuffer();
+ }
+
+ @Override
+ public void consumeBuffer(int len) {
+ readBuffer_.consumeBuffer(len);
+ }
+
+ @Override
+ public TConfiguration getConfiguration() {
+ return null;
+ }
+
+ @Override
+ public void updateKnownMessageSize(long l) throws TTransportException {}
+
+ @Override
+ public void checkReadBytesAvailable(long l) throws TTransportException {}
+
+ public void clear() {
+ readBuffer_.clear();
+ }
+
+ private final byte[] i32buf = new byte[4];
+
+ private void readFrame() throws TTransportException {
+ transport_.readAll(i32buf, 0, 4);
+ int size = decodeFrameSize(i32buf);
+
+ if (size < 0) {
+ close();
+ throw new TTransportException(
+ TTransportException.CORRUPTED_DATA, "Read a negative frame size (" +
size + ")!");
+ }
+
+ if (size > maxLength_) {
+ close();
+ throw new TTransportException(
+ TTransportException.CORRUPTED_DATA,
+ "Frame size (" + size + ") larger than max length (" + maxLength_ +
")!");
+ }
+
+ byte[] buff = new byte[size];
+ transport_.readAll(buff, 0, size);
+ readBuffer_.reset(buff);
+ }
+
+ public void write(byte[] buf, int off, int len) throws TTransportException {
+ writeBuffer_.write(buf, off, len);
+ }
+
+ @Override
+ public void flush() throws TTransportException {
+ byte[] buf = writeBuffer_.get();
+ int len = writeBuffer_.len();
+ writeBuffer_.reset();
+
+ encodeFrameSize(len, i32buf);
+ transport_.write(i32buf, 0, 4);
+ transport_.write(buf, 0, len);
+ transport_.flush();
+ }
+
+ public static final void encodeFrameSize(final int frameSize, final byte[]
buf) {
+ buf[0] = (byte) (0xff & (frameSize >> 24));
+ buf[1] = (byte) (0xff & (frameSize >> 16));
+ buf[2] = (byte) (0xff & (frameSize >> 8));
+ buf[3] = (byte) (0xff & (frameSize));
+ }
+
+ public static final int decodeFrameSize(final byte[] buf) {
+ return ((buf[0] & 0xff) << 24)
+ | ((buf[1] & 0xff) << 16)
+ | ((buf[2] & 0xff) << 8)
+ | ((buf[3] & 0xff));
+ }
+}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProviderSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProviderSuite.scala
index 1bfed71..554d288 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProviderSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProviderSuite.scala
@@ -93,7 +93,8 @@ class HiveDelegationTokenProviderSuite extends
KerberizedTestHelper {
FileUtils.deleteDirectory(hadoopConfDir)
}
- test("obtain hive delegation token") {
+ // Ignore the test because LocalMetaServer can not work with Thrift 0.16.0.
+ ignore("obtain hive delegation token") {
tryWithSecurityEnabled {
UserGroupInformation.loginUserFromKeytab(testPrincipal, testKeytab)
@@ -198,7 +199,7 @@ class
HadoopThriftAuthBridgeWithServerContextClassLoader(classloader: ClassLoade
class SetThreadContextClassLoaderProcess(wrapped: TProcessor) extends
TProcessor {
- override def process(in: TProtocol, out: TProtocol): Boolean = {
+ override def process(in: TProtocol, out: TProtocol): Unit = {
val origin = Thread.currentThread().getContextClassLoader
try {
Thread.currentThread().setContextClassLoader(classloader)
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
index de811e8..93989ea 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
@@ -102,8 +102,10 @@ class KyuubiOperationPerConnectionSuite extends
WithKyuubiServer with HiveJDBCTe
val executeStmtResp = client.ExecuteStatement(executeStmtReq)
assert(executeStmtResp.getStatus.getStatusCode ===
TStatusCode.ERROR_STATUS)
assert(executeStmtResp.getOperationHandle === null)
- assert(executeStmtResp.getStatus.getErrorMessage contains
- "Caused by: java.net.SocketException: Broken pipe (Write failed)")
+ assert(executeStmtResp.getStatus.getErrorMessage.contains(
+ "Caused by: java.net.SocketException: Broken pipe (Write failed)") ||
+ executeStmtResp.getStatus.getErrorMessage.contains(
+ "cancelled because SparkContext was shut down"))
}
}
diff --git a/pom.xml b/pom.xml
index c2c0502..1ea12aa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,6 +132,8 @@
<scopt.version>4.0.1</scopt.version>
<slf4j.version>1.7.35</slf4j.version>
<log4j.version>2.17.1</log4j.version>
+ <fb303.version>0.9.3</fb303.version>
+ <thrift.version>0.16.0</thrift.version>
<!--
DO NOT forget to change the following properties when change the
minor version of Spark:
`delta.version`, `iceberg.name`,
`maven.plugin.scalatest.exclude.tags`
@@ -518,34 +520,35 @@
<version>${kubernetes-client.version}</version>
</dependency>
- <!--
- because of THRIFT-4805, we don't upgrade to libthrift:0.12.0,
- because of THRIFT-5274, we don't upgrade to libthrift:0.13.0,
- so just keep hive-service-rpc:2.3.9 transitive dependency
libthrift:0.9.3
- -->
<dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-service-rpc</artifactId>
- <version>${hive.version}</version>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libfb303</artifactId>
+ <version>${fb303.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>${thrift.version}</version>
<exclusions>
<exclusion>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
+ <groupId>commons-logging</groupId>
</exclusion>
<exclusion>
- <groupId>tomcat</groupId>
- <artifactId>*</artifactId>
+ <groupId>javax.annotation</groupId>
+ <artifactId>javax.annotation-api</artifactId>
</exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-service-rpc</artifactId>
+ <version>${hive.version}</version>
+ <exclusions>
<exclusion>
- <groupId>org.apache.httpcomponents</groupId>
+ <groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>