[ https://issues.apache.org/jira/browse/FLINK-38294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chu Xue updated FLINK-38294: ---------------------------- Description: The HBase connector does not flush while fewer than 1000 (sink.buffer-flush.max-rows) records have been written or the job has been running for less than 1s (sink.buffer-flush.interval). org.apache.hadoop.hbase.client.BufferedMutator executes flush() when close() is called, and that flush() may fail. {code:java} if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) { this.executor = Executors.newScheduledThreadPool( 1, new ExecutorThreadFactory("hbase-upsert-sink-flusher")); this.scheduledFuture = this.executor.scheduleWithFixedDelay( () -> { if (closed) { return; } try { flush(); } catch (Exception e) { // fail the sink and skip the rest of the items // if the failure handler decides to throw an exception failureThrowable.compareAndSet(null, e); } }, bufferFlushIntervalMillis, bufferFlushIntervalMillis, TimeUnit.MILLISECONDS); } {code} {code:java} @SuppressWarnings("rawtypes") @Overridepublic void invoke(T value, Context context) throws Exception { checkErrorAndRethrow(); mutator.mutate(mutationConverter.convertToMutation(value)); //flush when the buffer number of mutations greater than the configured max size. 
if (bufferFlushMaxMutations > 0 && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) { flush(); } else if (bufferFlushMaxMutations == 0 && bufferFlushMaxSizeInBytes == 0) { flush(); }} {code} {code:java} @Override public void close() throws Exception { closed = true; if (mutator != null) { try { mutator.close(); } catch (IOException e) { LOG.warn("Exception occurs while closing HBase BufferedMutator.", e); } this.mutator = null; } if (connection != null) { try { connection.close(); } catch (IOException e) { LOG.warn("Exception occurs while closing HBase Connection.", e); } this.connection = null; } if (scheduledFuture != null) { scheduledFuture.cancel(false); if (executor != null) { executor.shutdownNow(); } }} {code} For example, create a permission-denied case in which the user does not have permission (Ranger) for the HBase namespace. The task fails, but it is reported as successful. [^jobmanager.log] [^taskmanager.log] Proposed fix: modify org.apache.flink.connector.hbase.sink.HBaseSinkFunction#close as shown below so that the error is thrown. {code:java} @Override public void close() throws Exception { closed = true; if (mutator != null) { try { mutator.close(); } catch (IOException e) { LOG.warn("Exception occurs while closing HBase BufferedMutator.", e); } this.mutator = null; } if (connection != null) { try { connection.close(); } catch (IOException e) { LOG.warn("Exception occurs while closing HBase Connection.", e); } this.connection = null; } if (scheduledFuture != null) { scheduledFuture.cancel(false); if (executor != null) { executor.shutdownNow(); } } //add check checkErrorAndRethrow(); } {code} was: The Hbase connector will not flush when write less than 1000(sink.buffer-flush.max-rows) records or execute in less than 1s(sink.buffer-flush.interval).Org.apache.hadoop.hbase.client.BufferedMutator excute flush() when close() and flush() maybe failed. 
{code:java} if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) { this.executor = Executors.newScheduledThreadPool( 1, new ExecutorThreadFactory("hbase-upsert-sink-flusher")); this.scheduledFuture = this.executor.scheduleWithFixedDelay( () -> { if (closed) { return; } try { flush(); } catch (Exception e) { // fail the sink and skip the rest of the items // if the failure handler decides to throw an exception failureThrowable.compareAndSet(null, e); } }, bufferFlushIntervalMillis, bufferFlushIntervalMillis, TimeUnit.MILLISECONDS); } {code} {code:java} @SuppressWarnings("rawtypes") @Overridepublic void invoke(T value, Context context) throws Exception { checkErrorAndRethrow(); mutator.mutate(mutationConverter.convertToMutation(value)); //flush when the buffer number of mutations greater than the configured max size. if (bufferFlushMaxMutations > 0 && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) { flush(); } else if (bufferFlushMaxMutations == 0 && bufferFlushMaxSizeInBytes == 0) { flush(); }} {code} {code:java} @Override public void close() throws Exception { closed = true; if (mutator != null) { try { mutator.close(); } catch (IOException e) { LOG.warn("Exception occurs while closing HBase BufferedMutator.", e); } this.mutator = null; } if (connection != null) { try { connection.close(); } catch (IOException e) { LOG.warn("Exception occurs while closing HBase Connection.", e); } this.connection = null; } if (scheduledFuture != null) { scheduledFuture.cancel(false); if (executor != null) { executor.shutdownNow(); } }} {code} For example, creating a permission denial case where the user does not have permission(ranger) for the hbase namespace.The task will failed ,but return success. [^jobmanager.log] [^taskmanager.log] Modify org.apache.flink.connector.hbase.sink.HBaseSinkFunction#close like this, throw the error. 
{code:java} @Override public void close() throws Exception { closed = true; if (mutator != null) { try { mutator.close(); } catch (IOException e) { LOG.warn("Exception occurs while closing HBase BufferedMutator.", e); } this.mutator = null; } if (connection != null) { try { connection.close(); } catch (IOException e) { LOG.warn("Exception occurs while closing HBase Connection.", e); } this.connection = null; } if (scheduledFuture != null) { scheduledFuture.cancel(false); if (executor != null) { executor.shutdownNow(); } } //add check checkErrorAndRethrow(); } {code} > Hbase connector misclassifies failed task as successful > ------------------------------------------------------- > > Key: FLINK-38294 > URL: https://issues.apache.org/jira/browse/FLINK-38294 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase > Affects Versions: 1.18.1, 1.19.3, 1.20.2 > Reporter: Chu Xue > Priority: Major > Attachments: jobmanager.log, taskmanager.log > > > The HBase connector does not flush while fewer than 1000 > (sink.buffer-flush.max-rows) records have been written or the job has been > running for less than 1s (sink.buffer-flush.interval). > org.apache.hadoop.hbase.client.BufferedMutator executes flush() when close() > is called, and that flush() may fail. 
> {code:java} > if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) { > this.executor = > Executors.newScheduledThreadPool( > 1, new > ExecutorThreadFactory("hbase-upsert-sink-flusher")); > this.scheduledFuture = > this.executor.scheduleWithFixedDelay( > () -> { > if (closed) { > return; > } > try { > flush(); > } catch (Exception e) { > // fail the sink and skip the rest of the items > // if the failure handler decides to throw an > exception > failureThrowable.compareAndSet(null, e); > } > }, > bufferFlushIntervalMillis, > bufferFlushIntervalMillis, > TimeUnit.MILLISECONDS); > } {code} > {code:java} > @SuppressWarnings("rawtypes") > @Overridepublic > void invoke(T value, Context context) throws Exception { > checkErrorAndRethrow(); > mutator.mutate(mutationConverter.convertToMutation(value)); > //flush when the buffer number of mutations greater than the configured > max size. > if (bufferFlushMaxMutations > 0 > && numPendingRequests.incrementAndGet() >= > bufferFlushMaxMutations) { > flush(); > } else if (bufferFlushMaxMutations == 0 && bufferFlushMaxSizeInBytes == > 0) { > flush(); > }} > {code} > {code:java} > @Override > public void close() throws Exception { > closed = true; > if (mutator != null) { > try { > mutator.close(); > } catch (IOException e) { > LOG.warn("Exception occurs while closing HBase > BufferedMutator.", e); > } > this.mutator = null; > } > if (connection != null) { > try { > connection.close(); > } catch (IOException e) { > LOG.warn("Exception occurs while closing HBase > Connection.", e); > } > this.connection = null; > } > if (scheduledFuture != null) { > scheduledFuture.cancel(false); > if (executor != null) { > executor.shutdownNow(); > } > }} {code} > For example, create a permission-denied case in which the user does not have > permission (Ranger) for the HBase namespace. The task fails, but it is > reported as successful. 
> [^jobmanager.log] > [^taskmanager.log] > > Proposed fix: modify > org.apache.flink.connector.hbase.sink.HBaseSinkFunction#close as shown below > so that the error is thrown. > {code:java} > @Override > public void close() throws Exception { > closed = true; > if (mutator != null) { > try { > mutator.close(); > } catch (IOException e) { > LOG.warn("Exception occurs while closing HBase > BufferedMutator.", e); > } > this.mutator = null; > } > if (connection != null) { > try { > connection.close(); > } catch (IOException e) { > LOG.warn("Exception occurs while closing HBase > Connection.", e); > } > this.connection = null; > } > if (scheduledFuture != null) { > scheduledFuture.cancel(false); > if (executor != null) { > executor.shutdownNow(); > } > } > //add check > checkErrorAndRethrow(); > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)