SinghAsDev commented on code in PR #5036:
URL: https://github.com/apache/iceberg/pull/5036#discussion_r901148490


##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java:
##########
@@ -631,4 +647,40 @@ private static boolean hiveEngineEnabled(TableMetadata 
metadata, Configuration c
 
     return conf.getBoolean(ConfigProperties.ENGINE_HIVE_ENABLED, 
TableProperties.ENGINE_HIVE_ENABLED_DEFAULT);
   }
+
+  private static class HiveLockHeartbeat implements Runnable {
+    private final ClientPool<IMetaStoreClient, TException> hmsClients;
+    private final long lockId;
+    private final long intervalMs;
+    private ScheduledFuture<?> future;
+
+    HiveLockHeartbeat(ClientPool<IMetaStoreClient, TException> hmsClients, 
long lockId, long intervalMs) {
+      this.hmsClients = hmsClients;
+      this.lockId = lockId;
+      this.intervalMs = intervalMs;
+      this.future = null;
+    }
+
+    @Override
+    public void run() {
+      try {
+        hmsClients.run(client -> {
+          client.heartbeat(0, lockId);
+          return null;
+        });
+      } catch (TException | InterruptedException e) {
+        LOG.error("Fail to heartbeat for lock: {}", lockId, e);

Review Comment:
   Thanks, I also think we should fail the commit on heart beat failure. 
However, to do so we would have to add retry on heart beats as well, so that 
transient failures don't fail the entire job. I will add that.
   
   For the unlock failure handling, I think that needs a bit more discussion 
and if it is OK with you let's do that discussion in a follow up PR.



##########
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java:
##########
@@ -631,4 +647,40 @@ private static boolean hiveEngineEnabled(TableMetadata 
metadata, Configuration c
 
     return conf.getBoolean(ConfigProperties.ENGINE_HIVE_ENABLED, 
TableProperties.ENGINE_HIVE_ENABLED_DEFAULT);
   }
+
+  private static class HiveLockHeartbeat implements Runnable {
+    private final ClientPool<IMetaStoreClient, TException> hmsClients;
+    private final long lockId;
+    private final long intervalMs;
+    private ScheduledFuture<?> future;
+
+    HiveLockHeartbeat(ClientPool<IMetaStoreClient, TException> hmsClients, 
long lockId, long intervalMs) {
+      this.hmsClients = hmsClients;
+      this.lockId = lockId;
+      this.intervalMs = intervalMs;
+      this.future = null;
+    }
+
+    @Override
+    public void run() {
+      try {
+        hmsClients.run(client -> {
+          client.heartbeat(0, lockId);
+          return null;
+        });
+      } catch (TException | InterruptedException e) {
+        LOG.error("Fail to heartbeat for lock: {}", lockId, e);
+      }
+    }
+
+    public void schedule(ScheduledExecutorService scheduler) {
+      future = scheduler.scheduleAtFixedRate(this, 0, intervalMs, 
TimeUnit.MILLISECONDS);

Review Comment:
   Thanks, makes sense. Made initial delay as `intervalMs / 2`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to