[
https://issues.apache.org/jira/browse/GOBBLIN-1968?focusedWorklogId=893101&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-893101
]
ASF GitHub Bot logged work on GOBBLIN-1968:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 30/Nov/23 09:03
Start Date: 30/Nov/23 09:03
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3829:
URL: https://github.com/apache/gobblin/pull/3829#discussion_r1410345417
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -193,39 +189,11 @@ protected void shutDown() throws Exception {
* @throws IOException if it fails to collect the output {@link TaskState}s
*/
private void collectOutputTaskStates() throws IOException {
- List<String> taskStateNames = taskStateStore.getTableNames(outputTaskStateDir.getName(), new Predicate<String>() {
- @Override
- public boolean apply(String input) {
- return input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
- && !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
- }});
- if (taskStateNames == null || taskStateNames.size() == 0) {
- LOGGER.debug("No output task state files found in " + this.outputTaskStateDir);
+ final Queue<TaskState> taskStateQueue = deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir, this.stateSerDeRunnerThreads);
+ if (taskStateQueue == null) {
Review Comment:
I'd lean heavily toward returning an empty queue rather than `null`.
If the issue is that the queue may be built asynchronously, such that
`.isEmpty()` at the start might not always hold, then use `Optional` instead.
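To make the two suggestions concrete, here is a minimal sketch and not the PR's actual code. `loadTaskStates` and `loadIfReady` are hypothetical stand-ins for `deserializeTaskStatesFromFolder`, plain `String` names stand in for `TaskState`, and the suffix and prefix constants are placeholder values chosen for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class TaskStateQueueSketch {
  // Placeholder values (assumptions), standing in for the real Gobblin constants.
  static final String TABLE_SUFFIX = ".tst";
  static final String TMP_PREFIX = "_tmp_";

  // Option 1: always return a queue; "nothing found" is simply an empty queue.
  static Queue<String> loadTaskStates(List<String> names) {
    Queue<String> queue = new ConcurrentLinkedQueue<>();
    for (String name : names) {
      if (name.endsWith(TABLE_SUFFIX) && !name.startsWith(TMP_PREFIX)) {
        queue.add(name);
      }
    }
    return queue;
  }

  // Option 2: if "no result" must be distinguishable from "empty result"
  // (for example, a queue filled asynchronously), signal absence with Optional.
  static Optional<Queue<String>> loadIfReady(List<String> names, boolean ready) {
    return ready ? Optional.of(loadTaskStates(names)) : Optional.empty();
  }

  public static void main(String[] args) {
    List<String> names = Arrays.asList("task1.tst", "_tmp_task2.tst", "notes.txt");

    // The caller checks isEmpty() and never needs a null check.
    Queue<String> queue = loadTaskStates(names);
    if (queue.isEmpty()) {
      System.out.println("No output task state files found");
    } else {
      System.out.println("Found " + queue.size() + " task state(s)");
    }

    // With Optional, absence is explicit in the return type.
    Optional<Queue<String>> maybeQueue = loadIfReady(names, false);
    System.out.println("Result available: " + maybeQueue.isPresent());
  }
}
```

With either variant the caller no longer needs the `taskStateQueue == null` branch: the first replaces it with `isEmpty()`, and the second forces the "not available" case to be handled through the return type.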
Issue Time Tracking
-------------------
Worklog Id: (was: 893101)
Time Spent: 50m (was: 40m)
> Gobblin Commit Step Runs on Temporal
> ------------------------------------
>
> Key: GOBBLIN-1968
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1968
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-core
> Reporter: William Lo
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> Temporal is the next-gen distributed computing platform Gobblin is using to
> replace MapReduce for many of its data movement workflows. We want to
> integrate Gobblin's commit step, which tracks task states and reports their
> status, with Temporal.