This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f3759f226c4 [improvement](maxcompute) Simplify FE block ID requests
for MaxCompute writes (#62880)
f3759f226c4 is described below
commit f3759f226c48a1c376b7d00994e98ec10761ec6d
Author: daidai <[email protected]>
AuthorDate: Wed Jun 3 22:40:50 2026 +0800
[improvement](maxcompute) Simplify FE block ID requests for MaxCompute
writes (#62880)
### What problem does this PR solve?
Related PR: #62578
1. PR #62578 moved MaxCompute write block ID allocation from BE-local
counters to FE. This PR simplifies how those block IDs are requested.
Instead of calling FE through the BE JNI C++ bridge:
MaxCompute connector Java -> BE JNI C++ -> FE
the MaxCompute connector now requests FE directly through thrift:
MaxCompute connector Java -> FE
A new MaxComputeFeClient is added under the MaxCompute connector to
handle the FE thrift RPC methods (currently the block ID request,
including retries and NOT_MASTER redirects).
2. Removes the hardcoded `MAX_BLOCK_COUNT` constant from
`fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java`
and moves it to the FE config `max_compute_write_max_block_count`.
The default value is still 20000, so the existing behavior is preserved.
---
.../sink/writer/maxcompute/vmc_table_writer.cpp | 6 +
be/src/util/jni-util.cpp | 4 -
be/src/util/jni_native_method.cpp | 129 --------
be/src/util/jni_native_method.h | 4 -
.../doris/common/jni/utils/JNINativeMethod.java | 4 -
.../max-compute-connector/pom.xml | 6 +
.../doris/maxcompute/MaxComputeFeClient.java | 326 +++++++++++++++++++++
.../doris/maxcompute/MaxComputeJniWriter.java | 11 +-
.../doris/maxcompute/MaxComputeFeClientTest.java | 177 +++++++++++
.../main/java/org/apache/doris/common/Config.java | 7 +
.../doris/datasource/maxcompute/MCTransaction.java | 7 +-
11 files changed, 533 insertions(+), 148 deletions(-)
diff --git a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp
b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp
index 8bcd2714020..a7818ea01d2 100644
--- a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp
+++ b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp
@@ -22,6 +22,7 @@
#include "exprs/vexpr.h"
#include "exprs/vexpr_context.h"
#include "format/transformer/vjni_format_transformer.h"
+#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "util/uid_util.h"
@@ -98,6 +99,7 @@ Status VMCTableWriter::open(RuntimeState* state,
RuntimeProfile* profile) {
std::map<std::string, std::string> VMCTableWriter::_build_base_writer_params()
{
auto params = _mc_sink.properties;
+ const auto& master_fe_addr =
_state->exec_env()->cluster_info()->master_fe_addr;
if (_mc_sink.__isset.endpoint) params["endpoint"] = _mc_sink.endpoint;
if (_mc_sink.__isset.project) params["project"] = _mc_sink.project;
if (_mc_sink.__isset.table_name) params["table"] = _mc_sink.table_name;
@@ -117,6 +119,10 @@ std::map<std::string, std::string>
VMCTableWriter::_build_base_writer_params() {
if (_mc_sink.__isset.retry_count) {
params["retry_count"] = std::to_string(_mc_sink.retry_count);
}
+ params["fe_host"] = master_fe_addr.hostname;
+ params["fe_port"] = std::to_string(master_fe_addr.port);
+ params["fe_rpc_timeout_ms"] =
std::to_string(config::thrift_rpc_timeout_ms);
+ params["fe_thrift_server_type"] = config::thrift_server_type_of_fe;
return params;
}
diff --git a/be/src/util/jni-util.cpp b/be/src/util/jni-util.cpp
index 7d4a3869085..944b8541146 100644
--- a/be/src/util/jni-util.cpp
+++ b/be/src/util/jni-util.cpp
@@ -405,8 +405,6 @@ Status Util::_init_register_natives() {
static char memory_alloc_batch_sign[] = "([I)[J";
static char memory_free_batch_name[] = "memoryTrackerFreeBatch";
static char memory_free_batch_sign[] = "([J)V";
- static char request_mc_block_id_name[] = "requestMaxComputeBlockId";
- static char request_mc_block_id_sign[] = "(JLjava/lang/String;)J";
static JNINativeMethod java_native_methods[] = {
{memory_alloc_name, memory_alloc_sign,
(void*)&JavaNativeMethods::memoryMalloc},
{memory_free_name, memory_free_sign,
(void*)&JavaNativeMethods::memoryFree},
@@ -414,8 +412,6 @@ Status Util::_init_register_natives() {
(void*)&JavaNativeMethods::memoryMallocBatch},
{memory_free_batch_name, memory_free_batch_sign,
(void*)&JavaNativeMethods::memoryFreeBatch},
- {request_mc_block_id_name, request_mc_block_id_sign,
- (void*)&JavaNativeMethods::requestMaxComputeBlockId},
};
int res = env->RegisterNatives(local_jni_native_exc_cl,
java_native_methods,
diff --git a/be/src/util/jni_native_method.cpp
b/be/src/util/jni_native_method.cpp
index 6942095b376..549405c766d 100644
--- a/be/src/util/jni_native_method.cpp
+++ b/be/src/util/jni_native_method.cpp
@@ -17,119 +17,14 @@
#include "util/jni_native_method.h"
-#include <gen_cpp/FrontendService.h>
-#include <glog/logging.h>
-
-#include <chrono>
#include <cstdlib>
-#include <thread>
#include <vector>
-#include "common/status.h"
#include "jni.h"
-#include "runtime/exec_env.h"
-#include "util/client_cache.h"
#include "util/defer_op.h"
-#include "util/thrift_rpc_helper.h"
namespace doris {
-namespace {
-
-void throw_java_runtime_exception(JNIEnv* env, const std::string& message) {
- jclass exception_cl = env->FindClass("java/lang/IllegalStateException");
- if (exception_cl != nullptr) {
- env->ThrowNew(exception_cl, message.c_str());
- env->DeleteLocalRef(exception_cl);
- }
-}
-
-Result<int64_t> request_maxcompute_block_id_from_fe(int64_t txn_id,
- const std::string&
write_session_id) {
- if (txn_id <= 0) {
- return ResultError(Status::InvalidArgument(
- "invalid MaxCompute txn_id for block_id allocation: {}",
txn_id));
- }
- if (write_session_id.empty()) {
- return ResultError(Status::InvalidArgument(
- "empty MaxCompute write_session_id for block_id allocation"));
- }
-
- constexpr uint32_t FETCH_BLOCK_ID_MAX_RETRY_TIMES = 3;
- TNetworkAddress master_addr =
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
- for (uint32_t retry_times = 0; retry_times <
FETCH_BLOCK_ID_MAX_RETRY_TIMES; retry_times++) {
- TMaxComputeBlockIdRequest request;
- TMaxComputeBlockIdResult result;
- request.__set_txn_id(txn_id);
- request.__set_write_session_id(write_session_id);
- request.__set_length(1);
-
- Status rpc_status = ThriftRpcHelper::rpc<FrontendServiceClient>(
- master_addr.hostname, master_addr.port,
- [&request, &result](FrontendServiceConnection& client) {
- client->getMaxComputeBlockIdRange(result, request);
- });
-
- if (!rpc_status.ok()) {
- LOG(WARNING) << "Failed to allocate MaxCompute block_id, rpc
failure, retry_time="
- << retry_times << ", txn_id=" << txn_id
- << ", write_session_id=" << write_session_id << ",
status=" << rpc_status;
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
- continue;
- }
-
- if (!result.__isset.status) {
- return ResultError(Status::RpcError(
- "failed to allocate MaxCompute block_id from FE, missing
status in response, "
- "txn_id={}, write_session_id={}",
- txn_id, write_session_id));
- }
-
- Status fe_status = Status::create<false>(result.status);
- if (fe_status.is<ErrorCode::NOT_MASTER>()) {
- if (!result.__isset.master_address) {
- return ResultError(Status::RpcError(
- "failed to allocate MaxCompute block_id from FE,
missing master address "
- "in NOT_MASTER response, txn_id={},
write_session_id={}",
- txn_id, write_session_id));
- }
- LOG(WARNING) << "Failed to allocate MaxCompute block_id, requested
non-master FE@"
- << master_addr.hostname << ":" << master_addr.port <<
", switch to FE@"
- << result.master_address.hostname << ":" <<
result.master_address.port
- << ", retry_time=" << retry_times << ", txn_id=" <<
txn_id
- << ", write_session_id=" << write_session_id;
- master_addr = result.master_address;
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
- continue;
- }
-
- if (!fe_status.ok()) {
- LOG(WARNING) << "Failed to allocate MaxCompute block_id, FE
returned error, retry_time="
- << retry_times << ", txn_id=" << txn_id
- << ", write_session_id=" << write_session_id << ",
status=" << fe_status;
- return ResultError(std::move(fe_status));
- }
-
- if (result.length != 1) {
- return ResultError(Status::RpcError(
- "failed to allocate MaxCompute block_id from FE, expected
length=1 but got "
- "{}, txn_id={}, write_session_id={}",
- result.length, txn_id, write_session_id));
- }
-
- LOG(INFO) << "Allocated MaxCompute block_id from FE@" <<
master_addr.hostname << ":"
- << master_addr.port << ", txn_id=" << txn_id
- << ", write_session_id=" << write_session_id << ",
block_id=" << result.start;
- return result.start;
- }
-
- return ResultError(Status::RpcError(
- "failed to allocate MaxCompute block_id from FE, txn_id={},
write_session_id={}",
- txn_id, write_session_id));
-}
-
-} // namespace
-
jlong JavaNativeMethods::memoryMalloc(JNIEnv* env, jclass clazz, jlong bytes) {
return reinterpret_cast<long>(malloc(bytes));
}
@@ -209,28 +104,4 @@ void JavaNativeMethods::memoryFreeBatch(JNIEnv* env,
jclass clazz, jlongArray ad
env->ReleaseLongArrayElements(addrs, elems, JNI_ABORT);
}
-jlong JavaNativeMethods::requestMaxComputeBlockId(JNIEnv* env, jclass clazz,
jlong txn_id,
- jstring write_session_id) {
- if (write_session_id == nullptr) {
- throw_java_runtime_exception(
- env, "MaxCompute write_session_id is null when requesting
block_id");
- return 0;
- }
-
- const char* write_session_id_chars =
env->GetStringUTFChars(write_session_id, nullptr);
- if (write_session_id_chars == nullptr) {
- throw_java_runtime_exception(env, "Failed to read MaxCompute
write_session_id from Java");
- return 0;
- }
- std::string write_session_id_str(write_session_id_chars);
- env->ReleaseStringUTFChars(write_session_id, write_session_id_chars);
-
- auto block_id = request_maxcompute_block_id_from_fe(txn_id,
write_session_id_str);
- if (!block_id.has_value()) {
- throw_java_runtime_exception(env, block_id.error().to_string());
- return 0;
- }
- return static_cast<jlong>(block_id.value());
-}
-
} // namespace doris
diff --git a/be/src/util/jni_native_method.h b/be/src/util/jni_native_method.h
index 23429c3500a..48c74d91d67 100644
--- a/be/src/util/jni_native_method.h
+++ b/be/src/util/jni_native_method.h
@@ -42,10 +42,6 @@ struct JavaNativeMethods {
// Batch free multiple addresses; addrs is a long[]
static void memoryFreeBatch(JNIEnv* env, jclass clazz, jlongArray addrs);
-
- // Request a MaxCompute block id from FE via BE.
- static jlong requestMaxComputeBlockId(JNIEnv* env, jclass clazz, jlong
txn_id,
- jstring write_session_id);
};
} // namespace doris
diff --git
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/JNINativeMethod.java
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/JNINativeMethod.java
index 1104a5fa934..d48fe8e9347 100644
---
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/JNINativeMethod.java
+++
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/JNINativeMethod.java
@@ -43,8 +43,4 @@ public class JNINativeMethod {
*/
public static native void memoryTrackerFreeBatch(long[] addrs);
- /**
- * Request a MaxCompute block id from BE, which will forward the request
to FE.
- */
- public static native long requestMaxComputeBlockId(long txnId, String
writeSessionId);
}
diff --git a/fe/be-java-extensions/max-compute-connector/pom.xml
b/fe/be-java-extensions/max-compute-connector/pom.xml
index 8f84c6b31aa..74c528fa2f1 100644
--- a/fe/be-java-extensions/max-compute-connector/pom.xml
+++ b/fe/be-java-extensions/max-compute-connector/pom.xml
@@ -41,6 +41,12 @@ under the License.
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.doris</groupId>
+ <artifactId>fe-thrift</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
diff --git
a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java
b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java
new file mode 100644
index 00000000000..82b58f48493
--- /dev/null
+++
b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java
@@ -0,0 +1,326 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.maxcompute;
+
+import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.TMaxComputeBlockIdRequest;
+import org.apache.doris.thrift.TMaxComputeBlockIdResult;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatus;
+import org.apache.doris.thrift.TStatusCode;
+
+import org.apache.log4j.Logger;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.layered.TFramedTransport;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * FE thrift client used by MaxCompute writer runtime code in BE's embedded
JVM.
+ */
+class MaxComputeFeClient implements AutoCloseable {
+ static final String FE_HOST = "fe_host";
+ static final String FE_PORT = "fe_port";
+ static final String FE_RPC_TIMEOUT_MS = "fe_rpc_timeout_ms";
+ static final String FE_THRIFT_SERVER_TYPE = "fe_thrift_server_type";
+
+ private static final Logger LOG =
Logger.getLogger(MaxComputeFeClient.class);
+ private static final int FETCH_BLOCK_ID_MAX_RETRY_TIMES = 3;
+ private static final long FETCH_BLOCK_ID_RETRY_SLEEP_MS = 10L;
+ private static final long FETCH_BLOCK_ID_LENGTH = 1L;
+ private static final int DEFAULT_FE_RPC_TIMEOUT_MS = 60000;
+ private static final String THREADED_SELECTOR = "THREADED_SELECTOR";
+ private static final String THREAD_POOL = "THREAD_POOL";
+
+ private final int rpcTimeoutMs;
+ private final String thriftServerType;
+ private final RpcExecutor rpcExecutor;
+ private final long retrySleepMs;
+ private TNetworkAddress masterAddress;
+
+ static MaxComputeFeClient create(Map<String, String> params) {
+ String host = requireParam(params, FE_HOST);
+ int port = Integer.parseInt(requireParam(params, FE_PORT));
+ int timeoutMs = Integer.parseInt(params.getOrDefault(FE_RPC_TIMEOUT_MS,
+ String.valueOf(DEFAULT_FE_RPC_TIMEOUT_MS)));
+ String serverType = params.getOrDefault(FE_THRIFT_SERVER_TYPE,
THREAD_POOL);
+ return new MaxComputeFeClient(new TNetworkAddress(host, port),
timeoutMs, serverType,
+ new ReusableRpcExecutor(),
+ FETCH_BLOCK_ID_RETRY_SLEEP_MS);
+ }
+
+ MaxComputeFeClient(TNetworkAddress masterAddress, int rpcTimeoutMs, String
thriftServerType,
+ RpcExecutor rpcExecutor, long retrySleepMs) {
+ this.masterAddress = copyAddress(Objects.requireNonNull(masterAddress,
"masterAddress"));
+ this.rpcTimeoutMs = rpcTimeoutMs;
+ this.thriftServerType = thriftServerType == null ? THREAD_POOL :
thriftServerType;
+ this.rpcExecutor = Objects.requireNonNull(rpcExecutor, "rpcExecutor");
+ this.retrySleepMs = retrySleepMs;
+ }
+
+ long requestBlockId(long txnId, String writeSessionId) throws IOException {
+ if (txnId <= 0) {
+ throw new IOException("invalid MaxCompute txn_id for block_id
allocation: " + txnId);
+ }
+ if (writeSessionId == null || writeSessionId.isEmpty()) {
+ throw new IOException("empty MaxCompute write_session_id for
block_id allocation");
+ }
+
+ TMaxComputeBlockIdRequest request = buildBlockIdRequest(txnId,
writeSessionId);
+ return callWithMasterRedirect(
+ "allocate MaxCompute block_id",
+ client -> client.getMaxComputeBlockIdRange(request),
+ (result, requestAddress, retryTimes) ->
+ handleBlockIdResult(result, requestAddress,
retryTimes, txnId, writeSessionId));
+ }
+
+ @Override
+ public synchronized void close() {
+ rpcExecutor.close();
+ }
+
+ private synchronized <T, R> R callWithMasterRedirect(String operation,
FeCall<T> call,
+ ResponseHandler<T, R> handler)
+ throws IOException {
+ validateAddress(masterAddress);
+
+ Exception lastException = null;
+ for (int retryTimes = 0; retryTimes < FETCH_BLOCK_ID_MAX_RETRY_TIMES;
retryTimes++) {
+ TNetworkAddress requestAddress = copyAddress(masterAddress);
+ T result;
+ try {
+ result = rpcExecutor.call(requestAddress, rpcTimeoutMs,
useFramedTransport(), call);
+ } catch (Exception e) {
+ lastException = e;
+ rpcExecutor.close();
+ LOG.warn("Failed to " + operation + ", rpc failure,
retry_time="
+ + retryTimes + ", fe=" +
formatAddress(requestAddress), e);
+ sleepBeforeRetry();
+ continue;
+ }
+
+ try {
+ return handler.handle(result, requestAddress, retryTimes);
+ } catch (NotMasterException e) {
+ masterAddress = copyAddress(e.masterAddress);
+ lastException = e;
+ rpcExecutor.close();
+ sleepBeforeRetry();
+ }
+ }
+
+ throw new IOException("failed to " + operation + " from FE",
lastException);
+ }
+
+ private long handleBlockIdResult(TMaxComputeBlockIdResult result,
TNetworkAddress requestAddress, int retryTimes,
+ long txnId, String writeSessionId) throws IOException,
NotMasterException {
+ if (result == null || !result.isSetStatus()) {
+ throw new IOException("failed to allocate MaxCompute block_id from
FE, missing status in response, "
+ + "txn_id=" + txnId + ", write_session_id=" +
writeSessionId);
+ }
+
+ TStatus status = result.getStatus();
+ TStatusCode code = status.getStatusCode();
+ if (code == null) {
+ throw new IOException("failed to allocate MaxCompute block_id from
FE, missing status code, "
+ + "txn_id=" + txnId + ", write_session_id=" +
writeSessionId);
+ }
+ if (code == TStatusCode.NOT_MASTER) {
+ if (!result.isSetMasterAddress()) {
+ throw new IOException("failed to allocate MaxCompute block_id
from FE, missing master address "
+ + "in NOT_MASTER response, txn_id=" + txnId + ",
write_session_id=" + writeSessionId);
+ }
+ LOG.warn("Failed to allocate MaxCompute block_id, requested
non-master FE@"
+ + formatAddress(requestAddress) + ", switch to FE@" +
formatAddress(result.getMasterAddress())
+ + ", retry_time=" + retryTimes + ", txn_id=" + txnId
+ + ", write_session_id=" + writeSessionId);
+ throw new NotMasterException(result.getMasterAddress());
+ }
+
+ if (code != TStatusCode.OK) {
+ throw new IOException("failed to allocate MaxCompute block_id from
FE, status="
+ + statusErrorMessage(status) + ", txn_id=" + txnId
+ + ", write_session_id=" + writeSessionId);
+ }
+
+ if (!result.isSetStart()) {
+ throw new IOException("failed to allocate MaxCompute block_id from
FE, missing start in response, "
+ + "txn_id=" + txnId + ", write_session_id=" +
writeSessionId);
+ }
+ if (!result.isSetLength() || result.getLength() !=
FETCH_BLOCK_ID_LENGTH) {
+ throw new IOException("failed to allocate MaxCompute block_id from
FE, expected length=1 but got "
+ + result.getLength() + ", txn_id=" + txnId + ",
write_session_id=" + writeSessionId);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Allocated MaxCompute block_id from FE@" +
formatAddress(requestAddress)
+ + ", txn_id=" + txnId + ", write_session_id=" +
writeSessionId
+ + ", block_id=" + result.getStart());
+ }
+ return result.getStart();
+ }
+
+ private static TMaxComputeBlockIdRequest buildBlockIdRequest(long txnId,
String writeSessionId) {
+ TMaxComputeBlockIdRequest request = new TMaxComputeBlockIdRequest();
+ request.setTxnId(txnId);
+ request.setWriteSessionId(writeSessionId);
+ request.setLength(FETCH_BLOCK_ID_LENGTH);
+ return request;
+ }
+
+ private boolean useFramedTransport() {
+ return THREADED_SELECTOR.equalsIgnoreCase(thriftServerType);
+ }
+
+ private void sleepBeforeRetry() throws IOException {
+ try {
+ Thread.sleep(retrySleepMs);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("interrupted while retrying MaxCompute
block_id allocation", e);
+ }
+ }
+
+ private static TTransport createTransport(TNetworkAddress address, int
timeoutMs,
+ boolean useFramedTransport) throws TTransportException {
+ TSocket socket = new TSocket(address.getHostname(), address.getPort(),
timeoutMs);
+ return useFramedTransport ? new TFramedTransport(socket) : socket;
+ }
+
+ private static void validateAddress(TNetworkAddress address) throws
IOException {
+ if (address.getHostname() == null || address.getHostname().isEmpty()
|| address.getPort() <= 0) {
+ throw new IOException("invalid FE address for MaxCompute block_id
allocation: "
+ + formatAddress(address));
+ }
+ }
+
+ private static String statusErrorMessage(TStatus status) {
+ List<String> errorMsgs = status.getErrorMsgs();
+ if (errorMsgs == null || errorMsgs.isEmpty()) {
+ return status.getStatusCode().name();
+ }
+ return status.getStatusCode().name() + ": " + String.join("; ",
errorMsgs);
+ }
+
+ private static String requireParam(Map<String, String> params, String key)
{
+ String value = params.get(key);
+ if (value == null || value.isEmpty()) {
+ throw new IllegalArgumentException("required property '" + key +
"'.");
+ }
+ return value;
+ }
+
+ private static TNetworkAddress copyAddress(TNetworkAddress address) {
+ return new TNetworkAddress(address.getHostname(), address.getPort());
+ }
+
+ private static boolean sameAddress(TNetworkAddress left, TNetworkAddress
right) {
+ return left != null && right != null
+ && Objects.equals(left.getHostname(), right.getHostname())
+ && left.getPort() == right.getPort();
+ }
+
+ private static String formatAddress(TNetworkAddress address) {
+ if (address == null) {
+ return "null";
+ }
+ return address.getHostname() + ":" + address.getPort();
+ }
+
+ interface RpcExecutor {
+ <T> T call(TNetworkAddress address, int timeoutMs, boolean
useFramedTransport,
+ FeCall<T> call) throws Exception;
+
+ default void close() {
+ }
+ }
+
+ interface FeCall<T> {
+ T call(FrontendService.Client client) throws Exception;
+ }
+
+ private interface ResponseHandler<T, R> {
+ R handle(T result, TNetworkAddress requestAddress, int retryTimes)
throws IOException, NotMasterException;
+ }
+
+ private static class ReusableRpcExecutor implements RpcExecutor {
+ private TNetworkAddress connectedAddress;
+ private boolean connectedFramedTransport;
+ private TTransport transport;
+ private FrontendService.Client client;
+
+ @Override
+ public synchronized <T> T call(TNetworkAddress address, int timeoutMs,
boolean useFramedTransport,
+ FeCall<T> call) throws Exception {
+ ensureConnected(address, timeoutMs, useFramedTransport);
+ try {
+ return call.call(client);
+ } catch (Exception e) {
+ close();
+ throw e;
+ }
+ }
+
+ @Override
+ public synchronized void close() {
+ if (transport != null) {
+ transport.close();
+ }
+ transport = null;
+ client = null;
+ connectedAddress = null;
+ }
+
+ private void ensureConnected(TNetworkAddress address, int timeoutMs,
boolean useFramedTransport)
+ throws Exception {
+ if (client != null && transport != null && transport.isOpen()
+ && connectedFramedTransport == useFramedTransport
+ && sameAddress(connectedAddress, address)) {
+ return;
+ }
+
+ close();
+ TTransport newTransport = createTransport(address, timeoutMs,
useFramedTransport);
+ try {
+ newTransport.open();
+ transport = newTransport;
+ client = new FrontendService.Client(new
TBinaryProtocol(transport));
+ connectedAddress = copyAddress(address);
+ connectedFramedTransport = useFramedTransport;
+ } catch (Exception e) {
+ newTransport.close();
+ throw e;
+ }
+ }
+ }
+
+ private static class NotMasterException extends Exception {
+ private final TNetworkAddress masterAddress;
+
+ NotMasterException(TNetworkAddress masterAddress) {
+ super("not master, master=" + formatAddress(masterAddress));
+ this.masterAddress = copyAddress(masterAddress);
+ }
+ }
+}
diff --git
a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
index 38bfc7f17a7..9788184057e 100644
---
a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
+++
b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
@@ -18,7 +18,6 @@
package org.apache.doris.maxcompute;
import org.apache.doris.common.jni.JniWriter;
-import org.apache.doris.common.jni.utils.JNINativeMethod;
import org.apache.doris.common.jni.vec.VectorColumn;
import org.apache.doris.common.jni.vec.VectorTable;
import org.apache.doris.common.maxcompute.MCProperties;
@@ -115,6 +114,7 @@ public class MaxComputeJniWriter extends JniWriter {
private final int readTimeout;
private final int retryCount;
private final long maxBlockBytes;
+ private final MaxComputeFeClient feClient;
// Storage API objects
private TableBatchWriteSession writeSession;
@@ -155,6 +155,7 @@ public class MaxComputeJniWriter extends JniWriter {
this.maxBlockBytes = Long.parseLong(
params.getOrDefault(MCProperties.WRITE_MAX_BLOCK_BYTES,
MCProperties.DEFAULT_WRITE_MAX_BLOCK_BYTES));
+ this.feClient = MaxComputeFeClient.create(params);
}
@Override
@@ -241,12 +242,12 @@ public class MaxComputeJniWriter extends JniWriter {
}
}
- private long resolveInitialBlockId() {
+ private long resolveInitialBlockId() throws IOException {
return preallocatedBlockId != null ? preallocatedBlockId :
requestBlockId();
}
- private long requestBlockId() {
- return JNINativeMethod.requestMaxComputeBlockId(txnId, writeSessionId);
+ private long requestBlockId() throws IOException {
+ return feClient.requestBlockId(txnId, writeSessionId);
}
private void openBatchWriter(long blockId) throws IOException {
@@ -918,6 +919,8 @@ public class MaxComputeJniWriter extends JniWriter {
String errorMsg = "Failed to close MaxCompute arrow writer";
LOG.error(errorMsg, e);
throw new IOException(errorMsg, e);
+ } finally {
+ feClient.close();
}
}
diff --git
a/fe/be-java-extensions/max-compute-connector/src/test/java/org/apache/doris/maxcompute/MaxComputeFeClientTest.java
b/fe/be-java-extensions/max-compute-connector/src/test/java/org/apache/doris/maxcompute/MaxComputeFeClientTest.java
new file mode 100644
index 00000000000..923919de408
--- /dev/null
+++
b/fe/be-java-extensions/max-compute-connector/src/test/java/org/apache/doris/maxcompute/MaxComputeFeClientTest.java
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.maxcompute;
+
+import org.apache.doris.thrift.TMaxComputeBlockIdRequest;
+import org.apache.doris.thrift.TMaxComputeBlockIdResult;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatus;
+import org.apache.doris.thrift.TStatusCode;
+
+import org.apache.thrift.protocol.TProtocol;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
+
+public class MaxComputeFeClientTest {
+ @Test
+ public void testRequestBlockIdSuccess() throws Exception {
+ FakeExecutor executor = new FakeExecutor(okResult(42L));
+ MaxComputeFeClient client = new MaxComputeFeClient(
+ new TNetworkAddress("fe1", 9010), 1234, "THREAD_POOL",
executor, 0);
+
+ Assert.assertEquals(42L, client.requestBlockId(100L, "session-1"));
+ Assert.assertEquals(1, executor.addresses.size());
+ Assert.assertEquals("fe1", executor.addresses.get(0).getHostname());
+ Assert.assertEquals(9010, executor.addresses.get(0).getPort());
+ Assert.assertFalse(executor.framedTransports.get(0));
+ Assert.assertEquals(1234, (int) executor.timeouts.get(0));
+ Assert.assertEquals(100L, executor.requests.get(0).getTxnId());
+ Assert.assertEquals("session-1",
executor.requests.get(0).getWriteSessionId());
+ Assert.assertEquals(1L, executor.requests.get(0).getLength());
+ }
+
+ @Test
+ public void testRequestBlockIdRedirectsToMaster() throws Exception {
+ FakeExecutor executor = new FakeExecutor(notMasterResult("master",
9020), okResult(7L));
+ MaxComputeFeClient client = new MaxComputeFeClient(
+ new TNetworkAddress("follower", 9010), 1234,
"THREADED_SELECTOR", executor, 0);
+
+ Assert.assertEquals(7L, client.requestBlockId(101L, "session-2"));
+ Assert.assertEquals(2, executor.addresses.size());
+ Assert.assertEquals("follower",
executor.addresses.get(0).getHostname());
+ Assert.assertEquals("master", executor.addresses.get(1).getHostname());
+ Assert.assertTrue(executor.framedTransports.get(0));
+ Assert.assertTrue(executor.framedTransports.get(1));
+ }
+
+ @Test
+ public void testFeErrorFailsWithoutRetry() {
+ FakeExecutor executor = new FakeExecutor(errorResult("allocation
failed"));
+ MaxComputeFeClient client = new MaxComputeFeClient(
+ new TNetworkAddress("fe1", 9010), 1234, "THREAD_POOL",
executor, 0);
+
+ expectIOExceptionContains(() -> client.requestBlockId(102L,
"session-3"), "allocation failed");
+ Assert.assertEquals(1, executor.addresses.size());
+ }
+
+ @Test
+ public void testRpcFailureRetries() throws Exception {
+ FakeExecutor executor = new FakeExecutor(
+ new IOException("connect failed"),
+ new IOException("temporary failure"),
+ okResult(9L));
+ MaxComputeFeClient client = new MaxComputeFeClient(
+ new TNetworkAddress("fe1", 9010), 1234, "THREAD_POOL",
executor, 0);
+
+ Assert.assertEquals(9L, client.requestBlockId(103L, "session-4"));
+ Assert.assertEquals(3, executor.addresses.size());
+ }
+
+ private static TMaxComputeBlockIdResult okResult(long start) {
+ TMaxComputeBlockIdResult result = new TMaxComputeBlockIdResult();
+ result.setStatus(new TStatus(TStatusCode.OK));
+ result.setStart(start);
+ result.setLength(1L);
+ return result;
+ }
+
+ private static TMaxComputeBlockIdResult notMasterResult(String host, int
port) {
+ TMaxComputeBlockIdResult result = new TMaxComputeBlockIdResult();
+ result.setStatus(new TStatus(TStatusCode.NOT_MASTER));
+ result.setMasterAddress(new TNetworkAddress(host, port));
+ return result;
+ }
+
+ private static TMaxComputeBlockIdResult errorResult(String errorMsg) {
+ TStatus status = new TStatus(TStatusCode.ANALYSIS_ERROR);
+ status.addToErrorMsgs(errorMsg);
+ TMaxComputeBlockIdResult result = new TMaxComputeBlockIdResult();
+ result.setStatus(status);
+ return result;
+ }
+
+ private static void expectIOExceptionContains(IOAction action, String
expectedMessage) {
+ try {
+ action.run();
+ Assert.fail("expected IOException");
+ } catch (IOException e) {
+ Assert.assertTrue(e.getMessage(),
e.getMessage().contains(expectedMessage));
+ }
+ }
+
+ private interface IOAction {
+ void run() throws IOException;
+ }
+
+ private static class FakeExecutor implements
MaxComputeFeClient.RpcExecutor {
+ private final Queue<Object> responses;
+ private final List<TNetworkAddress> addresses = new ArrayList<>();
+ private final List<Integer> timeouts = new ArrayList<>();
+ private final List<Boolean> framedTransports = new ArrayList<>();
+ private final List<TMaxComputeBlockIdRequest> requests = new
ArrayList<>();
+
+ FakeExecutor(Object... responses) {
+ this.responses = new ArrayDeque<>(Arrays.asList(responses));
+ }
+
+ @Override
+ public <T> T call(TNetworkAddress address, int timeoutMs, boolean
useFramedTransport,
+ MaxComputeFeClient.FeCall<T> call) throws Exception {
+ addresses.add(new TNetworkAddress(address.getHostname(),
address.getPort()));
+ timeouts.add(timeoutMs);
+ framedTransports.add(useFramedTransport);
+
+ FrontendServiceClient client = new FrontendServiceClient();
+ return call.call(client);
+ }
+
+ private class FrontendServiceClient extends
org.apache.doris.thrift.FrontendService.Client {
+ FrontendServiceClient() {
+ super((TProtocol) null);
+ }
+
+ @Override
+ public TMaxComputeBlockIdResult
getMaxComputeBlockIdRange(TMaxComputeBlockIdRequest request)
+ throws org.apache.thrift.TException {
+ requests.add(request);
+
+ Object response = responses.remove();
+ if (response instanceof RuntimeException) {
+ throw (RuntimeException) response;
+ }
+ if (response instanceof IOException) {
+ throw new RuntimeException((IOException) response);
+ }
+ if (response instanceof org.apache.thrift.TException) {
+ throw (org.apache.thrift.TException) response;
+ }
+ if (response instanceof Exception) {
+ throw new RuntimeException((Exception) response);
+ }
+ return (TMaxComputeBlockIdResult) response;
+ }
+ }
+ }
+}
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 2994f9cc319..bb6ee8bb8c9 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2220,6 +2220,13 @@ public class Config extends ConfigBase {
@ConfField(description = {"Maximum cached file number for external table
split file meta cache at query level."})
public static long max_external_table_split_file_meta_cache_num = 100000;
+ /**
+ * Maximum number of MaxCompute Storage API write block IDs that can be
allocated in one write session.
+ */
+ @ConfField(mutable = false, masterOnly = true, description = {
+ "Maximum number of MaxCompute Storage API write block IDs that can
be allocated in one write session."})
+ public static long max_compute_write_max_block_count = 20000L;
+
/**
* Max cache loader thread-pool size.
* Max thread pool size for loading external meta cache
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
index 76a3c84ebb7..9f1c61ddf24 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
@@ -17,6 +17,7 @@
package org.apache.doris.datasource.maxcompute;
+import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalTable;
import
org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
@@ -49,7 +50,6 @@ import java.util.stream.Collectors;
public class MCTransaction implements Transaction {
private static final Logger LOG =
LogManager.getLogger(MCTransaction.class);
- private static final long MAX_BLOCK_COUNT = 20000L;
private final MaxComputeExternalCatalog catalog;
private MaxComputeExternalTable table;
@@ -147,9 +147,10 @@ public class MCTransaction implements Transaction {
do {
start = nextBlockId.get();
endExclusive = start + length;
- if (endExclusive > MAX_BLOCK_COUNT) {
+ if (endExclusive > Config.max_compute_write_max_block_count) {
throw new UserException("MaxCompute block_id exceeds limit,
start="
- + start + ", length=" + length + ", maxBlockCount=" +
MAX_BLOCK_COUNT);
+ + start + ", length=" + length + ", maxBlockCount="
+ + Config.max_compute_write_max_block_count);
}
} while (!nextBlockId.compareAndSet(start, endExclusive));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]