Chu Xue created FLINK-38294:
-------------------------------
Summary: Hbase connector misclassifies failed task as successful
Key: FLINK-38294
URL: https://issues.apache.org/jira/browse/FLINK-38294
Project: Flink
Issue Type: Bug
Components: Connectors / HBase
Affects Versions: 1.20.2, 1.19.3, 1.18.1
Reporter: Chu Xue
Attachments: jobmanager.log, taskmanager.log
The HBase connector does not flush on its own when a job writes fewer than
1000 records (sink.buffer-flush.max-rows) and finishes in less than 1s
(sink.buffer-flush.interval). In that case
org.apache.hadoop.hbase.client.BufferedMutator only flushes when it is closed,
and that flush may fail.
{code:java}
if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) {
    this.executor =
            Executors.newScheduledThreadPool(
                    1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
    this.scheduledFuture =
            this.executor.scheduleWithFixedDelay(
                    () -> {
                        if (closed) {
                            return;
                        }
                        try {
                            flush();
                        } catch (Exception e) {
                            // fail the sink and skip the rest of the items
                            // if the failure handler decides to throw an exception
                            failureThrowable.compareAndSet(null, e);
                        }
                    },
                    bufferFlushIntervalMillis,
                    bufferFlushIntervalMillis,
                    TimeUnit.MILLISECONDS);
}
{code}
{code:java}
@SuppressWarnings("rawtypes")
@Override
public void invoke(T value, Context context) throws Exception {
    checkErrorAndRethrow();

    mutator.mutate(mutationConverter.convertToMutation(value));

    // flush when the buffer number of mutations greater than the configured max size.
    if (bufferFlushMaxMutations > 0
            && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
        flush();
    } else if (bufferFlushMaxMutations == 0 && bufferFlushMaxSizeInBytes == 0) {
        flush();
    }
}
{code}
{code:java}
@Override
public void close() throws Exception {
    closed = true;

    if (mutator != null) {
        try {
            mutator.close();
        } catch (IOException e) {
            LOG.warn("Exception occurs while closing HBase BufferedMutator.", e);
        }
        this.mutator = null;
    }

    if (connection != null) {
        try {
            connection.close();
        } catch (IOException e) {
            LOG.warn("Exception occurs while closing HBase Connection.", e);
        }
        this.connection = null;
    }

    if (scheduledFuture != null) {
        scheduledFuture.cancel(false);
        if (executor != null) {
            executor.shutdownNow();
        }
    }
}
{code}
For example, take a permission-denied case in which the user has no
permission (Ranger) on the HBase namespace. The task fails, but it is
reported as successful.
[^jobmanager.log]
[^taskmanager.log]
Modify org.apache.flink.connector.hbase.sink.HBaseSinkFunction#close as
follows so that it throws the error.
{code:java}
@Override
public void close() throws Exception {
    closed = true;

    if (mutator != null) {
        try {
            mutator.close();
        } catch (IOException e) {
            LOG.warn("Exception occurs while closing HBase BufferedMutator.", e);
        }
        this.mutator = null;
    }

    if (connection != null) {
        try {
            connection.close();
        } catch (IOException e) {
            LOG.warn("Exception occurs while closing HBase Connection.", e);
        }
        this.connection = null;
    }

    if (scheduledFuture != null) {
        scheduledFuture.cancel(false);
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    checkErrorAndRethrow();
}
{code}
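The effect of the change can be illustrated with a small sketch that does not depend on HBase or Flink. FakeMutator and FakeSink are hypothetical stand-ins, not connector classes. The sketch assumes that a failed flush during close is reported through a listener that records it in failureThrowable instead of being thrown, which is what the proposed checkErrorAndRethrow() call relies on.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class CloseRethrowSketch {

    /** Hypothetical stand-in for BufferedMutator: buffers rows and flushes them on close(). */
    static final class FakeMutator {
        private final List<String> buffer = new ArrayList<>();
        private final Consumer<Exception> listener;

        FakeMutator(Consumer<Exception> listener) {
            this.listener = listener;
        }

        void mutate(String row) {
            buffer.add(row);
        }

        /** The final flush always fails here, simulating a permission denial. */
        void close() {
            if (!buffer.isEmpty()) {
                // Assumption: the failure goes to the listener and is not thrown.
                listener.accept(
                        new IOException("permission denied for " + buffer.size() + " rows"));
                buffer.clear();
            }
        }
    }

    /** Hypothetical stand-in for the sink; only the error handling is modelled. */
    static final class FakeSink {
        private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
        private final boolean rethrowOnClose;
        private FakeMutator mutator;

        FakeSink(boolean rethrowOnClose) {
            this.rethrowOnClose = rethrowOnClose;
            this.mutator = new FakeMutator(e -> failureThrowable.compareAndSet(null, e));
        }

        void invoke(String row) {
            checkErrorAndRethrow();
            mutator.mutate(row);
        }

        void close() {
            if (mutator != null) {
                mutator.close(); // the failed flush is recorded, not thrown
                mutator = null;
            }
            if (rethrowOnClose) {
                checkErrorAndRethrow(); // the change proposed in this issue
            }
        }

        private void checkErrorAndRethrow() {
            Throwable cause = failureThrowable.get();
            if (cause != null) {
                throw new RuntimeException("An error occurred in the sink.", cause);
            }
        }
    }

    /** Writes two rows, closes the sink, and reports whether close() returned normally. */
    static boolean closeSucceeds(boolean rethrowOnClose) {
        FakeSink sink = new FakeSink(rethrowOnClose);
        sink.invoke("row-1");
        sink.invoke("row-2");
        try {
            sink.close();
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("close() without rethrow succeeds: " + closeSucceeds(false));
        System.out.println("close() with rethrow succeeds: " + closeSucceeds(true));
    }
}
```

In this sketch closeSucceeds(false) returns true even though no row was written, which mirrors the misclassification described above, while closeSucceeds(true) returns false because the recorded failure is surfaced from close().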