This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3d81881fd8b [IOTDB-6261] Pipe: support leader cache in
iotdb-thrift-async-connector (#11807)
3d81881fd8b is described below
commit 3d81881fd8b57a76f32ad8954873b9f657193ff9
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Dec 29 15:25:44 2023 +0800
[IOTDB-6261] Pipe: support leader cache in iotdb-thrift-async-connector
(#11807)
---
.../protocol/thrift/IoTDBThriftClientManager.java | 39 ++++
.../async/IoTDBThriftAsyncClientManager.java | 200 ++++++++++++++++++++
.../thrift/async/IoTDBThriftAsyncConnector.java | 210 +++++----------------
.../PipeTransferTabletInsertNodeEventHandler.java | 12 ++
.../PipeTransferTabletInsertionEventHandler.java | 17 +-
.../handler/PipeTransferTabletRawEventHandler.java | 7 +
.../thrift/sync/IoTDBThriftSyncClientManager.java | 30 ++-
.../thrift/sync/IoTDBThriftSyncConnector.java | 2 +-
.../async/AsyncPipeDataTransferServiceClient.java | 8 +
9 files changed, 341 insertions(+), 184 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftClientManager.java
new file mode 100644
index 00000000000..9a631020a44
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftClientManager.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.protocol.thrift;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+
+import java.util.List;
+
+public abstract class IoTDBThriftClientManager {
+
+ protected final List<TEndPoint> endPointList;
+
+ protected long currentClientIndex = 0;
+
+ protected final boolean useLeaderCache;
+ protected static final LeaderCacheManager leaderCacheManager = new
LeaderCacheManager();
+
+ protected IoTDBThriftClientManager(List<TEndPoint> endPointList, boolean
useLeaderCache) {
+ this.endPointList = endPointList;
+ this.useLeaderCache = useLeaderCache;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.java
new file mode 100644
index 00000000000..477165d2bd8
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.protocol.thrift.async;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
+import
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftClientManager;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class IoTDBThriftAsyncClientManager extends IoTDBThriftClientManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBThriftAsyncClientManager.class);
+
+ private final Set<TEndPoint> endPointSet;
+
+ private static final AtomicReference<
+ IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>>
+ ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new AtomicReference<>();
+ private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
endPoint2Client;
+
+ public IoTDBThriftAsyncClientManager(List<TEndPoint> endPoints, boolean
useLeaderCache) {
+ super(endPoints, useLeaderCache);
+
+ endPointSet = new HashSet<>(endPoints);
+
+ if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
+ synchronized (IoTDBThriftAsyncConnector.class) {
+ if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
+ ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.set(
+ new IClientManager.Factory<TEndPoint,
AsyncPipeDataTransferServiceClient>()
+ .createClientManager(
+ new
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
+ }
+ }
+ }
+ endPoint2Client = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get();
+ }
+
+ public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
+ final int clientSize = endPointList.size();
+ while (true) {
+ final TEndPoint targetNodeUrl = endPointList.get((int)
(currentClientIndex++ % clientSize));
+ final AsyncPipeDataTransferServiceClient client =
endPoint2Client.borrowClient(targetNodeUrl);
+ if (handshakeIfNecessary(targetNodeUrl, client)) {
+ return client;
+ }
+ }
+ }
+
+ public AsyncPipeDataTransferServiceClient borrowClient(String deviceId)
throws Exception {
+ if (!useLeaderCache) {
+ return borrowClient();
+ }
+
+ final TEndPoint endPoint = leaderCacheManager.getLeaderEndPoint(deviceId);
+ if (endPoint == null) {
+ return borrowClient();
+ }
+
+ try {
+ final AsyncPipeDataTransferServiceClient client =
endPoint2Client.borrowClient(endPoint);
+ if (handshakeIfNecessary(endPoint, client)) {
+ return client;
+ }
+ } catch (Exception e) {
+ LOGGER.warn(
+ "failed to borrow client {}:{} for cached leader.",
+ endPoint.getIp(),
+ endPoint.getPort(),
+ e);
+ }
+
+ return borrowClient();
+ }
+
+ /**
+ * Handshake with the target if necessary.
+ *
+ * @param client client to handshake
+ * @return true if the handshake had already been finished before this call, false if the
+ * handshake was performed and finished successfully within this call
+ * @throws Exception if an error occurs.
+ */
+ private boolean handshakeIfNecessary(
+ TEndPoint targetNodeUrl, AsyncPipeDataTransferServiceClient client)
throws Exception {
+ if (client.isHandshakeFinished()) {
+ return true;
+ }
+
+ final AtomicBoolean isHandshakeFinished = new AtomicBoolean(false);
+ final AtomicReference<Exception> exception = new AtomicReference<>();
+
+ client.pipeTransfer(
+ PipeTransferHandshakeReq.toTPipeTransferReq(
+
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
+ new AsyncMethodCallback<TPipeTransferResp>() {
+ @Override
+ public void onComplete(TPipeTransferResp response) {
+ if (response.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.warn(
+ "Handshake error with receiver {}:{}, code: {}, message:
{}.",
+ targetNodeUrl.getIp(),
+ targetNodeUrl.getPort(),
+ response.getStatus().getCode(),
+ response.getStatus().getMessage());
+ exception.set(
+ new PipeConnectionException(
+ String.format(
+ "Handshake error with receiver %s:%s, code: %d,
message: %s.",
+ targetNodeUrl.getIp(),
+ targetNodeUrl.getPort(),
+ response.getStatus().getCode(),
+ response.getStatus().getMessage())));
+ } else {
+ LOGGER.info(
+ "Handshake successfully with receiver {}:{}.",
+ targetNodeUrl.getIp(),
+ targetNodeUrl.getPort());
+ client.markHandshakeFinished();
+ }
+
+ isHandshakeFinished.set(true);
+ }
+
+ @Override
+ public void onError(Exception e) {
+ LOGGER.warn(
+ "Handshake error with receiver {}:{}.",
+ targetNodeUrl.getIp(),
+ targetNodeUrl.getPort(),
+ e);
+ exception.set(e);
+
+ isHandshakeFinished.set(true);
+ }
+ });
+
+ try {
+ while (!isHandshakeFinished.get()) {
+ Thread.sleep(10);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PipeException("Interrupted while waiting for handshake
response.", e);
+ }
+
+ if (exception.get() != null) {
+ throw new PipeConnectionException("Failed to handshake.",
exception.get());
+ }
+
+ return false;
+ }
+
+ public void updateLeaderCache(String deviceId, TEndPoint endPoint) {
+ if (!useLeaderCache) {
+ return;
+ }
+
+ if (!endPointSet.contains(endPoint)) {
+ endPointList.add(endPoint);
+ endPointSet.add(endPoint);
+ }
+
+ leaderCacheManager.updateLeaderEndPoint(deviceId, endPoint);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index e608b39609c..e8460af85bb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -20,13 +20,9 @@
package org.apache.iotdb.db.pipe.connector.protocol.thrift.async;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.ClientPoolFactory;
-import org.apache.iotdb.commons.client.IClientManager;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
-import org.apache.iotdb.commons.conf.CommonDescriptor;
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.IoTDBConnector;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftAsyncPipeTransferBatchReqBuilder;
-import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
@@ -47,41 +43,37 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
-import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
-import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
-import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
-import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_ENABLE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LEADER_CACHE_ENABLE_KEY;
public class IoTDBThriftAsyncConnector extends IoTDBConnector {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBThriftAsyncConnector.class);
- private static final String THRIFT_ERROR_FORMATTER =
+ private static final String THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT =
+ "Failed to borrow client from client pool or exception occurred "
+ + "when sending to receiver.";
+ private static final String THRIFT_ERROR_FORMATTER_WITH_ENDPOINT =
"Failed to borrow client from client pool or exception occurred "
+ "when sending to receiver %s:%s.";
- private static final AtomicReference<
- IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>>
- ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new AtomicReference<>();
- private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
- asyncPipeDataTransferClientManager;
+ private IoTDBThriftAsyncClientManager clientManager;
private final IoTDBThriftSyncConnector retryConnector = new
IoTDBThriftSyncConnector();
private final PriorityBlockingQueue<Event> retryEventQueue =
@@ -95,20 +87,6 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
private IoTDBThriftAsyncPipeTransferBatchReqBuilder tabletBatchBuilder;
- public IoTDBThriftAsyncConnector() {
- if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
- synchronized (IoTDBThriftAsyncConnector.class) {
- if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
- ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.set(
- new IClientManager.Factory<TEndPoint,
AsyncPipeDataTransferServiceClient>()
- .createClientManager(
- new
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
- }
- }
- }
- asyncPipeDataTransferClientManager =
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get();
- }
-
@Override
public void validate(PipeParameterValidator validator) throws Exception {
super.validate(validator);
@@ -132,6 +110,13 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
retryParameters.getAttribute().put(CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY,
"false");
retryConnector.customize(retryParameters, configuration);
+ clientManager =
+ new IoTDBThriftAsyncClientManager(
+ nodeUrls,
+ parameters.getBooleanOrDefault(
+ Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY,
CONNECTOR_LEADER_CACHE_ENABLE_KEY),
+ CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE));
+
if (isTabletBatchModeEnabled) {
tabletBatchBuilder = new
IoTDBThriftAsyncPipeTransferBatchReqBuilder(parameters);
}
@@ -173,14 +158,12 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
return;
}
- final long commitId = ((EnrichedEvent) tabletInsertionEvent).getCommitId();
-
if (isTabletBatchModeEnabled) {
if (tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
final PipeTransferTabletBatchEventHandler
pipeTransferTabletBatchEventHandler =
new PipeTransferTabletBatchEventHandler(tabletBatchBuilder, this);
- transfer(commitId, pipeTransferTabletBatchEventHandler);
+ transfer(pipeTransferTabletBatchEventHandler);
tabletBatchBuilder.onSuccess();
}
@@ -198,7 +181,7 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
new PipeTransferTabletInsertNodeEventHandler(
pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);
- transfer(commitId, pipeTransferInsertNodeReqHandler);
+ transfer(pipeTransferInsertNodeReqHandler);
} else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
(PipeRawTabletInsertionEvent) tabletInsertionEvent;
@@ -210,54 +193,40 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
new PipeTransferTabletRawEventHandler(
pipeRawTabletInsertionEvent, pipeTransferTabletRawReq, this);
- transfer(commitId, pipeTransferTabletReqHandler);
+ transfer(pipeTransferTabletReqHandler);
}
}
}
- private void transfer(
- long requestCommitId,
- PipeTransferTabletBatchEventHandler pipeTransferTabletBatchEventHandler)
{
- final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId %
nodeUrls.size()));
-
+ private void transfer(PipeTransferTabletBatchEventHandler
pipeTransferTabletBatchEventHandler) {
+ AsyncPipeDataTransferServiceClient client = null;
try {
- final AsyncPipeDataTransferServiceClient client =
borrowClient(targetNodeUrl);
+ client = clientManager.borrowClient();
pipeTransferTabletBatchEventHandler.transfer(client);
} catch (Exception ex) {
- LOGGER.warn(
- String.format(THRIFT_ERROR_FORMATTER, targetNodeUrl.getIp(),
targetNodeUrl.getPort()),
- ex);
+ logOnClientException(client, ex);
pipeTransferTabletBatchEventHandler.onError(ex);
}
}
- private void transfer(
- long requestCommitId,
- PipeTransferTabletInsertNodeEventHandler
pipeTransferInsertNodeReqHandler) {
- final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId %
nodeUrls.size()));
-
+ private void transfer(PipeTransferTabletInsertNodeEventHandler
pipeTransferInsertNodeReqHandler) {
+ AsyncPipeDataTransferServiceClient client = null;
try {
- final AsyncPipeDataTransferServiceClient client =
borrowClient(targetNodeUrl);
+ client = clientManager.borrowClient();
pipeTransferInsertNodeReqHandler.transfer(client);
} catch (Exception ex) {
- LOGGER.warn(
- String.format(THRIFT_ERROR_FORMATTER, targetNodeUrl.getIp(),
targetNodeUrl.getPort()),
- ex);
+ logOnClientException(client, ex);
pipeTransferInsertNodeReqHandler.onError(ex);
}
}
- private void transfer(
- long requestCommitId, PipeTransferTabletRawEventHandler
pipeTransferTabletReqHandler) {
- final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId %
nodeUrls.size()));
-
+ private void transfer(PipeTransferTabletRawEventHandler
pipeTransferTabletReqHandler) {
+ AsyncPipeDataTransferServiceClient client = null;
try {
- final AsyncPipeDataTransferServiceClient client =
borrowClient(targetNodeUrl);
+ client = clientManager.borrowClient();
pipeTransferTabletReqHandler.transfer(client);
} catch (Exception ex) {
- LOGGER.warn(
- String.format(THRIFT_ERROR_FORMATTER, targetNodeUrl.getIp(),
targetNodeUrl.getPort()),
- ex);
+ logOnClientException(client, ex);
pipeTransferTabletReqHandler.onError(ex);
}
}
@@ -303,21 +272,17 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
final PipeTransferTsFileInsertionEventHandler
pipeTransferTsFileInsertionEventHandler =
new PipeTransferTsFileInsertionEventHandler(pipeTsFileInsertionEvent,
this);
- transfer(pipeTsFileInsertionEvent.getCommitId(),
pipeTransferTsFileInsertionEventHandler);
+ transfer(pipeTransferTsFileInsertionEventHandler);
}
private void transfer(
- long requestCommitId,
PipeTransferTsFileInsertionEventHandler
pipeTransferTsFileInsertionEventHandler) {
- final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId %
nodeUrls.size()));
-
+ AsyncPipeDataTransferServiceClient client = null;
try {
- final AsyncPipeDataTransferServiceClient client =
borrowClient(targetNodeUrl);
+ client = clientManager.borrowClient();
pipeTransferTsFileInsertionEventHandler.transfer(client);
} catch (Exception ex) {
- LOGGER.warn(
- String.format(THRIFT_ERROR_FORMATTER, targetNodeUrl.getIp(),
targetNodeUrl.getPort()),
- ex);
+ logOnClientException(client, ex);
pipeTransferTsFileInsertionEventHandler.onError(ex);
}
}
@@ -333,93 +298,21 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
}
}
- private AsyncPipeDataTransferServiceClient borrowClient(TEndPoint
targetNodeUrl)
- throws Exception {
- while (true) {
- final AsyncPipeDataTransferServiceClient client =
- asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl);
- if (handshakeIfNecessary(targetNodeUrl, client)) {
- return client;
- }
- }
- }
-
- /**
- * Handshake with the target if necessary.
- *
- * @param client client to handshake
- * @return true if the handshake is already finished, false if the handshake
is not finished yet
- * and finished in this method
- * @throws Exception if an error occurs.
- */
- private boolean handshakeIfNecessary(
- TEndPoint targetNodeUrl, AsyncPipeDataTransferServiceClient client)
throws Exception {
- if (client.isHandshakeFinished()) {
- return true;
- }
+ //////////////////////////// Leader cache update ////////////////////////////
- final AtomicBoolean isHandshakeFinished = new AtomicBoolean(false);
- final AtomicReference<Exception> exception = new AtomicReference<>();
-
- client.pipeTransfer(
- PipeTransferHandshakeReq.toTPipeTransferReq(
-
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
- new AsyncMethodCallback<TPipeTransferResp>() {
- @Override
- public void onComplete(TPipeTransferResp response) {
- if (response.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.warn(
- "Handshake error with receiver {}:{}, code: {}, message:
{}.",
- targetNodeUrl.getIp(),
- targetNodeUrl.getPort(),
- response.getStatus().getCode(),
- response.getStatus().getMessage());
- exception.set(
- new PipeConnectionException(
- String.format(
- "Handshake error with receiver %s:%s, code: %d,
message: %s.",
- targetNodeUrl.getIp(),
- targetNodeUrl.getPort(),
- response.getStatus().getCode(),
- response.getStatus().getMessage())));
- } else {
- LOGGER.info(
- "Handshake successfully with receiver {}:{}.",
- targetNodeUrl.getIp(),
- targetNodeUrl.getPort());
- client.markHandshakeFinished();
- }
-
- isHandshakeFinished.set(true);
- }
-
- @Override
- public void onError(Exception e) {
- LOGGER.warn(
- "Handshake error with receiver {}:{}.",
- targetNodeUrl.getIp(),
- targetNodeUrl.getPort(),
- e);
- exception.set(e);
-
- isHandshakeFinished.set(true);
- }
- });
+ public void updateLeaderCache(String deviceId, TEndPoint endPoint) {
+ clientManager.updateLeaderCache(deviceId, endPoint);
+ }
- try {
- while (!isHandshakeFinished.get()) {
- Thread.sleep(10);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PipeException("Interrupted while waiting for handshake
response.", e);
- }
+ //////////////////////////// Exception handlers ////////////////////////////
- if (exception.get() != null) {
- throw new PipeConnectionException("Failed to handshake.",
exception.get());
+ private void logOnClientException(AsyncPipeDataTransferServiceClient client,
Exception e) {
+ if (client == null) {
+ LOGGER.warn(THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT, e);
+ } else {
+ LOGGER.warn(
+ String.format(THRIFT_ERROR_FORMATTER_WITH_ENDPOINT, client.getIp(),
client.getPort()), e);
}
-
- return false;
}
/**
@@ -462,14 +355,7 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
return;
}
- // requestCommitId can not be generated by commitIdGenerator because the
commit id must
- // be bind to a specific InsertTabletEvent or TsFileInsertionEvent,
otherwise the commit
- // process will stuck.
- final long requestCommitId = tabletBatchBuilder.getLastCommitId();
- final PipeTransferTabletBatchEventHandler
pipeTransferTabletBatchEventHandler =
- new PipeTransferTabletBatchEventHandler(tabletBatchBuilder, this);
-
- transfer(requestCommitId, pipeTransferTabletBatchEventHandler);
+ transfer(new PipeTransferTabletBatchEventHandler(tabletBatchBuilder,
this));
tabletBatchBuilder.onSuccess();
}
@@ -483,6 +369,8 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
retryEventQueue.offer(event);
}
+ //////////////////////////// Operations for close
////////////////////////////
+
/**
* When a pipe is dropped, the connector maybe reused and will not be
closed. So we just discard
* its queued events in the output pipe connector.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
index c0dcf8670f4..d30b4115218 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
@@ -19,9 +19,11 @@
package org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
@@ -42,4 +44,14 @@ public class PipeTransferTabletInsertNodeEventHandler
throws TException {
client.pipeTransfer(req, this);
}
+
+ @Override
+ protected void updateLeaderCache(TSStatus status) {
+ final InsertNode insertNode =
+ ((PipeInsertNodeTabletInsertionEvent)
event).getInsertNodeViaCacheIfPossible();
+ if (insertNode != null) {
+ connector.updateLeaderCache(
+ insertNode.getDevicePath().getFullPath(), status.getRedirectNode());
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index d1b0ae18c50..6a7f79a3162 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
@@ -39,10 +40,10 @@ public abstract class
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTransferTabletInsertionEventHandler.class);
- private final TabletInsertionEvent event;
- private final TPipeTransferReq req;
+ protected final TabletInsertionEvent event;
+ protected final TPipeTransferReq req;
- private final IoTDBThriftAsyncConnector connector;
+ protected final IoTDBThriftAsyncConnector connector;
protected PipeTransferTabletInsertionEventHandler(
TabletInsertionEvent event, TPipeTransferReq req,
IoTDBThriftAsyncConnector connector) {
@@ -71,16 +72,22 @@ public abstract class
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
return;
}
- if (response.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ final TSStatus status = response.getStatus();
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
if (event instanceof EnrichedEvent) {
((EnrichedEvent) event)
.decreaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName(),
true);
}
+ if (status.isSetRedirectNode()) {
+ updateLeaderCache(status);
+ }
} else {
- onError(new PipeException(response.getStatus().getMessage()));
+ onError(new PipeException(status.getMessage()));
}
}
+ protected abstract void updateLeaderCache(TSStatus status);
+
@Override
public void onError(Exception exception) {
LOGGER.warn(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
index 4cf8a768048..78d5983e768 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -42,4 +43,10 @@ public class PipeTransferTabletRawEventHandler
throws TException {
client.pipeTransfer(req, this);
}
+
+ @Override
+ protected void updateLeaderCache(TSStatus status) {
+ connector.updateLeaderCache(
+ ((PipeRawTabletInsertionEvent) event).getDeviceId(),
status.getRedirectNode());
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
index 090045624e7..4425439c07f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import
org.apache.iotdb.commons.pipe.connector.client.IoTDBThriftSyncConnectorClient;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
-import org.apache.iotdb.db.pipe.connector.protocol.thrift.LeaderCacheManager;
+import
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftClientManager;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
@@ -42,7 +42,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-public class IoTDBThriftSyncClientManager implements Closeable {
+public class IoTDBThriftSyncClientManager extends IoTDBThriftClientManager
implements Closeable {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBThriftSyncClientManager.class);
@@ -52,30 +52,22 @@ public class IoTDBThriftSyncClientManager implements
Closeable {
private final String trustStorePath;
private final String trustStorePwd;
- private final boolean useLeaderCache;
-
- private final List<TEndPoint> endPoints;
private final Map<TEndPoint, Pair<IoTDBThriftSyncConnectorClient, Boolean>>
endPoint2ClientAndStatus = new ConcurrentHashMap<>();
- private final LeaderCacheManager leaderCacheManager = new
LeaderCacheManager();
-
- private long currentClientIndex = 0;
-
public IoTDBThriftSyncClientManager(
List<TEndPoint> endPoints,
boolean useSSL,
String trustStorePath,
String trustStorePwd,
boolean useLeaderCache) {
+ super(endPoints, useLeaderCache);
+
this.useSSL = useSSL;
this.trustStorePath = trustStorePath;
this.trustStorePwd = trustStorePwd;
- this.useLeaderCache = useLeaderCache;
-
- this.endPoints = endPoints;
- for (TEndPoint endPoint : endPoints) {
+ for (final TEndPoint endPoint : endPoints) {
endPoint2ClientAndStatus.put(endPoint, new Pair<>(null, false));
}
}
@@ -167,12 +159,12 @@ public class IoTDBThriftSyncClientManager implements
Closeable {
}
public Pair<IoTDBThriftSyncConnectorClient, Boolean> getClient() {
- final int clientSize = endPoints.size();
+ final int clientSize = endPointList.size();
// Round-robin, find the next alive client
for (int tryCount = 0; tryCount < clientSize; ++tryCount) {
final int clientIndex = (int) (currentClientIndex++ % clientSize);
final Pair<IoTDBThriftSyncConnectorClient, Boolean> clientAndStatus =
- endPoint2ClientAndStatus.get(endPoints.get(clientIndex));
+ endPoint2ClientAndStatus.get(endPointList.get(clientIndex));
if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
return clientAndStatus;
}
@@ -198,7 +190,7 @@ public class IoTDBThriftSyncClientManager implements
Closeable {
try {
if (!endPoint2ClientAndStatus.containsKey(endPoint)) {
- endPoints.add(endPoint);
+ endPointList.add(endPoint);
endPoint2ClientAndStatus.put(endPoint, new Pair<>(null, false));
reconstructClient(endPoint);
}
@@ -233,7 +225,11 @@ public class IoTDBThriftSyncClientManager implements
Closeable {
LOGGER.info("Client {}:{} closed.", endPoint.getIp(),
endPoint.getPort());
} catch (Exception e) {
LOGGER.warn(
- "Failed to close client {}:{}, because: {}.", endPoint.getIp(),
endPoint.getPort(), e);
+ "Failed to close client {}:{}, because: {}.",
+ endPoint.getIp(),
+ endPoint.getPort(),
+ e.getMessage(),
+ e);
} finally {
clientAndStatus.setRight(false);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index c87d00ce7f0..0d4ef07d539 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -283,7 +283,7 @@ public class IoTDBThriftSyncConnector extends
IoTDBConnector {
}
}
- private void doTransfer() throws IOException {
+ private void doTransfer() {
Pair<IoTDBThriftSyncConnectorClient, Boolean> clientAndStatus =
clientManager.getClient();
final TPipeTransferResp resp;
try {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index 83f47956812..2b8110bc7bc 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -146,6 +146,14 @@ public class AsyncPipeDataTransferServiceClient extends
IClientRPCService.AsyncC
LOGGER.info("Handshake finished for client {}", this);
}
+ public String getIp() {
+ return endpoint.getIp();
+ }
+
+ public int getPort() {
+ return endpoint.getPort();
+ }
+
@Override
public String toString() {
return String.format("AsyncPipeDataTransferServiceClient{%s}, id = {%d}",
endpoint, id);