vinothchandar commented on a change in pull request #2359:
URL: https://github.com/apache/hudi/pull/2359#discussion_r568815079
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -160,7 +180,8 @@ public boolean commit(String instantTime, O writeStatuses) {
public boolean commit(String instantTime, O writeStatuses,
Option<Map<String, String>> extraMetadata) {
HoodieTableMetaClient metaClient = createMetaClient(false);
String actionType = metaClient.getCommitActionType();
- return commit(instantTime, writeStatuses, extraMetadata, actionType,
Collections.emptyMap());
+ boolean isCommitted = commit(instantTime, writeStatuses, extraMetadata,
actionType, Collections.emptyMap());
Review comment:
remove `isCommitted` and return directly?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -750,24 +767,49 @@ private HoodieTimeline
getInflightTimelineExcludeCompactionAndClustering(HoodieT
}
/**
- * Cleanup all pending commits.
+ * Clean up all failed commits.
+=======
Review comment:
same here
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -732,6 +746,9 @@ public void rollbackInflightCompaction(HoodieInstant
inflightInstant, HoodieTabl
}
/**
+<<<<<<< 1e1a9a5ba5907efb5dd8cb1e0ae830aa78725dd1
Review comment:
this seems like a remnant of merge conflicts? can you please make a full
pass and ensure no temp debugging changes or remnants like this remain?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -105,7 +112,8 @@ public static SparkConf registerClasses(SparkConf conf) {
public boolean commit(String instantTime, JavaRDD<WriteStatus>
writeStatuses, Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String, List<String>>
partitionToReplacedFileIds) {
List<HoodieWriteStat> writeStats =
writeStatuses.map(WriteStatus::getStat).collect();
- return commitStats(instantTime, writeStats, extraMetadata,
commitActionType, partitionToReplacedFileIds);
+ boolean isCommitted = commitStats(instantTime, writeStats, extraMetadata,
commitActionType, partitionToReplacedFileIds);
Review comment:
same. let's remove the temp variable
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -133,6 +134,12 @@
private static final String MERGE_DATA_VALIDATION_CHECK_ENABLED =
"hoodie.merge.data.validation.enabled";
private static final String DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED =
"false";
+ public static final String CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP =
"hoodie.client.heartbeat.interval_in_ms";
+ public static final Integer DEFAULT_CLIENT_HEARTBEAT_INTERVAL_IN_MS = 60 *
1000;
+
+ public static final String CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES_PROP =
"hoodie.client.heartbeat.tolerable_misses";
Review comment:
nit: tolerable.misses
##########
File path:
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/heartbeat/TestHoodieHeartbeatClient.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.heartbeat;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.util.HeartbeatUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieHeartbeatClient extends HoodieCommonTestHarness {
+
+ private static final Logger LOG =
LogManager.getLogger(TestHoodieHeartbeatClient.class);
+
+ private static String instantTime1 = "100";
+ private static String instantTime2 = "101";
+ private static Long heartBeatInterval = 1000L;
+ private static int numTolerableMisses = 1;
+
+ @BeforeEach
+ public void init() throws IOException {
+ initMetaClient();
+ }
+
+ @Test
+ public void testStartHeartbeat() throws IOException {
+ HoodieHeartbeatClient hoodieHeartbeatClient =
+ new HoodieHeartbeatClient(metaClient.getFs(),
metaClient.getBasePath(), heartBeatInterval, numTolerableMisses);
+ hoodieHeartbeatClient.start(instantTime1);
+ FileStatus [] fs = metaClient.getFs().listStatus(new
Path(hoodieHeartbeatClient.getHeartbeatFolderPath()));
+ assertTrue(fs.length == 1);
+ assertTrue(fs[0].getPath().toString().contains(instantTime1));
+ }
+
+ @Test
+ public void testStopHeartbeat() {
+ HoodieHeartbeatClient hoodieHeartbeatClient =
+ new HoodieHeartbeatClient(metaClient.getFs(),
metaClient.getBasePath(), heartBeatInterval, numTolerableMisses);
+ hoodieHeartbeatClient.start(instantTime1);
+ hoodieHeartbeatClient.stop(instantTime1);
+ await().atMost(5, SECONDS).until(() ->
hoodieHeartbeatClient.getHeartbeat(instantTime1).getNumHeartbeats() > 0);
+ Integer numHeartBeats =
hoodieHeartbeatClient.getHeartbeat(instantTime1).getNumHeartbeats();
+ assertTrue(numHeartBeats == 1);
+ }
+
+ @Test
+ public void testIsHeartbeatExpired() throws IOException {
+ HoodieHeartbeatClient hoodieHeartbeatClient =
+ new HoodieHeartbeatClient(metaClient.getFs(),
metaClient.getBasePath(), heartBeatInterval, numTolerableMisses);
+ hoodieHeartbeatClient.start(instantTime1);
+ hoodieHeartbeatClient.stop(instantTime1);
+ assertFalse(hoodieHeartbeatClient.isHeartbeatExpired(instantTime1));
+ }
+
+ @Test
+ public void testNumHeartbeatsGenerated() {
+ Long heartBeatInterval = 5000L;
+ HoodieHeartbeatClient hoodieHeartbeatClient =
+ new HoodieHeartbeatClient(metaClient.getFs(),
metaClient.getBasePath(), heartBeatInterval, numTolerableMisses);
+ hoodieHeartbeatClient.start("100");
+ await().atMost(5, SECONDS).until(() ->
hoodieHeartbeatClient.getHeartbeat(instantTime1).getNumHeartbeats() >= 1);
+ }
+
+ @Test
+ public void testDeleteWrongHeartbeat() throws IOException {
+ HoodieHeartbeatClient hoodieHeartbeatClient =
+ new HoodieHeartbeatClient(metaClient.getFs(),
metaClient.getBasePath(), heartBeatInterval, numTolerableMisses);
+ hoodieHeartbeatClient.start(instantTime1);
+ hoodieHeartbeatClient.stop(instantTime1);
+ assertFalse(HeartbeatUtils.deleteHeartbeatFile(metaClient.getFs(),
basePath, instantTime2));
+ }
+
+ @Disabled
+ public void testConcurrentReadsAndWritesToSameFile() throws
InterruptedException {
+ Thread th1 = new Thread(() -> {
+ try {
+ while (true) {
+ OutputStream outputStream = metaClient.getFs().create(new
Path(metaClient.getBasePath() + "/test"), true);
+ outputStream.write("testing".getBytes());
+ outputStream.close();
+ }
+ } catch (Exception e) {
+ LOG.info("Caught Exception th1");
+ e.printStackTrace();
+ }
+ });
+
+ Thread th2 = new Thread(() -> {
+ try {
+ while (true) {
+ FileStatus fs = metaClient.getFs().getFileStatus(new
Path(metaClient.getBasePath() + "/test"));
+ }
+ } catch (Exception e) {
+ LOG.info("Caught Exception th2");
+ e.printStackTrace();
+ }
+ });
+
+ Thread th3 = new Thread(() -> {
+ try {
+ while (true) {
+ String heartbeat =
org.apache.commons.io.IOUtils.toString(metaClient.getFs().open(new
Path(metaClient.getBasePath() + "/test")));
+ }
+ } catch (Exception e) {
+ LOG.info("Caught Exception th3");
+ e.printStackTrace();
Review comment:
`LOG.error()` and get rid of printing stack trace?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -859,6 +901,19 @@ protected void finalizeWrite(HoodieTable<T, I, K, O>
table, String instantTime,
}
}
+ protected void checkHeartbeatExpired(String instantTime, HoodieTable table) {
Review comment:
let's move this to `HeartbeatUtils`
##########
File path:
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/heartbeat/TestHoodieHeartbeatClient.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.heartbeat;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.util.HeartbeatUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieHeartbeatClient extends HoodieCommonTestHarness {
+
+ private static final Logger LOG =
LogManager.getLogger(TestHoodieHeartbeatClient.class);
+
+ private static String instantTime1 = "100";
+ private static String instantTime2 = "101";
+ private static Long heartBeatInterval = 1000L;
+ private static int numTolerableMisses = 1;
+
+ @BeforeEach
+ public void init() throws IOException {
+ initMetaClient();
+ }
+
+ @Test
+ public void testStartHeartbeat() throws IOException {
+ HoodieHeartbeatClient hoodieHeartbeatClient =
+ new HoodieHeartbeatClient(metaClient.getFs(),
metaClient.getBasePath(), heartBeatInterval, numTolerableMisses);
+ hoodieHeartbeatClient.start(instantTime1);
+ FileStatus [] fs = metaClient.getFs().listStatus(new
Path(hoodieHeartbeatClient.getHeartbeatFolderPath()));
+ assertTrue(fs.length == 1);
+ assertTrue(fs[0].getPath().toString().contains(instantTime1));
+ }
+
+ @Test
+ public void testStopHeartbeat() {
+ HoodieHeartbeatClient hoodieHeartbeatClient =
+ new HoodieHeartbeatClient(metaClient.getFs(),
metaClient.getBasePath(), heartBeatInterval, numTolerableMisses);
+ hoodieHeartbeatClient.start(instantTime1);
+ hoodieHeartbeatClient.stop(instantTime1);
+ await().atMost(5, SECONDS).until(() ->
hoodieHeartbeatClient.getHeartbeat(instantTime1).getNumHeartbeats() > 0);
+ Integer numHeartBeats =
hoodieHeartbeatClient.getHeartbeat(instantTime1).getNumHeartbeats();
+ assertTrue(numHeartBeats == 1);
+ }
+
+ @Test
+ public void testIsHeartbeatExpired() throws IOException {
+ HoodieHeartbeatClient hoodieHeartbeatClient =
+ new HoodieHeartbeatClient(metaClient.getFs(),
metaClient.getBasePath(), heartBeatInterval, numTolerableMisses);
+ hoodieHeartbeatClient.start(instantTime1);
+ hoodieHeartbeatClient.stop(instantTime1);
+ assertFalse(hoodieHeartbeatClient.isHeartbeatExpired(instantTime1));
+ }
+
+ @Test
+ public void testNumHeartbeatsGenerated() {
+ Long heartBeatInterval = 5000L;
+ HoodieHeartbeatClient hoodieHeartbeatClient =
+ new HoodieHeartbeatClient(metaClient.getFs(),
metaClient.getBasePath(), heartBeatInterval, numTolerableMisses);
+ hoodieHeartbeatClient.start("100");
+ await().atMost(5, SECONDS).until(() ->
hoodieHeartbeatClient.getHeartbeat(instantTime1).getNumHeartbeats() >= 1);
+ }
+
+ @Test
+ public void testDeleteWrongHeartbeat() throws IOException {
+ HoodieHeartbeatClient hoodieHeartbeatClient =
+ new HoodieHeartbeatClient(metaClient.getFs(),
metaClient.getBasePath(), heartBeatInterval, numTolerableMisses);
+ hoodieHeartbeatClient.start(instantTime1);
+ hoodieHeartbeatClient.stop(instantTime1);
+ assertFalse(HeartbeatUtils.deleteHeartbeatFile(metaClient.getFs(),
basePath, instantTime2));
+ }
+
+ @Disabled
+ public void testConcurrentReadsAndWritesToSameFile() throws
InterruptedException {
+ Thread th1 = new Thread(() -> {
+ try {
+ while (true) {
+ OutputStream outputStream = metaClient.getFs().create(new
Path(metaClient.getBasePath() + "/test"), true);
+ outputStream.write("testing".getBytes());
+ outputStream.close();
+ }
+ } catch (Exception e) {
+ LOG.info("Caught Exception th1");
+ e.printStackTrace();
+ }
+ });
+
+ Thread th2 = new Thread(() -> {
+ try {
+ while (true) {
+ FileStatus fs = metaClient.getFs().getFileStatus(new
Path(metaClient.getBasePath() + "/test"));
+ }
+ } catch (Exception e) {
+ LOG.info("Caught Exception th2");
+ e.printStackTrace();
+ }
+ });
+
+ Thread th3 = new Thread(() -> {
+ try {
+ while (true) {
+ String heartbeat =
org.apache.commons.io.IOUtils.toString(metaClient.getFs().open(new
Path(metaClient.getBasePath() + "/test")));
+ }
+ } catch (Exception e) {
+ LOG.info("Caught Exception th3");
+ e.printStackTrace();
+ }
+ });
+
+ th1.start();
+ Thread.sleep(100);
+ th2.start();
+ th3.start();
+
+ // To simulate the issues of race conditions when creating & reading file,
simply do th1.join; th2.join; th3.join;
Review comment:
was wondering how these threads actually stop?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -750,24 +767,49 @@ private HoodieTimeline
getInflightTimelineExcludeCompactionAndClustering(HoodieT
}
/**
- * Cleanup all pending commits.
+ * Clean up all failed commits.
+=======
+ * Rollback all failed commits.
+>>>>>>> Adding configs to make failed writes eager/lazy
+=======
+ * Rollback all failed writes.
+>>>>>>> Refactored code, removed parallel writing capability
*/
- private void rollbackPendingCommits() {
+ public void rollbackFailedWrites() {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
- HoodieTimeline inflightTimeline =
getInflightTimelineExcludeCompactionAndClustering(table);
- List<String> commits =
inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
- .collect(Collectors.toList());
- for (String commit : commits) {
- if (HoodieTimeline.compareTimestamps(commit,
HoodieTimeline.LESSER_THAN_OR_EQUALS,
+ List<String> instantsToRollback = getInstantsToRollback(table);
+ for (String instant : instantsToRollback) {
+ if (HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.LESSER_THAN_OR_EQUALS,
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
- rollBackInflightBootstrap();
+ rollbackFailedBootstrap();
break;
} else {
- rollback(commit);
+ rollback(instant);
}
}
}
+ private List<String> getInstantsToRollback(HoodieTable<T, I, K, O> table) {
+ if (config.getFailedWritesCleanPolicy().isEager()) {
+ HoodieTimeline inflightTimeline =
table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
+ return
inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
+ .collect(Collectors.toList());
+ } else if (!config.getFailedWritesCleanPolicy().isEager()) {
Review comment:
wouldn't `HoodieFailedWritesCleaningPolicy.NEVER` also pass the check
for `!eager`?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -128,11 +133,26 @@ public AbstractHoodieWriteClient(HoodieEngineContext
context, HoodieWriteConfig
* @param rollbackPending whether need to cleanup pending commits
* @param timelineService Timeline Service that runs as part of write client.
*/
+ @Deprecated
public AbstractHoodieWriteClient(HoodieEngineContext context,
HoodieWriteConfig writeConfig, boolean rollbackPending,
Review comment:
is `rollbackPending` being used anymore? remove?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -707,24 +739,51 @@ public void rollbackInflightCompaction(HoodieInstant
inflightInstant, HoodieTabl
}
/**
- * Cleanup all pending commits.
+ * Rollback all failed commits.
*/
- private void rollbackPendingCommits() {
+ public void rollbackFailedCommits() {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
- HoodieTimeline inflightTimeline =
table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
- List<String> commits =
inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
- .collect(Collectors.toList());
- for (String commit : commits) {
- if (HoodieTimeline.compareTimestamps(commit,
HoodieTimeline.LESSER_THAN_OR_EQUALS,
+ List<String> instantsToRollback = getInstantsToRollback(table);
Review comment:
can you comment again on whether this changes the answer to my question
above?
> I have also added a check in the merging of log blocks where if the
instant is not on the active timeline, that block will be ignored
The concern I had was part 2, where a committed write could have been
archived and we may end up skipping it. Can you please clarify again how we
guard against that? By ensuring archival waits for the cleaner to log this
block?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/util/HeartbeatUtils.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Helper class to delete heartbeat for completed or failed instants with
expired heartbeats.
+ */
+public class HeartbeatUtils {
+
+ private static final Logger LOG = LogManager.getLogger(HeartbeatUtils.class);
+
+ /**
+ * Deletes the heartbeat file for the specified instant.
+ * @param fs
+ * @param basePath
+ * @param instantTime
+ * @return
+ */
+ public static boolean deleteHeartbeatFile(FileSystem fs, String basePath,
String instantTime) {
+ boolean deleted = false;
+ try {
+ String heartbeatFolderPath =
HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
+ deleted = fs.delete(new Path(heartbeatFolderPath + File.separator +
instantTime), false);
+ if (!deleted) {
+ LOG.error("Failed to delete heartbeat for instant " + instantTime);
+ }
+ } catch (IOException io) {
+ LOG.error("Unable to delete heartbeat for instant " + instantTime, io);
+ }
+ return deleted;
+ }
+
+ /**
+ * Deletes the heartbeat files for instants with expired heartbeats or
orphaned heartbeats without any active instant.
+ * @param existingHeartbeatInstants
+ * @param metaClient
+ * @param basePath
+ */
+ public static void cleanUpExpiredOrOrphanHeartbeatFiles(List<String>
existingHeartbeatInstants,
Review comment:
rename by dropping the `Files` suffix? is there any special handling
for expired vs orphaned? why not just call it `cleanExpiredHeartbeats`?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.heartbeat;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.HeartbeatUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieHeartbeatException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.stream.Collectors;
+
+/**
+ * This class creates heartbeat for hudi client. This heartbeat is used to
ascertain whether the running job is or not.
Review comment:
will review this class again, pending this discussion here.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/util/HeartbeatUtils.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Helper class to delete heartbeat for completed or failed instants with
expired heartbeats.
+ */
+public class HeartbeatUtils {
Review comment:
is there a reason why this should be in `hudi-common`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]