This is an automated email from the ASF dual-hosted git repository.
abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 20731d542a [GOBBLIN-2229] Enable access to Dataset state store during
work discovery (#4144)
20731d542a is described below
commit 20731d542a467750dcbb5b1e1c45d63ff410c6b5
Author: pratapaditya04 <[email protected]>
AuthorDate: Fri Sep 26 12:45:43 2025 +0530
[GOBBLIN-2229] Enable access to Dataset state store during work discovery
(#4144)
Enable access to dataset state store during work discovery
---
.../org/apache/gobblin/runtime/JobContext.java | 25 +---
.../runtime/util/DatasetStateStoreUtils.java | 101 ++++++++++++++++
.../ddm/activity/impl/GenerateWorkUnitsImpl.java | 27 ++++-
.../activity/impl/GenerateWorkUnitsImplTest.java | 130 +++++++++++++++++++++
4 files changed, 256 insertions(+), 27 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index e591ad6859..9f7c918fbe 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -65,10 +65,10 @@ import org.apache.gobblin.publisher.DataPublisher;
import org.apache.gobblin.runtime.JobState.DatasetState;
import org.apache.gobblin.runtime.commit.FsCommitSequenceStore;
import org.apache.gobblin.runtime.troubleshooter.IssueRepository;
+import org.apache.gobblin.runtime.util.DatasetStateStoreUtils;
import org.apache.gobblin.runtime.util.JobMetrics;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
-import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
@@ -192,28 +192,7 @@ public class JobContext implements Closeable {
protected DatasetStateStore createStateStore(Config jobConfig)
throws IOException {
- boolean stateStoreEnabled =
!jobConfig.hasPath(ConfigurationKeys.STATE_STORE_ENABLED) || jobConfig
- .getBoolean(ConfigurationKeys.STATE_STORE_ENABLED);
-
- String stateStoreType;
-
- if (!stateStoreEnabled) {
- stateStoreType = ConfigurationKeys.STATE_STORE_TYPE_NOOP;
- } else {
- stateStoreType = ConfigUtils.getString(jobConfig,
ConfigurationKeys.DATASET_STATE_STORE_TYPE_KEY, ConfigUtils
- .getString(jobConfig, ConfigurationKeys.STATE_STORE_TYPE_KEY,
ConfigurationKeys.DEFAULT_STATE_STORE_TYPE));
- }
-
- ClassAliasResolver<DatasetStateStore.Factory> resolver = new
ClassAliasResolver<>(DatasetStateStore.Factory.class);
-
- try {
- DatasetStateStore.Factory stateStoreFactory =
resolver.resolveClass(stateStoreType).newInstance();
- return stateStoreFactory.createStateStore(jobConfig);
- } catch (RuntimeException e) {
- throw e;
- } catch (Exception e) {
- throw new IOException(e);
- }
+ return DatasetStateStoreUtils.createStateStore(jobConfig);
}
protected Optional<JobHistoryStore> createJobHistoryStore(Properties
jobProps) {
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/DatasetStateStoreUtils.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/DatasetStateStoreUtils.java
new file mode 100644
index 0000000000..d6b71f3e94
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/DatasetStateStoreUtils.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.util;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Utilities for creating {@link DatasetStateStore} instances
based on job configuration.
+ */
+@Slf4j
+public class DatasetStateStoreUtils {
+
+ /**
+ * Private constructor to prevent instantiation of this utility class.
+ */
+ private DatasetStateStoreUtils() {
+ // Utility class should not be instantiated
+ }
+
+ /**
+ * Creates a {@link DatasetStateStore} instance based on the provided job
configuration.
+ *
+ * <p>This method performs the following operations:</p>
+ * <ol>
+ * <li>Checks if state store is enabled via {@link
ConfigurationKeys#STATE_STORE_ENABLED}</li>
+ * <li>Determines the appropriate state store type from configuration
hierarchy</li>
+ * <li>Uses {@link ClassAliasResolver} to resolve the state store factory
class</li>
+ * <li>Creates and returns the configured {@link DatasetStateStore}
instance</li>
+ * </ol>
+ *
+ * <p>If state store is disabled, a no-op state store type will be used.
Otherwise, the method
+ * looks for state store type in the following priority order:</p>
+ * <ol>
+ * <li>{@link ConfigurationKeys#DATASET_STATE_STORE_TYPE_KEY}</li>
+ * <li>{@link ConfigurationKeys#STATE_STORE_TYPE_KEY}</li>
+ * <li>{@link ConfigurationKeys#DEFAULT_STATE_STORE_TYPE}</li>
+ * </ol>
+ *
+ * @param jobConfig the job configuration containing state store settings
and type information.
+ * Must not be null.
+ * @return a configured {@link DatasetStateStore} instance ready for use
+ * @throws IOException if there's an error creating the state store,
including:
+ * <ul>
+ * <li>Class resolution failures</li>
+ * <li>Factory instantiation errors</li>
+ * <li>State store creation failures</li>
+ * </ul>
+ * @throws RuntimeException if there's a runtime error during state store
initialization,
+ * which will be logged and re-thrown
+ */
+ public static DatasetStateStore createStateStore(Config jobConfig)
+ throws IOException {
+ boolean stateStoreEnabled =
!jobConfig.hasPath(ConfigurationKeys.STATE_STORE_ENABLED) || jobConfig
+ .getBoolean(ConfigurationKeys.STATE_STORE_ENABLED);
+
+ String stateStoreType;
+
+ if (!stateStoreEnabled) {
+ stateStoreType = ConfigurationKeys.STATE_STORE_TYPE_NOOP;
+ } else {
+ stateStoreType = ConfigUtils.getString(jobConfig,
ConfigurationKeys.DATASET_STATE_STORE_TYPE_KEY, ConfigUtils
+ .getString(jobConfig, ConfigurationKeys.STATE_STORE_TYPE_KEY,
ConfigurationKeys.DEFAULT_STATE_STORE_TYPE));
+ }
+
+ ClassAliasResolver<DatasetStateStore.Factory> resolver = new
ClassAliasResolver<>(DatasetStateStore.Factory.class);
+
+ try {
+ DatasetStateStore.Factory stateStoreFactory =
resolver.resolveClass(stateStoreType).newInstance();
+ return stateStoreFactory.createStateStore(jobConfig);
+ } catch (RuntimeException e) {
+ log.error("Error in initializing DataStateStore of type {} ",
stateStoreType, e);
+ throw e;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index fdeaf42487..c62468305c 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -40,10 +40,13 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.io.Closer;
import com.tdunning.math.stats.TDigest;
+import com.typesafe.config.ConfigFactory;
import io.temporal.failure.ApplicationFailure;
import io.temporal.activity.Activity;
import io.temporal.activity.ActivityExecutionContext;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.converter.initializer.ConverterInitializer;
@@ -53,7 +56,9 @@ import
org.apache.gobblin.destination.DestinationDatasetHandlerService;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.runtime.CombinedWorkUnitAndDatasetStateGenerator;
import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.DatasetStateStoreUtils;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
import
org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
import org.apache.gobblin.service.ServiceConfigKeys;
@@ -76,6 +81,7 @@ import
org.apache.gobblin.writer.initializer.WriterInitializer;
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
+
@Slf4j
public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
@@ -144,10 +150,16 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
Path workDirRoot = JobStateUtils.getWorkDirRoot(jobState);
log.info("Using work dir root path for job '{}' - '{}'",
jobState.getJobId(), workDirRoot);
- // TODO: determine whether these are actually necessary to do (as
MR/AbstractJobLauncher did)!
- // SharedResourcesBroker<GobblinScopeTypes> jobBroker =
JobStateUtils.getSharedResourcesBroker(jobState);
- // jobState.setBroker(jobBroker);
- // jobState.setWorkUnitAndDatasetStateFunctional(new
CombinedWorkUnitAndDatasetStateGenerator(this.datasetStateStore, this.jobName));
+ /* Set up dataset state functional and shared resource broker on JobState
to enable
+ work units to access previous dataset states and watermarks during work
discovery, following the MR launcher pattern */
+ try {
+ addDatasetStateFunctionalAndSharedResourceBrokerToJobState(jobProps,
jobState);
+ }
+ catch (IOException e){
+ String errMsg = "Failed to
addDatasetStateFunctionalAndSharedResourceBrokerToJobState for job " +
jobState.getJobId();
+ log.error(errMsg, e);
+ throw ApplicationFailure.newFailureWithCause(errMsg, "Failure: creating
SharedResourcesBroker", e);
+ }
AutomaticTroubleshooter troubleshooter =
AutomaticTroubleshooterFactory.createForJob(jobProps);
troubleshooter.start();
@@ -195,6 +207,13 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
}
}
+ private void
addDatasetStateFunctionalAndSharedResourceBrokerToJobState(Properties jobProps,
JobState jobState) throws IOException {
+ SharedResourcesBroker<GobblinScopeTypes> jobBroker =
JobStateUtils.getSharedResourcesBroker(jobState);
+ jobState.setBroker(jobBroker);
+ jobState.setWorkUnitAndDatasetStateFunctional(new
CombinedWorkUnitAndDatasetStateGenerator(
+
DatasetStateStoreUtils.createStateStore(ConfigFactory.parseProperties(jobProps)),
jobState.getJobName()));
+ }
+
protected WorkUnitsWithInsights
generateWorkUnitsForJobStateAndCollectCleanupPaths(JobState jobState,
EventSubmitterContext eventSubmitterContext, Closer closer)
throws ReflectiveOperationException {
// report (timer) metrics for "Work Discovery", *planning only* - NOT
including WU prep, like serialization, `DestinationDatasetHandlerService`ing,
etc.
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
index a7790c423d..5c457e6945 100644
---
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
@@ -17,18 +17,35 @@
package org.apache.gobblin.temporal.ddm.activity.impl;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import java.util.Set;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import org.testng.Assert;
import org.testng.annotations.Test;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.runtime.CombinedWorkUnitAndDatasetStateGenerator;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.DatasetStateStoreUtils;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
@@ -199,6 +216,119 @@ public class GenerateWorkUnitsImplTest {
Assert.assertEquals(wuSizeInfo.getConstituentWorkUnitsMedianSize(), 0.0);
}
+ @Test
+ public void testAddDatasetStateFunctionalAndSharedResourceBrokerToJobState()
throws Exception {
+ // Arrange
+ Properties jobProps = new Properties();
+ jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, "test-job");
+ jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, "test-job-id");
+
+ JobState jobState = new JobState(jobProps);
+ SharedResourcesBroker<GobblinScopeTypes> mockBroker =
mock(SharedResourcesBroker.class);
+ DatasetStateStore mockDatasetStateStore = mock(DatasetStateStore.class);
+
+ // Create instance and get access to the private method
+ GenerateWorkUnitsImpl generateWorkUnitsImpl = new GenerateWorkUnitsImpl();
+ Method privateMethod = GenerateWorkUnitsImpl.class.getDeclaredMethod(
+ "addDatasetStateFunctionalAndSharedResourceBrokerToJobState",
Properties.class, JobState.class);
+ privateMethod.setAccessible(true);
+
+ // Mock static method calls
+ try (MockedStatic<JobStateUtils> mockedJobStateUtils =
Mockito.mockStatic(JobStateUtils.class);
+ MockedStatic<DatasetStateStoreUtils> mockedDataStateStoreUtils =
Mockito.mockStatic(DatasetStateStoreUtils.class)) {
+
+ mockedJobStateUtils.when(() ->
JobStateUtils.getSharedResourcesBroker(jobState))
+ .thenReturn(mockBroker);
+ mockedDataStateStoreUtils.when(() ->
DatasetStateStoreUtils.createStateStore(any()))
+ .thenReturn(mockDatasetStateStore);
+
+ // Act - invoke the private method reflectively on the instance
+ privateMethod.invoke(generateWorkUnitsImpl, jobProps, jobState);
+
+ // Assert
+ Assert.assertEquals(jobState.getBroker(), mockBroker,
"SharedResourcesBroker should be set on JobState");
+ Assert.assertNotNull(jobState.getWorkUnitAndDatasetStateFunctional(),
"WorkUnitAndDatasetStateFunctional should be set");
+ Assert.assertTrue(jobState.getWorkUnitAndDatasetStateFunctional()
instanceof CombinedWorkUnitAndDatasetStateGenerator,
+ "WorkUnitAndDatasetStateFunctional should be instance of
CombinedWorkUnitAndDatasetStateGenerator");
+
+ // Verify interactions
+ mockedJobStateUtils.verify(() ->
JobStateUtils.getSharedResourcesBroker(jobState), times(1));
+ mockedDataStateStoreUtils.verify(() ->
DatasetStateStoreUtils.createStateStore(any()), times(1));
+ }
+ }
+
+ @Test
+ public void
testAddDatasetStateFunctionalAndSharedResourceBrokerToJobStateWithIOException()
throws Exception {
+ // Arrange
+ Properties jobProps = new Properties();
+ jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, "test-job");
+ jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, "test-job-id");
+
+ JobState jobState = new JobState(jobProps);
+
+ // Create instance and get access to the private method
+ GenerateWorkUnitsImpl generateWorkUnitsImpl = new GenerateWorkUnitsImpl();
+ Method privateMethod = GenerateWorkUnitsImpl.class.getDeclaredMethod(
+ "addDatasetStateFunctionalAndSharedResourceBrokerToJobState",
Properties.class, JobState.class);
+ privateMethod.setAccessible(true);
+
+ // Mock static method calls to throw IOException
+ try (MockedStatic<JobStateUtils> mockedJobStateUtils =
Mockito.mockStatic(JobStateUtils.class);
+ MockedStatic<DatasetStateStoreUtils> mockedDataStateStoreUtils =
Mockito.mockStatic(DatasetStateStoreUtils.class)) {
+
+ mockedDataStateStoreUtils.when(() ->
DatasetStateStoreUtils.createStateStore(any()))
+ .thenThrow(new IOException("Failed to create state store"));
+
+ // Act & Assert
+ try {
+ privateMethod.invoke(generateWorkUnitsImpl, jobProps, jobState);
+ Assert.fail("Expected IOException to be thrown");
+ } catch (InvocationTargetException e) {
+ Assert.assertTrue(e.getCause() instanceof IOException, "Root cause
should be IOException");
+ Assert.assertEquals(e.getCause().getMessage(), "Failed to create state
store");
+ // Broker is null here only because the unstubbed JobStateUtils mock returns null; setBroker itself runs before the state store failure
+ Assert.assertNull(jobState.getBroker(), "Broker should not be set when
exception occurs");
+ Assert.assertNull(jobState.getWorkUnitAndDatasetStateFunctional(),
+ "WorkUnitAndDatasetStateFunctional should not be set when
exception occurs");
+ }
+ }
+ }
+
+ @Test
+ public void
testAddDatasetStateFunctionalAndSharedResourceBrokerToJobStateWithNullBroker()
throws Exception {
+ // Arrange
+ Properties jobProps = new Properties();
+ jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, "test-job");
+ jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, "test-job-id");
+
+ JobState jobState = new JobState(jobProps);
+ DatasetStateStore mockDatasetStateStore = mock(DatasetStateStore.class);
+
+ // Create instance and get access to the private method
+ GenerateWorkUnitsImpl generateWorkUnitsImpl = new GenerateWorkUnitsImpl();
+ Method privateMethod = GenerateWorkUnitsImpl.class.getDeclaredMethod(
+ "addDatasetStateFunctionalAndSharedResourceBrokerToJobState",
Properties.class, JobState.class);
+ privateMethod.setAccessible(true);
+
+ // Mock static method calls - return null broker to test null handling
+ try (MockedStatic<JobStateUtils> mockedJobStateUtils =
Mockito.mockStatic(JobStateUtils.class);
+ MockedStatic<DatasetStateStoreUtils> mockedDataStateStoreUtils =
Mockito.mockStatic(DatasetStateStoreUtils.class)) {
+
+ mockedJobStateUtils.when(() ->
JobStateUtils.getSharedResourcesBroker(jobState))
+ .thenReturn(null);
+ mockedDataStateStoreUtils.when(() ->
DatasetStateStoreUtils.createStateStore(any()))
+ .thenReturn(mockDatasetStateStore);
+
+ // Act
+ privateMethod.invoke(generateWorkUnitsImpl, jobProps, jobState);
+
+ // Assert
+ Assert.assertNull(jobState.getBroker(), "Broker should be null when null
broker is returned");
+ Assert.assertNotNull(jobState.getWorkUnitAndDatasetStateFunctional(),
+ "WorkUnitAndDatasetStateFunctional should still be set even with
null broker");
+ }
+ }
+
public static WorkUnit createWorkUnitOfSize(long size) {
WorkUnit workUnit = WorkUnit.createEmpty();
workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, size);