[ 
https://issues.apache.org/jira/browse/FLINK-38294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chu Xue updated FLINK-38294:
----------------------------
    Description: 
The HBase connector does not flush when fewer than 1000 records 
(sink.buffer-flush.max-rows) have been written or when the task runs for less 
than 1s (sink.buffer-flush.interval). In that case 
org.apache.hadoop.hbase.client.BufferedMutator only flushes inside close(), and that 
flush() may fail.
{code:java}
if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) {
    this.executor =
            Executors.newScheduledThreadPool(
                    1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
    this.scheduledFuture =
            this.executor.scheduleWithFixedDelay(
                    () -> {
                        if (closed) {
                            return;
                        }
                        try {
                            flush();
                        } catch (Exception e) {
                            // fail the sink and skip the rest of the items
                            // if the failure handler decides to throw an exception
                            failureThrowable.compareAndSet(null, e);
                        }
                    },
                    bufferFlushIntervalMillis,
                    bufferFlushIntervalMillis,
                    TimeUnit.MILLISECONDS);
} {code}
{code:java}
@SuppressWarnings("rawtypes")
@Override
public void invoke(T value, Context context) throws Exception {
    checkErrorAndRethrow();

    mutator.mutate(mutationConverter.convertToMutation(value));

    // flush when the buffer number of mutations greater than the configured max size.
    if (bufferFlushMaxMutations > 0
            && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
        flush();
    } else if (bufferFlushMaxMutations == 0 && bufferFlushMaxSizeInBytes == 0) {
        flush();
    }
}
{code}
{code:java}
@Override
public void close() throws Exception {
    closed = true;

    if (mutator != null) {
        try {
            mutator.close();
        } catch (IOException e) {
            LOG.warn("Exception occurs while closing HBase BufferedMutator.", e);
        }
        this.mutator = null;
    }

    if (connection != null) {
        try {
            connection.close();
        } catch (IOException e) {
            LOG.warn("Exception occurs while closing HBase Connection.", e);
        }
        this.connection = null;
    }

    if (scheduledFuture != null) {
        scheduledFuture.cancel(false);
        if (executor != null) {
            executor.shutdownNow();
        }
    }
}
{code}
In close(), mutator.close() performs the final flush; if that flush fails, the 
IOException is only logged, so the failure never reaches the job. For example, 
create a permission-denial case where the user does not have permission (via 
Ranger) for the HBase namespace: the task actually fails to write, but is 
reported as successful.
[^jobmanager.log]
[^taskmanager.log]
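A minimal reproduction sketch via the SQL connector (the namespace, table name 
and ZooKeeper address below are placeholders, and the two flush options are just 
the defaults written out): write a handful of rows into a namespace the user has 
no Ranger permission for, so the only flush happens inside 
BufferedMutator#close().
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HBaseSinkSwallowedFlushRepro {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Sink table in a namespace the Flink user may not write to (denied by Ranger).
        tEnv.executeSql(
                "CREATE TABLE hbase_sink (\n"
                        + "  rowkey STRING,\n"
                        + "  cf ROW<c1 STRING>,\n"
                        + "  PRIMARY KEY (rowkey) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'hbase-2.2',\n"
                        + "  'table-name' = 'restricted_ns:t',\n"
                        + "  'zookeeper.quorum' = 'zk-host:2181',\n"
                        + "  'sink.buffer-flush.max-rows' = '1000',\n"
                        + "  'sink.buffer-flush.interval' = '1s'\n"
                        + ")");
        // A single row stays below both flush thresholds, so the only flush runs
        // inside BufferedMutator#close(); its failure is logged but never rethrown,
        // and the job is reported as FINISHED.
        tEnv.executeSql("INSERT INTO hbase_sink VALUES ('k1', ROW('v1'))").await();
    }
}
{code}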
 
Modify org.apache.flink.connector.hbase.sink.HBaseSinkFunction#close as follows, 
so that the error is rethrown:
{code:java}
@Override
public void close() throws Exception {
    closed = true;

    if (mutator != null) {
        try {
            mutator.close();
        } catch (IOException e) {
            LOG.warn("Exception occurs while closing HBase BufferedMutator.", e);
        }
        this.mutator = null;
    }

    if (connection != null) {
        try {
            connection.close();
        } catch (IOException e) {
            LOG.warn("Exception occurs while closing HBase Connection.", e);
        }
        this.connection = null;
    }

    if (scheduledFuture != null) {
        scheduledFuture.cancel(false);
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    // added check: rethrow any error recorded during flushing
    checkErrorAndRethrow();
}
{code}
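For reference, checkErrorAndRethrow() is the same guard that already runs at the 
start of invoke(); it is roughly the following sketch (the exact message text may 
differ), so calling it at the end of close() makes the task fail with the error 
recorded in failureThrowable instead of finishing successfully.
{code:java}
private void checkErrorAndRethrow() {
    // Rethrow the first error recorded by a failed flush, if any.
    Throwable cause = failureThrowable.get();
    if (cause != null) {
        throw new RuntimeException("An error occurred in HBaseSink.", cause);
    }
}
{code}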

> Hbase connector misclassifies failed task as successful
> -------------------------------------------------------
>
>                 Key: FLINK-38294
>                 URL: https://issues.apache.org/jira/browse/FLINK-38294
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HBase
>    Affects Versions: 1.18.1, 1.19.3, 1.20.2
>            Reporter: Chu Xue
>            Priority: Major
>         Attachments: jobmanager.log, taskmanager.log
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
