dannycranmer commented on code in PR #20542:
URL: https://github.com/apache/flink/pull/20542#discussion_r971658905


##########
flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java:
##########
@@ -76,6 +79,7 @@
     private transient ScheduledExecutorService executor;
     private transient ScheduledFuture scheduledFuture;
     private transient AtomicLong numPendingRequests;
+    private static Map<byte[], Mutation> mutationMap = new HashMap<>();

Review Comment:
   Why is this `static`? A static field is shared by every instance of the 
class in the JVM, so all subtasks running on the same JVM would have access 
to, and try to flush, the same data.
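
   To illustrate the concern, here is a minimal, self-contained sketch (plain 
Java, not the connector code). `SharedSink` and `IsolatedSink` are 
hypothetical stand-ins for two variants of the sink, and `String` keys are 
used only to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;

public class StaticBufferDemo {

    /** Hypothetical sink whose buffer is static: one map per JVM class, not per instance. */
    public static class SharedSink {
        private static final Map<String, String> BUFFER = new HashMap<>();

        public void write(String rowKey, String value) {
            BUFFER.put(rowKey, value);
        }

        public int pending() {
            return BUFFER.size();
        }
    }

    /** Hypothetical sink whose buffer is an instance field: one map per sink instance. */
    public static class IsolatedSink {
        private final Map<String, String> buffer = new HashMap<>();

        public void write(String rowKey, String value) {
            buffer.put(rowKey, value);
        }

        public int pending() {
            return buffer.size();
        }
    }

    public static void main(String[] args) {
        // Two instances stand in for two subtasks running in the same JVM.
        SharedSink sharedA = new SharedSink();
        SharedSink sharedB = new SharedSink();
        sharedA.write("row-1", "a");
        // sharedB never wrote anything, yet it sees sharedA's pending record.
        System.out.println("shared, other subtask sees: " + sharedB.pending());

        IsolatedSink isolatedA = new IsolatedSink();
        IsolatedSink isolatedB = new IsolatedSink();
        isolatedA.write("row-1", "a");
        // With an instance field, each subtask only sees its own records.
        System.out.println("isolated, other subtask sees: " + isolatedB.pending());
    }
}
```

   With the static map, whichever subtask flushes first would also flush (and 
clear) records buffered by the others, which is why an instance field seems 
the safer choice here.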



##########
flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java:
##########
@@ -213,6 +225,7 @@ public void close() throws Exception {
 
         if (mutator != null) {
             try {
+                flush();

Review Comment:
   Your thread safety looks OK (apart from the `static` `Map`), but generally 
speaking, calling `flush()` in `close()` can cause issues: if the destination 
is down, the flush can block or fail, and the job might fail to stop. A better 
solution is to checkpoint the internal buffer, so that pending mutations are 
restored from state rather than flushed at shutdown. Longer term, we can 
consider migrating to the Async Sink base.
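
   A rough sketch of the idea, in plain Java rather than against the Flink 
API. `BufferedSinkSketch`, `snapshot()` and `restore()` are hypothetical 
names; in the real sink the equivalent hooks would presumably be 
`CheckpointedFunction#snapshotState` and `#initializeState`, and the buffer 
would hold `Mutation`s rather than `String`s.

```java
import java.util.ArrayList;
import java.util.List;

public class BufferedSinkSketch {

    // Per-instance buffer of records that have not yet reached the destination.
    private final List<String> buffer = new ArrayList<>();

    /** Buffers a record; a real sink would also flush on size or interval. */
    public void write(String record) {
        buffer.add(record);
    }

    /** Called at checkpoint time: returns a copy of the pending records to store as state. */
    public List<String> snapshot() {
        return new ArrayList<>(buffer);
    }

    /** Called on recovery: reloads the pending records from the last checkpoint. */
    public void restore(List<String> state) {
        buffer.clear();
        buffer.addAll(state);
    }

    /**
     * Releases in-memory resources without contacting the destination, so an
     * unavailable destination cannot block shutdown. Assumption: anything still
     * buffered is covered by the last checkpoint or replayed by the source.
     */
    public void close() {
        buffer.clear();
    }

    public int pending() {
        return buffer.size();
    }

    public static void main(String[] args) {
        BufferedSinkSketch sink = new BufferedSinkSketch();
        sink.write("put-1");
        sink.write("put-2");
        List<String> state = sink.snapshot(); // checkpoint taken here
        sink.close();                         // no flush, returns immediately

        BufferedSinkSketch recovered = new BufferedSinkSketch();
        recovered.restore(state);
        System.out.println("pending after restore: " + recovered.pending());
    }
}
```

   The trade-off is that delivery of buffered records is deferred until after 
recovery, in exchange for `close()` no longer depending on the destination 
being reachable.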



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

