This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e887ea00523 Pipe: Fixed NPE in schema pattern parsing & Improved
connector subtask retry logic and logs & Fixed the executor used by consensus
pipe (#12704)
e887ea00523 is described below
commit e887ea005238165e67556523afb1fab5ee951132
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 11 21:09:48 2024 +0800
Pipe: Fixed NPE in schema pattern parsing & Improved connector subtask
retry logic and logs & Fixed the executor used by consensus pipe (#12704)
---
.../execution/PipeConfigNodeSubtaskExecutor.java | 6 +--
.../consensus/PipeConsensusSubtaskExecutor.java | 4 +-
.../execution/PipeConnectorSubtaskExecutor.java | 7 ++--
.../execution/PipeProcessorSubtaskExecutor.java | 3 +-
.../pipe/task/builder/PipeDataNodeTaskBuilder.java | 4 +-
.../threadpool/WrappedThreadPoolExecutor.java | 7 +++-
.../execution/executor/PipeSubtaskExecutor.java | 30 +++++++++------
.../commons/pipe/pattern/IoTDBPipePattern.java | 4 +-
.../iotdb/commons/pipe/task/DecoratingLock.java | 44 ----------------------
.../task/subtask/PipeAbstractConnectorSubtask.java | 42 ++++++---------------
.../pipe/task/subtask/PipeReportableSubtask.java | 28 ++++++++------
11 files changed, 68 insertions(+), 111 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtaskExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtaskExecutor.java
index 9a82e72ece3..6041c5bf82d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtaskExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtaskExecutor.java
@@ -28,15 +28,15 @@ public class PipeConfigNodeSubtaskExecutor extends
PipeSubtaskExecutor {
private static final int THREAD_NUM = 1;
private PipeConfigNodeSubtaskExecutor() {
- super(THREAD_NUM, ThreadName.PIPE_CONFIGNODE_EXECUTOR_POOL);
+ super(THREAD_NUM, ThreadName.PIPE_CONFIGNODE_EXECUTOR_POOL, true);
}
/**
* @param ignored Used to distinguish this constructor from the default
constructor.
*/
@TestOnly
- public PipeConfigNodeSubtaskExecutor(Object ignored) {
- super(THREAD_NUM, ThreadName.PIPE_CONFIGNODE_EXECUTOR_POOL);
+ public PipeConfigNodeSubtaskExecutor(final Object ignored) {
+ super(THREAD_NUM, ThreadName.PIPE_CONFIGNODE_EXECUTOR_POOL, true);
}
private static class PipeSchemaSubtaskExecutorHolder {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusSubtaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusSubtaskExecutor.java
index 5d99c5038fc..175a95992cf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusSubtaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusSubtaskExecutor.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.db.pipe.consensus;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.commons.pipe.execution.executor.PipeSubtaskExecutor;
+import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
-public class PipeConsensusSubtaskExecutor extends PipeSubtaskExecutor {
+public class PipeConsensusSubtaskExecutor extends PipeConnectorSubtaskExecutor {
public PipeConsensusSubtaskExecutor() {
super(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutor.java
index ce832091c1d..ea03c8ce4a0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutor.java
@@ -28,10 +28,11 @@ public class PipeConnectorSubtaskExecutor extends
PipeSubtaskExecutor {
public PipeConnectorSubtaskExecutor() {
super(
PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum(),
- ThreadName.PIPE_CONNECTOR_EXECUTOR_POOL);
+ ThreadName.PIPE_CONNECTOR_EXECUTOR_POOL,
+ true);
}
- public PipeConnectorSubtaskExecutor(int corePoolSize, ThreadName threadName) {
- super(corePoolSize, threadName);
+ public PipeConnectorSubtaskExecutor(final int corePoolSize, final ThreadName threadName) {
+ super(corePoolSize, threadName, true);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutor.java
index a4b36d3664c..6d5a8ca89fc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutor.java
@@ -28,6 +28,7 @@ public class PipeProcessorSubtaskExecutor extends
PipeSubtaskExecutor {
public PipeProcessorSubtaskExecutor() {
super(
PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum(),
- ThreadName.PIPE_PROCESSOR_EXECUTOR_POOL);
+ ThreadName.PIPE_PROCESSOR_EXECUTOR_POOL,
+ false);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
index e84f93e34ce..04d5ece6e3b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
@@ -54,7 +54,7 @@ public class PipeDataNodeTaskBuilder {
CONNECTOR_EXECUTOR_MAP.put(
PipeType.SUBSCRIPTION,
PipeSubtaskExecutorManager.getInstance().getSubscriptionExecutor());
CONNECTOR_EXECUTOR_MAP.put(
- PipeType.CONSENSUS, PipeSubtaskExecutorManager.getInstance().getConnectorExecutor());
+ PipeType.CONSENSUS, PipeSubtaskExecutorManager.getInstance().getConsensusExecutor());
}
protected final Map<String, String> systemParameters = new HashMap<>();
@@ -89,7 +89,7 @@ public class PipeDataNodeTaskBuilder {
blendUserAndSystemParameters(pipeStaticMeta.getConnectorParameters()),
regionId,
CONNECTOR_EXECUTOR_MAP.get(pipeType));
- } else { // user pipe
+ } else { // user pipe or consensus pipe
connectorStage =
new PipeTaskConnectorStage(
pipeStaticMeta.getPipeName(),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
index f35eba3e7bb..2c24f045ca5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
@@ -41,6 +41,7 @@ public class WrappedThreadPoolExecutor extends
ThreadPoolExecutor
implements WrappedThreadPoolExecutorMBean {
private static final Logger logger =
LoggerFactory.getLogger(WrappedThreadPoolExecutor.class);
private final String mbeanName;
+ private volatile boolean disableErrorLog = false;
public WrappedThreadPoolExecutor(
int corePoolSize,
@@ -125,8 +126,12 @@ public class WrappedThreadPoolExecutor extends
ThreadPoolExecutor
Thread.currentThread().interrupt();
}
}
- if (t != null) {
+ if (t != null && !disableErrorLog) {
logger.error("Exception in thread pool {}", mbeanName, t);
}
}
+
+ public void disableErrorLog() {
+ disableErrorLog = true;
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/execution/executor/PipeSubtaskExecutor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/execution/executor/PipeSubtaskExecutor.java
index e990318660b..88653805067 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/execution/executor/PipeSubtaskExecutor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/execution/executor/PipeSubtaskExecutor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.execution.executor;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
import org.apache.iotdb.commons.pipe.execution.scheduler.PipeSubtaskScheduler;
import org.apache.iotdb.commons.pipe.task.subtask.PipeSubtask;
import org.apache.iotdb.commons.utils.TestOnly;
@@ -41,17 +42,22 @@ public abstract class PipeSubtaskExecutor {
private static final ExecutorService subtaskCallbackListeningExecutor =
IoTDBThreadPoolFactory.newSingleThreadExecutor(
ThreadName.PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL.getName());
- private final ListeningExecutorService subtaskWorkerThreadPoolExecutor;
+ protected final ListeningExecutorService subtaskWorkerThreadPoolExecutor;
private final Map<String, PipeSubtask> registeredIdSubtaskMapper;
private final int corePoolSize;
private int runningSubtaskNumber;
- protected PipeSubtaskExecutor(int corePoolSize, ThreadName threadName) {
- subtaskWorkerThreadPoolExecutor =
- MoreExecutors.listeningDecorator(
- IoTDBThreadPoolFactory.newFixedThreadPool(corePoolSize, threadName.getName()));
+ protected PipeSubtaskExecutor(
+ final int corePoolSize, final ThreadName threadName, final boolean disableLogInThreadPool) {
+ final WrappedThreadPoolExecutor executor =
+ (WrappedThreadPoolExecutor)
+ IoTDBThreadPoolFactory.newFixedThreadPool(corePoolSize, threadName.getName());
+ if (disableLogInThreadPool) {
+ executor.disableErrorLog();
+ }
+ subtaskWorkerThreadPoolExecutor = MoreExecutors.listeningDecorator(executor);
registeredIdSubtaskMapper = new ConcurrentHashMap<>();
@@ -61,7 +67,7 @@ public abstract class PipeSubtaskExecutor {
/////////////////////// Subtask management ///////////////////////
- public final synchronized void register(PipeSubtask subtask) {
+ public final synchronized void register(final PipeSubtask subtask) {
if (registeredIdSubtaskMapper.containsKey(subtask.getTaskID())) {
LOGGER.warn("The subtask {} is already registered.",
subtask.getTaskID());
return;
@@ -74,7 +80,7 @@ public abstract class PipeSubtaskExecutor {
new PipeSubtaskScheduler(this));
}
- public final synchronized void start(String subTaskID) {
+ public final synchronized void start(final String subTaskID) {
if (!registeredIdSubtaskMapper.containsKey(subTaskID)) {
LOGGER.warn("The subtask {} is not registered.", subTaskID);
return;
@@ -93,7 +99,7 @@ public abstract class PipeSubtaskExecutor {
}
}
- public final synchronized void stop(String subTaskID) {
+ public final synchronized void stop(final String subTaskID) {
if (!registeredIdSubtaskMapper.containsKey(subTaskID)) {
LOGGER.warn("The subtask {} is not registered.", subTaskID);
return;
@@ -104,7 +110,7 @@ public abstract class PipeSubtaskExecutor {
}
}
- public final synchronized void deregister(String subTaskID) {
+ public final synchronized void deregister(final String subTaskID) {
stop(subTaskID);
final PipeSubtask subtask = registeredIdSubtaskMapper.remove(subTaskID);
@@ -113,14 +119,14 @@ public abstract class PipeSubtaskExecutor {
try {
subtask.close();
LOGGER.info("The subtask {} is closed successfully.", subTaskID);
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.error("Failed to close the subtask {}.", subTaskID, e);
}
}
}
@TestOnly
- public final boolean isRegistered(String subTaskID) {
+ public final boolean isRegistered(final String subTaskID) {
return registeredIdSubtaskMapper.containsKey(subTaskID);
}
@@ -137,7 +143,7 @@ public abstract class PipeSubtaskExecutor {
}
// stop all subtasks before shutting down the executor
- for (PipeSubtask subtask : registeredIdSubtaskMapper.values()) {
+ for (final PipeSubtask subtask : registeredIdSubtaskMapper.values()) {
subtask.disallowSubmittingSelf();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
index 6c82c7e3d9c..30307d07711 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
@@ -50,7 +50,9 @@ public class IoTDBPipePattern extends PipePattern {
public static <T> List<T> applyIndexesOnList(
final int[] filteredIndexes, final List<T> originalList) {
- return Arrays.stream(filteredIndexes).mapToObj(originalList::get).collect(Collectors.toList());
+ return Objects.nonNull(originalList)
+ ? Arrays.stream(filteredIndexes).mapToObj(originalList::get).collect(Collectors.toList())
+ : null;
}
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/DecoratingLock.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/DecoratingLock.java
deleted file mode 100644
index cc5d0d93e5e..00000000000
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/DecoratingLock.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.commons.pipe.task;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class DecoratingLock {
- private final AtomicBoolean isDecorating = new AtomicBoolean(false);
-
- public void waitForDecorated() {
- while (isDecorating.get()) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- public void markAsDecorating() {
- isDecorating.set(true);
- }
-
- public void markAsDecorated() {
- isDecorating.set(false);
- }
-}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
index f5bfe36c6c7..78e913d6575 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
@@ -23,7 +23,6 @@ import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalExcep
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.execution.scheduler.PipeSubtaskScheduler;
-import org.apache.iotdb.commons.pipe.task.DecoratingLock;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
@@ -43,7 +42,6 @@ public abstract class PipeAbstractConnectorSubtask extends
PipeReportableSubtask
protected PipeConnector outputPipeConnector;
// For thread pool to execute callbacks
- protected final DecoratingLock callbackDecoratingLock = new DecoratingLock();
protected ExecutorService subtaskCallbackListeningExecutor;
// For controlling subtask submitting, making sure that
@@ -51,41 +49,30 @@ public abstract class PipeAbstractConnectorSubtask extends
PipeReportableSubtask
protected volatile boolean isSubmitted = false;
protected PipeAbstractConnectorSubtask(
- String taskID, long creationTime, PipeConnector outputPipeConnector) {
+ final String taskID, final long creationTime, final PipeConnector outputPipeConnector) {
super(taskID, creationTime);
this.outputPipeConnector = outputPipeConnector;
}
@Override
public void bindExecutors(
- ListeningExecutorService subtaskWorkerThreadPoolExecutor,
- ExecutorService subtaskCallbackListeningExecutor,
- PipeSubtaskScheduler subtaskScheduler) {
+ final ListeningExecutorService subtaskWorkerThreadPoolExecutor,
+ final ExecutorService subtaskCallbackListeningExecutor,
+ final PipeSubtaskScheduler subtaskScheduler) {
this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor;
this.subtaskCallbackListeningExecutor = subtaskCallbackListeningExecutor;
this.subtaskScheduler = subtaskScheduler;
}
@Override
- public Boolean call() throws Exception {
- final boolean hasAtLeastOneEventProcessed = super.call();
-
- // Wait for the callable to be decorated by Futures.addCallback in the executorService
- // to make sure that the callback can be submitted again on success or failure.
- callbackDecoratingLock.waitForDecorated();
-
- return hasAtLeastOneEventProcessed;
- }
-
- @Override
- public synchronized void onSuccess(Boolean hasAtLeastOneEventProcessed) {
+ public synchronized void onSuccess(final Boolean hasAtLeastOneEventProcessed) {
isSubmitted = false;
super.onSuccess(hasAtLeastOneEventProcessed);
}
@Override
- public synchronized void onFailure(Throwable throwable) {
+ public synchronized void onFailure(final Throwable throwable) {
isSubmitted = false;
if (isClosed.get()) {
@@ -116,7 +103,7 @@ public abstract class PipeAbstractConnectorSubtask extends
PipeReportableSubtask
/**
* @return {@code true} if the {@link PipeSubtask} should be stopped, {@code
false} otherwise
*/
- private boolean onPipeConnectionException(Throwable throwable) {
+ private boolean onPipeConnectionException(final Throwable throwable) {
LOGGER.warn(
"PipeConnectionException occurred, {} retries to handshake with the
target system.",
outputPipeConnector.getClass().getName(),
@@ -130,7 +117,7 @@ public abstract class PipeAbstractConnectorSubtask extends
PipeReportableSubtask
"{} handshakes with the target system successfully.",
outputPipeConnector.getClass().getName());
break;
- } catch (Exception e) {
+ } catch (final Exception e) {
retry++;
LOGGER.warn(
"{} failed to handshake with the target system for {} times, "
@@ -141,7 +128,7 @@ public abstract class PipeAbstractConnectorSubtask extends
PipeReportableSubtask
e);
try {
Thread.sleep(retry *
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
- } catch (InterruptedException interruptedException) {
+ } catch (final InterruptedException interruptedException) {
LOGGER.info(
"Interrupted while sleeping, will retry to handshake with the
target system.",
interruptedException);
@@ -195,13 +182,8 @@ public abstract class PipeAbstractConnectorSubtask extends
PipeReportableSubtask
return;
}
- callbackDecoratingLock.markAsDecorating();
- try {
- final ListenableFuture<Boolean> nextFuture = subtaskWorkerThreadPoolExecutor.submit(this);
- Futures.addCallback(nextFuture, this, subtaskCallbackListeningExecutor);
- isSubmitted = true;
- } finally {
- callbackDecoratingLock.markAsDecorated();
- }
+ final ListenableFuture<Boolean> nextFuture = subtaskWorkerThreadPoolExecutor.submit(this);
+ Futures.addCallback(nextFuture, this, subtaskCallbackListeningExecutor);
+ isSubmitted = true;
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
index 327152dd0c3..462644db4df 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
@@ -31,12 +31,12 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeReportableSubtask.class);
- protected PipeReportableSubtask(String taskID, long creationTime) {
+ protected PipeReportableSubtask(final String taskID, final long creationTime) {
super(taskID, creationTime);
}
@Override
- public synchronized void onFailure(Throwable throwable) {
+ public synchronized void onFailure(final Throwable throwable) {
if (isClosed.get()) {
LOGGER.info("onFailure in pipe subtask, ignored because pipe is
dropped.", throwable);
clearReferenceCountAndReleaseLastEvent();
@@ -55,7 +55,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
// is dropped or the process is running normally.
}
- private void onEnrichedEventFailure(Throwable throwable) {
+ private void onEnrichedEventFailure(final Throwable throwable) {
final int maxRetryTimes =
throwable instanceof
PipeRuntimeConnectorRetryTimesConfigurableException
? ((PipeRuntimeConnectorRetryTimesConfigurableException)
throwable).getRetryTimes()
@@ -75,15 +75,17 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
retryCount.incrementAndGet();
if (retryCount.get() <= MAX_RETRY_TIMES) {
LOGGER.warn(
- "Retry executing subtask {} (creation time: {}, simple class: {}), retry count [{}/{}]",
+ "Retry executing subtask {} (creation time: {}, simple class: {}), retry count [{}/{}], last exception: {}",
taskID,
creationTime,
this.getClass().getSimpleName(),
retryCount.get(),
- maxRetryTimes);
+ maxRetryTimes,
+ throwable.getMessage(),
+ throwable);
try {
Thread.sleep(Math.min(1000L * retryCount.get(), 10000));
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
LOGGER.warn(
"Interrupted when retrying to execute subtask {} (creation time:
{}, simple class: {})",
taskID,
@@ -123,11 +125,11 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
}
}
- protected abstract String getRootCause(Throwable throwable);
+ protected abstract String getRootCause(final Throwable throwable);
- protected abstract void report(EnrichedEvent event, PipeRuntimeException exception);
+ protected abstract void report(final EnrichedEvent event, final PipeRuntimeException exception);
- private void onNonEnrichedEventFailure(Throwable throwable) {
+ private void onNonEnrichedEventFailure(final Throwable throwable) {
if (retryCount.get() == 0) {
LOGGER.warn(
"Failed to execute subtask {} (creation time: {}, simple class: {}),
"
@@ -141,14 +143,16 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
retryCount.incrementAndGet();
LOGGER.warn(
- "Retry executing subtask {} (creation time: {}, simple class: {}), retry count {}",
+ "Retry executing subtask {} (creation time: {}, simple class: {}), retry count {}, last exception: {}",
taskID,
creationTime,
this.getClass().getSimpleName(),
- retryCount.get());
+ retryCount.get(),
+ throwable.getMessage(),
+ throwable);
try {
Thread.sleep(Math.min(1000L * retryCount.get(), 10000));
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
LOGGER.warn(
"Interrupted when retrying to execute subtask {} (creation time: {},
simple class: {})",
taskID,