phet commented on code in PR #3829:
URL: https://github.com/apache/gobblin/pull/3829#discussion_r1399931215
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -193,39 +189,11 @@ protected void shutDown() throws Exception {
* @throws IOException if it fails to collect the output {@link TaskState}s
*/
private void collectOutputTaskStates() throws IOException {
- List<String> taskStateNames = taskStateStore.getTableNames(outputTaskStateDir.getName(), new Predicate<String>() {
- @Override
- public boolean apply(String input) {
- return input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
- && !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
- }});
- if (taskStateNames == null || taskStateNames.size() == 0) {
- LOGGER.debug("No output task state files found in " + this.outputTaskStateDir);
+ final Queue<TaskState> taskStateQueue = deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir, this.stateSerDeRunnerThreads);
+ if (taskStateQueue == null) {
return;
}
-
- final Queue<TaskState> taskStateQueue = Queues.newConcurrentLinkedQueue();
- try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, null)) {
- for (final String taskStateName : taskStateNames) {
- LOGGER.debug("Found output task state file " + taskStateName);
- // Deserialize the TaskState and delete the file
- stateSerDeRunner.submitCallable(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- TaskState taskState = taskStateStore.getAll(outputTaskStateDir.getName(), taskStateName).get(0);
- taskStateQueue.add(taskState);
- taskStateStore.delete(outputTaskStateDir.getName(), taskStateName);
Review Comment:
It seems this `delete` no longer happens in the reimplementation; is that true? If it is omitted, should we be concerned?
The method-level javadoc could describe whose responsibility it is to delete the task state files.
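To make the concern concrete, here is a minimal sketch (not Gobblin's actual code) of what the removed loop did: read each task state file in parallel into a concurrent queue, then delete it. `InMemoryStateStore`, `deserializeTaskStates`, the `deleteAfterRead` flag, and the `.tst` / `_tmp_` names are hypothetical stand-ins, and a plain `ExecutorService` replaces `ParallelRunner`. The javadoc shows one way to state explicitly who owns the cleanup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class TaskStateCollectorSketch {

  /** Hypothetical stand-in for a state store: table name -> serialized task state. */
  static final class InMemoryStateStore {
    final Map<String, String> tables = new ConcurrentHashMap<>();

    /** Lists finished state tables, skipping in-progress temp files. */
    List<String> getTableNames(String suffix, String tmpPrefix) {
      List<String> names = new ArrayList<>();
      for (String name : tables.keySet()) {
        if (name.endsWith(suffix) && !name.startsWith(tmpPrefix)) {
          names.add(name);
        }
      }
      return names;
    }

    String get(String table) { return tables.get(table); }

    void delete(String table) { tables.remove(table); }
  }

  /**
   * Deserializes all task state tables in parallel.
   *
   * <p>If {@code deleteAfterRead} is true, each table is deleted right after it is read
   * (the behavior of the removed {@code taskStateStore.delete(...)} call). Otherwise the
   * caller is responsible for deleting the tables.
   *
   * @return the collected states, or {@code null} if no tables were found
   */
  static Queue<String> deserializeTaskStates(InMemoryStateStore store, int threads,
      boolean deleteAfterRead) throws Exception {
    List<String> names = store.getTableNames(".tst", "_tmp_");
    if (names.isEmpty()) {
      return null;
    }
    Queue<String> queue = new ConcurrentLinkedQueue<>();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (String name : names) {
        futures.add(pool.submit(() -> {
          queue.add(store.get(name));
          if (deleteAfterRead) {
            store.delete(name);
          }
        }));
      }
      for (Future<?> f : futures) {
        f.get(); // rethrows (wrapped in ExecutionException) any failure from a worker
      }
    } finally {
      pool.shutdown();
    }
    return queue;
  }

  public static void main(String[] args) throws Exception {
    InMemoryStateStore store = new InMemoryStateStore();
    store.tables.put("task-1.tst", "state-1");
    store.tables.put("task-2.tst", "state-2");
    store.tables.put("_tmp_task-3.tst", "partial");

    Queue<String> states = deserializeTaskStates(store, 2, true);
    System.out.println(states.size());         // prints 2
    System.out.println(store.tables.keySet()); // prints [_tmp_task-3.tst]
  }
}
```

With `deleteAfterRead = false` the files stay on disk, which is exactly the case where the javadoc should say who deletes them.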
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.