dannycranmer commented on code in PR #20542:
URL: https://github.com/apache/flink/pull/20542#discussion_r971658905
##########
flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java:
##########
@@ -76,6 +79,7 @@
private transient ScheduledExecutorService executor;
private transient ScheduledFuture scheduledFuture;
private transient AtomicLong numPendingRequests;
+ private static Map<byte[], Mutation> mutationMap = new HashMap<>();
Review Comment:
Why is this `static`? A static field is shared by every subtask running in the
same JVM, so they would all write to, and try to flush, the same buffer.
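
To illustrate the concern, here is a minimal plain-Java sketch. The class names (`SharedBufferSink`, `PerInstanceBufferSink`, `StaticBufferDemo`) are hypothetical stand-ins and not part of the PR or of Flink; a `List<String>` replaces the real mutation map to keep the example small.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a sink subtask with a static buffer.
class SharedBufferSink {
    // static: a single buffer for the whole class, shared by all instances.
    private static final List<String> BUFFER = new ArrayList<>();

    void write(String record) { BUFFER.add(record); }
    int pending() { return BUFFER.size(); }
}

// Same sink, but each instance owns its buffer.
class PerInstanceBufferSink {
    private final List<String> buffer = new ArrayList<>();

    void write(String record) { buffer.add(record); }
    int pending() { return buffer.size(); }
}

class StaticBufferDemo {
    public static void main(String[] args) {
        SharedBufferSink a = new SharedBufferSink();
        SharedBufferSink b = new SharedBufferSink();
        int before = b.pending();
        a.write("row-1");
        // b sees the record written through a, because the buffer is static.
        System.out.println(b.pending() - before);

        PerInstanceBufferSink c = new PerInstanceBufferSink();
        PerInstanceBufferSink d = new PerInstanceBufferSink();
        c.write("row-1");
        // d is unaffected by writes to c.
        System.out.println(d.pending());
    }
}
```

With an instance field, each subtask buffers and flushes only its own records.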
##########
flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java:
##########
@@ -213,6 +225,7 @@ public void close() throws Exception {
if (mutator != null) {
try {
+ flush();
Review Comment:
While your thread safety looks OK (besides the `static` `Map`), calling
`flush()` from `close()` can generally cause issues: if the destination is down,
the flush cannot complete and the job might fail to stop. A better solution is
to checkpoint the internal buffer. Longer term, we can consider migrating to
the Async Sink base.
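
A rough plain-Java sketch of the "checkpoint the buffer" idea follows. Everything here is hypothetical: `CheckpointedBufferSink`, `snapshotState()`, and `restoreState()` are illustrative names, not Flink APIs, and a `List<String>` stands in for the external store. In Flink, the analogous hooks would be `CheckpointedFunction#snapshotState` and `CheckpointedFunction#initializeState`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sink that persists its unflushed records in a snapshot
// instead of forcing a flush when it is closed.
class CheckpointedBufferSink {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> destination; // stands in for the external store

    CheckpointedBufferSink(List<String> destination) {
        this.destination = destination;
    }

    void write(String record) { buffer.add(record); }

    // On checkpoint: return a copy of the pending records.
    List<String> snapshotState() { return new ArrayList<>(buffer); }

    // On restart: reload the pending records from the last snapshot.
    void restoreState(List<String> snapshot) {
        buffer.clear();
        buffer.addAll(snapshot);
    }

    // Normal flush path, used while the destination is reachable.
    void flush() {
        destination.addAll(buffer);
        buffer.clear();
    }

    // close() only releases resources; it never blocks on the destination.
    void close() { }

    int pending() { return buffer.size(); }
}

class CheckpointedBufferDemo {
    public static void main(String[] args) {
        List<String> store = new ArrayList<>();
        CheckpointedBufferSink first = new CheckpointedBufferSink(store);
        first.write("row-1");
        first.write("row-2");
        List<String> snapshot = first.snapshotState();
        first.close(); // stops without touching the store

        CheckpointedBufferSink second = new CheckpointedBufferSink(store);
        second.restoreState(snapshot);
        second.flush(); // pending records are delivered after restart
        System.out.println(store);
    }
}
```

Because the pending records travel with the snapshot, stopping the job does not depend on the destination being available.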