This is an automated email from the ASF dual-hosted git repository.

abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 20731d542a [GOBBLIN-2229] Enable access to Dataset state store during 
work discovery (#4144)
20731d542a is described below

commit 20731d542a467750dcbb5b1e1c45d63ff410c6b5
Author: pratapaditya04 <[email protected]>
AuthorDate: Fri Sep 26 12:45:43 2025 +0530

    [GOBBLIN-2229] Enable access to Dataset state store during work discovery 
(#4144)
    
    Enable access to dataset state store during work discovery
---
 .../org/apache/gobblin/runtime/JobContext.java     |  25 +---
 .../runtime/util/DatasetStateStoreUtils.java       | 101 ++++++++++++++++
 .../ddm/activity/impl/GenerateWorkUnitsImpl.java   |  27 ++++-
 .../activity/impl/GenerateWorkUnitsImplTest.java   | 130 +++++++++++++++++++++
 4 files changed, 256 insertions(+), 27 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index e591ad6859..9f7c918fbe 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -65,10 +65,10 @@ import org.apache.gobblin.publisher.DataPublisher;
 import org.apache.gobblin.runtime.JobState.DatasetState;
 import org.apache.gobblin.runtime.commit.FsCommitSequenceStore;
 import org.apache.gobblin.runtime.troubleshooter.IssueRepository;
+import org.apache.gobblin.runtime.util.DatasetStateStoreUtils;
 import org.apache.gobblin.runtime.util.JobMetrics;
 import org.apache.gobblin.source.Source;
 import org.apache.gobblin.source.extractor.JobCommitPolicy;
-import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.Either;
 import org.apache.gobblin.util.ExecutorsUtils;
@@ -192,28 +192,7 @@ public class JobContext implements Closeable {
 
   protected DatasetStateStore createStateStore(Config jobConfig)
       throws IOException {
-    boolean stateStoreEnabled = 
!jobConfig.hasPath(ConfigurationKeys.STATE_STORE_ENABLED) || jobConfig
-        .getBoolean(ConfigurationKeys.STATE_STORE_ENABLED);
-
-    String stateStoreType;
-
-    if (!stateStoreEnabled) {
-      stateStoreType = ConfigurationKeys.STATE_STORE_TYPE_NOOP;
-    } else {
-      stateStoreType = ConfigUtils.getString(jobConfig, 
ConfigurationKeys.DATASET_STATE_STORE_TYPE_KEY, ConfigUtils
-          .getString(jobConfig, ConfigurationKeys.STATE_STORE_TYPE_KEY, 
ConfigurationKeys.DEFAULT_STATE_STORE_TYPE));
-    }
-
-    ClassAliasResolver<DatasetStateStore.Factory> resolver = new 
ClassAliasResolver<>(DatasetStateStore.Factory.class);
-
-    try {
-      DatasetStateStore.Factory stateStoreFactory = 
resolver.resolveClass(stateStoreType).newInstance();
-      return stateStoreFactory.createStateStore(jobConfig);
-    } catch (RuntimeException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
+   return DatasetStateStoreUtils.createStateStore(jobConfig);
   }
 
   protected Optional<JobHistoryStore> createJobHistoryStore(Properties 
jobProps) {
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/DatasetStateStoreUtils.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/DatasetStateStoreUtils.java
new file mode 100644
index 0000000000..d6b71f3e94
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/DatasetStateStoreUtils.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.util;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Utilities for creating and managing {@link DatasetStateStore} instances 
based on job configuration.
+ */
+@Slf4j
+public class DatasetStateStoreUtils {
+
+  /**
+   * Private constructor to prevent instantiation of this utility class.
+   */
+  private DatasetStateStoreUtils() {
+    // Utility class should not be instantiated
+  }
+
+  /**
+   * Creates a {@link DatasetStateStore} instance based on the provided job 
configuration.
+   *
+   * <p>This method performs the following operations:</p>
+   * <ol>
+   *   <li>Checks if state store is enabled via {@link 
ConfigurationKeys#STATE_STORE_ENABLED}</li>
+   *   <li>Determines the appropriate state store type from configuration 
hierarchy</li>
+   *   <li>Uses {@link ClassAliasResolver} to resolve the state store factory 
class</li>
+   *   <li>Creates and returns the configured {@link DatasetStateStore} 
instance</li>
+   * </ol>
+   *
+   * <p>If state store is disabled, a no-op state store type will be used. 
Otherwise, the method
+   * looks for state store type in the following priority order:</p>
+   * <ol>
+   *   <li>{@link ConfigurationKeys#DATASET_STATE_STORE_TYPE_KEY}</li>
+   *   <li>{@link ConfigurationKeys#STATE_STORE_TYPE_KEY}</li>
+   *   <li>{@link ConfigurationKeys#DEFAULT_STATE_STORE_TYPE}</li>
+   * </ol>
+   *
+   * @param jobConfig the job configuration containing state store settings 
and type information.
+   *                  Must not be null.
+   * @return a configured {@link DatasetStateStore} instance ready for use
+   * @throws IOException if there's an error creating the state store, 
including:
+   *         <ul>
+   *           <li>Class resolution failures</li>
+   *           <li>Factory instantiation errors</li>
+   *           <li>State store creation failures</li>
+   *         </ul>
+   * @throws RuntimeException if there's a runtime error during state store 
initialization,
+   *         which will be logged and re-thrown
+   */
+  public static DatasetStateStore createStateStore(Config jobConfig)
+      throws IOException {
+    boolean stateStoreEnabled = 
!jobConfig.hasPath(ConfigurationKeys.STATE_STORE_ENABLED) || jobConfig
+        .getBoolean(ConfigurationKeys.STATE_STORE_ENABLED);
+
+    String stateStoreType;
+
+    if (!stateStoreEnabled) {
+      stateStoreType = ConfigurationKeys.STATE_STORE_TYPE_NOOP;
+    } else {
+      stateStoreType = ConfigUtils.getString(jobConfig, 
ConfigurationKeys.DATASET_STATE_STORE_TYPE_KEY, ConfigUtils
+          .getString(jobConfig, ConfigurationKeys.STATE_STORE_TYPE_KEY, 
ConfigurationKeys.DEFAULT_STATE_STORE_TYPE));
+    }
+
+    ClassAliasResolver<DatasetStateStore.Factory> resolver = new 
ClassAliasResolver<>(DatasetStateStore.Factory.class);
+
+    try {
+      DatasetStateStore.Factory stateStoreFactory = 
resolver.resolveClass(stateStoreType).newInstance();
+      return stateStoreFactory.createStateStore(jobConfig);
+    } catch (RuntimeException e) {
+      log.error("Error in initializing DataStateStore of type {} ", 
stateStoreType, e);
+      throw e;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index fdeaf42487..c62468305c 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -40,10 +40,13 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.io.Closer;
 import com.tdunning.math.stats.TDigest;
+import com.typesafe.config.ConfigFactory;
 import io.temporal.failure.ApplicationFailure;
 import io.temporal.activity.Activity;
 import io.temporal.activity.ActivityExecutionContext;
 
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.converter.initializer.ConverterInitializer;
@@ -53,7 +56,9 @@ import 
org.apache.gobblin.destination.DestinationDatasetHandlerService;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.runtime.CombinedWorkUnitAndDatasetStateGenerator;
 import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.DatasetStateStoreUtils;
 import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
 import 
org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
 import org.apache.gobblin.service.ServiceConfigKeys;
@@ -76,6 +81,7 @@ import 
org.apache.gobblin.writer.initializer.WriterInitializer;
 import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
 
 
+
 @Slf4j
 public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
 
@@ -144,10 +150,16 @@ public class GenerateWorkUnitsImpl implements 
GenerateWorkUnits {
     Path workDirRoot = JobStateUtils.getWorkDirRoot(jobState);
     log.info("Using work dir root path for job '{}' - '{}'", 
jobState.getJobId(), workDirRoot);
 
-    // TODO: determine whether these are actually necessary to do (as 
MR/AbstractJobLauncher did)!
-    // SharedResourcesBroker<GobblinScopeTypes> jobBroker = 
JobStateUtils.getSharedResourcesBroker(jobState);
-    // jobState.setBroker(jobBroker);
-    // jobState.setWorkUnitAndDatasetStateFunctional(new 
CombinedWorkUnitAndDatasetStateGenerator(this.datasetStateStore, this.jobName));
+    /* Set up dataset state functional and shared resource broker on JobState 
to enable
+     work units to access previous dataset states and watermarks during work 
discovery, following MR launcher pattern*/
+    try {
+      addDatasetStateFunctionalAndSharedResourceBrokerToJobState(jobProps, 
jobState);
+    }
+    catch (IOException e){
+      String errMsg = "Failed to 
addDatasetStateFunctionalAndSharedResourceBrokerToJobState for job " + 
jobState.getJobId();
+      log.error(errMsg, e);
+      throw ApplicationFailure.newFailureWithCause(errMsg, "Failure: creating 
SharedResourcesBroker", e);
+    }
 
     AutomaticTroubleshooter troubleshooter = 
AutomaticTroubleshooterFactory.createForJob(jobProps);
     troubleshooter.start();
@@ -195,6 +207,13 @@ public class GenerateWorkUnitsImpl implements 
GenerateWorkUnits {
     }
   }
 
+  /**
+   * Attaches a job-scoped {@link SharedResourcesBroker} and a {@link CombinedWorkUnitAndDatasetStateGenerator}
+   * to {@code jobState}, so that work discovery can read previous dataset states as the MR launcher allows.
+   *
+   * <p>The dataset state store is created before anything is set on {@code jobState}, so a failure to create
+   * it leaves {@code jobState} unmodified.</p>
+   *
+   * @param jobProps job properties, parsed into a config used to create the dataset state store
+   * @param jobState the job state to populate; its job name is passed to the dataset state generator
+   * @throws IOException if the dataset state store cannot be created
+   */
+  private void addDatasetStateFunctionalAndSharedResourceBrokerToJobState(Properties jobProps, JobState jobState)
+      throws IOException {
+    // Build the functional first: this is the step that can fail, and nothing has been mutated yet.
+    CombinedWorkUnitAndDatasetStateGenerator datasetStateFunctional = new CombinedWorkUnitAndDatasetStateGenerator(
+        DatasetStateStoreUtils.createStateStore(ConfigFactory.parseProperties(jobProps)), jobState.getJobName());
+    SharedResourcesBroker<GobblinScopeTypes> jobBroker = JobStateUtils.getSharedResourcesBroker(jobState);
+    jobState.setBroker(jobBroker);
+    jobState.setWorkUnitAndDatasetStateFunctional(datasetStateFunctional);
+  }
+
   protected WorkUnitsWithInsights 
generateWorkUnitsForJobStateAndCollectCleanupPaths(JobState jobState, 
EventSubmitterContext eventSubmitterContext, Closer closer)
       throws ReflectiveOperationException {
     // report (timer) metrics for "Work Discovery", *planning only* - NOT 
including WU prep, like serialization, `DestinationDatasetHandlerService`ing, 
etc.
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
index a7790c423d..5c457e6945 100644
--- 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
@@ -17,18 +17,35 @@
 
 package org.apache.gobblin.temporal.ddm.activity.impl;
 
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 import java.util.Set;
 
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.runtime.CombinedWorkUnitAndDatasetStateGenerator;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.DatasetStateStoreUtils;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnitStream;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
 
 
@@ -199,6 +216,119 @@ public class GenerateWorkUnitsImplTest {
     Assert.assertEquals(wuSizeInfo.getConstituentWorkUnitsMedianSize(), 0.0);
   }
 
+  @Test
+  public void testAddDatasetStateFunctionalAndSharedResourceBrokerToJobState() throws Exception {
+    // Arrange
+    Properties jobProps = new Properties();
+    jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, "test-job");
+    jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, "test-job-id");
+
+    JobState jobState = new JobState(jobProps);
+    @SuppressWarnings("unchecked")
+    SharedResourcesBroker<GobblinScopeTypes> mockBroker = mock(SharedResourcesBroker.class);
+    DatasetStateStore mockDatasetStateStore = mock(DatasetStateStore.class);
+
+    // The method under test is private, so it is invoked reflectively
+    GenerateWorkUnitsImpl generateWorkUnitsImpl = new GenerateWorkUnitsImpl();
+    Method privateMethod = GenerateWorkUnitsImpl.class.getDeclaredMethod(
+        "addDatasetStateFunctionalAndSharedResourceBrokerToJobState", Properties.class, JobState.class);
+    privateMethod.setAccessible(true);
+
+    // Stub the static collaborators so no real broker or state store is created
+    try (MockedStatic<JobStateUtils> mockedJobStateUtils = Mockito.mockStatic(JobStateUtils.class);
+         MockedStatic<DatasetStateStoreUtils> mockedDatasetStateStoreUtils =
+             Mockito.mockStatic(DatasetStateStoreUtils.class)) {
+
+      mockedJobStateUtils.when(() -> JobStateUtils.getSharedResourcesBroker(jobState))
+          .thenReturn(mockBroker);
+      mockedDatasetStateStoreUtils.when(() -> DatasetStateStoreUtils.createStateStore(any()))
+          .thenReturn(mockDatasetStateStore);
+
+      // Act
+      privateMethod.invoke(generateWorkUnitsImpl, jobProps, jobState);
+
+      // Assert
+      Assert.assertEquals(jobState.getBroker(), mockBroker, "SharedResourcesBroker should be set on JobState");
+      Assert.assertNotNull(jobState.getWorkUnitAndDatasetStateFunctional(), "WorkUnitAndDatasetStateFunctional should be set");
+      Assert.assertTrue(jobState.getWorkUnitAndDatasetStateFunctional() instanceof CombinedWorkUnitAndDatasetStateGenerator,
+          "WorkUnitAndDatasetStateFunctional should be instance of CombinedWorkUnitAndDatasetStateGenerator");
+
+      // Verify interactions; the state store must be configured from the job properties that were passed in
+      mockedJobStateUtils.verify(() -> JobStateUtils.getSharedResourcesBroker(jobState), times(1));
+      mockedDatasetStateStoreUtils.verify(() -> DatasetStateStoreUtils.createStateStore(
+          Mockito.argThat(config -> "test-job".equals(config.getString(ConfigurationKeys.JOB_NAME_KEY)))), times(1));
+    }
+  }
+
+  @Test
+  public void testAddDatasetStateFunctionalAndSharedResourceBrokerToJobStateWithIOException() throws Exception {
+    // A failing state store factory must surface as an IOException and leave no functional on the JobState
+    Properties props = new Properties();
+    props.setProperty(ConfigurationKeys.JOB_NAME_KEY, "test-job");
+    props.setProperty(ConfigurationKeys.JOB_ID_KEY, "test-job-id");
+    JobState state = new JobState(props);
+
+    // The method under test is private; reach it through reflection
+    Method addToJobState = GenerateWorkUnitsImpl.class.getDeclaredMethod(
+        "addDatasetStateFunctionalAndSharedResourceBrokerToJobState", Properties.class, JobState.class);
+    addToJobState.setAccessible(true);
+    GenerateWorkUnitsImpl impl = new GenerateWorkUnitsImpl();
+
+    // JobStateUtils is mocked but deliberately left unstubbed, so getSharedResourcesBroker yields null
+    try (MockedStatic<JobStateUtils> unstubbedJobStateUtils = Mockito.mockStatic(JobStateUtils.class);
+         MockedStatic<DatasetStateStoreUtils> stateStoreUtils = Mockito.mockStatic(DatasetStateStoreUtils.class)) {
+      stateStoreUtils.when(() -> DatasetStateStoreUtils.createStateStore(any()))
+          .thenThrow(new IOException("Failed to create state store"));
+
+      Throwable cause = null;
+      try {
+        addToJobState.invoke(impl, props, state);
+      } catch (InvocationTargetException wrapped) {
+        cause = wrapped.getCause();
+      }
+
+      Assert.assertNotNull(cause, "Expected IOException to be thrown");
+      Assert.assertTrue(cause instanceof IOException, "Root cause should be IOException");
+      Assert.assertEquals(cause.getMessage(), "Failed to create state store");
+      // Because the JobStateUtils mock is unstubbed, the broker check below cannot by itself prove that
+      // setBroker was never called; the functional check is the meaningful one.
+      Assert.assertNull(state.getBroker(), "Broker should not be set when exception occurs");
+      Assert.assertNull(state.getWorkUnitAndDatasetStateFunctional(),
+          "WorkUnitAndDatasetStateFunctional should not be set when exception occurs");
+    }
+  }
+
+  @Test
+  public void testAddDatasetStateFunctionalAndSharedResourceBrokerToJobStateWithNullBroker() throws Exception {
+    // A null broker from JobStateUtils must not prevent the dataset state functional from being attached
+    Properties props = new Properties();
+    props.setProperty(ConfigurationKeys.JOB_NAME_KEY, "test-job");
+    props.setProperty(ConfigurationKeys.JOB_ID_KEY, "test-job-id");
+    JobState state = new JobState(props);
+    DatasetStateStore stubStateStore = mock(DatasetStateStore.class);
+
+    // The method under test is private; reach it through reflection
+    Method addToJobState = GenerateWorkUnitsImpl.class.getDeclaredMethod(
+        "addDatasetStateFunctionalAndSharedResourceBrokerToJobState", Properties.class, JobState.class);
+    addToJobState.setAccessible(true);
+    GenerateWorkUnitsImpl impl = new GenerateWorkUnitsImpl();
+
+    try (MockedStatic<JobStateUtils> jobStateUtils = Mockito.mockStatic(JobStateUtils.class);
+         MockedStatic<DatasetStateStoreUtils> stateStoreUtils = Mockito.mockStatic(DatasetStateStoreUtils.class)) {
+      jobStateUtils.when(() -> JobStateUtils.getSharedResourcesBroker(state)).thenReturn(null);
+      stateStoreUtils.when(() -> DatasetStateStoreUtils.createStateStore(any())).thenReturn(stubStateStore);
+
+      addToJobState.invoke(impl, props, state);
+
+      Assert.assertNull(state.getBroker(), "Broker should be null when null broker is returned");
+      Assert.assertNotNull(state.getWorkUnitAndDatasetStateFunctional(),
+          "WorkUnitAndDatasetStateFunctional should still be set even with null broker");
+    }
+  }
+
   public static WorkUnit createWorkUnitOfSize(long size) {
     WorkUnit workUnit = WorkUnit.createEmpty();
     workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, size);

Reply via email to