This is an automated email from the ASF dual-hosted git repository.
abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8ba6c1fc87 [GOBBLIN-2236] Remove dataset urn specific properties from
CommitStats (#4151)
8ba6c1fc87 is described below
commit 8ba6c1fc8721fd245101829b232530bc2ccecc7f
Author: abhishekmjain <[email protected]>
AuthorDate: Mon Nov 3 11:10:20 2025 +0530
[GOBBLIN-2236] Remove dataset urn specific properties from CommitStats
(#4151)
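* Remove dataset urn specific properties from CommitStats and ExecGobblinStats,
replacing the per-dataset map with aggregate recordsWritten/bytesWritten totals so
commit results stay small when passed through Temporal (which enforces a payload
size limit); the JOB_SUMMARY event carrying per-dataset task summaries is now
emitted from CommitActivityImpl instead of CommitStepWorkflowImpl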
---
.../ddm/activity/impl/CommitActivityImpl.java | 24 +++-
.../gobblin/temporal/ddm/work/CommitStats.java | 7 +-
.../temporal/ddm/work/ExecGobblinStats.java | 4 +-
.../ddm/workflow/impl/CommitStepWorkflowImpl.java | 21 ---
.../workflow/impl/ExecuteGobblinWorkflowImpl.java | 8 +-
.../ddm/activity/impl/CommitActivityImplTest.java | 158 +++++++++++++++++++++
.../gobblin/temporal/ddm/work/CommitStatsTest.java | 112 +++++++++++++++
.../temporal/ddm/work/ExecGobblinStatsTest.java | 120 ++++++++++++++++
.../workflow/impl/CommitStepWorkflowImplTest.java | 95 +++++++++++++
9 files changed, 519 insertions(+), 30 deletions(-)
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index ba44dc6198..10b045232c 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -58,6 +58,8 @@ import
org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
import org.apache.gobblin.initializer.Initializer;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.DatasetTaskSummary;
import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.SafeDatasetCommit;
@@ -75,6 +77,7 @@ import org.apache.gobblin.temporal.ddm.work.DatasetStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;
+import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.PropertiesUtils;
@@ -146,9 +149,19 @@ public class CommitActivityImpl implements CommitActivity {
boolean shouldIncludeFailedTasks =
PropertiesUtils.getPropAsBoolean(jobState.getProperties(),
ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
Map<String, DatasetStats> datasetTaskSummaries =
summarizeDatasetOutcomes(datasetStatesByUrns, jobContext,
shouldIncludeFailedTasks);
+
+ // Emit Job Summary event
+ if (!optFailure.isPresent() || !datasetTaskSummaries.isEmpty()) {
+ TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.WithinActivityFactory(workSpec.getEventSubmitterContext());
+ timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
+ .withMetadataAsJson(TimingEvent.DATASET_TASK_SUMMARIES,
convertDatasetStatsToTaskSummaries(datasetTaskSummaries))
+ .submit();// emit job summary info on both full and partial commit
(ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
+ }
+
return new CommitStats(
- datasetTaskSummaries,
datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum(),
+
datasetTaskSummaries.values().stream().mapToLong(DatasetStats::getRecordsWritten).sum(),
+
datasetTaskSummaries.values().stream().mapToLong(DatasetStats::getBytesWritten).sum(),
optFailure
);
} catch (Exception e) {
@@ -290,4 +303,13 @@ public class CommitActivityImpl implements CommitActivity {
/** Builds a {@link WorkUnitStream} backed by an empty list of work units. */
private static WorkUnitStream createEmptyWorkUnitStream() {
return new BasicWorkUnitStream.Builder(Lists.newArrayList()).build();
}
+
+  /**
+   * Converts per-dataset {@link DatasetStats} into the {@link DatasetTaskSummary} list attached to the
+   * {@code JOB_SUMMARY} timing event.
+   *
+   * @param datasetStats per-dataset stats keyed by dataset URN
+   * @return one summary per map entry, in the map's iteration order
+   */
+  private List<DatasetTaskSummary> convertDatasetStatsToTaskSummaries(Map<String, DatasetStats> datasetStats) {
+    List<DatasetTaskSummary> datasetTaskSummaries = Lists.newArrayListWithCapacity(datasetStats.size());
+    for (Map.Entry<String, DatasetStats> entry : datasetStats.entrySet()) {
+      DatasetStats stats = entry.getValue();
+      datasetTaskSummaries.add(new DatasetTaskSummary(entry.getKey(), stats.getRecordsWritten(),
+          stats.getBytesWritten(), stats.isSuccessfullyCommitted(), stats.getDataQualityCheckStatus()));
+    }
+    // The per-dataset list can be very large (which is why it no longer travels inside CommitStats), so only
+    // its size is logged at INFO; the full contents are logged at DEBUG.
+    log.info("Converted dataset stats to {} task summaries", datasetTaskSummaries.size());
+    log.debug("Converted dataset stats to task summaries: {}", datasetTaskSummaries);
+    return datasetTaskSummaries;
+  }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
index 3c150689d4..936fc828c4 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
@@ -17,8 +17,6 @@
package org.apache.gobblin.temporal.ddm.work;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Optional;
import lombok.AccessLevel;
@@ -41,11 +39,12 @@ import
org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
public class CommitStats {
- @NonNull private Map<String, DatasetStats> datasetStats;
@NonNull private int numCommittedWorkUnits;
+ @NonNull private long recordsWritten;
+ @NonNull private long bytesWritten;
@NonNull private Optional<FailedDatasetUrnsException> optFailure;
public static CommitStats createEmpty() {
- return new CommitStats(new HashMap<>(), 0, Optional.empty());
+ return new CommitStats(0, 0, 0, Optional.empty());
}
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
index 89beebf153..a9cf4c1114 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
@@ -17,7 +17,6 @@
package org.apache.gobblin.temporal.ddm.work;
-import java.util.Map;
import lombok.AccessLevel;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -34,7 +33,8 @@ import lombok.Setter;
public class ExecGobblinStats {
@NonNull private int numWorkUnits;
@NonNull private int numCommitted;
+ @NonNull private long recordsWritten;
+ @NonNull private long bytesWritten;
@NonNull private String user;
- @NonNull private Map<String, DatasetStats> stats;
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
index 56f645fc49..50d587c4ef 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
@@ -17,9 +17,6 @@
package org.apache.gobblin.temporal.ddm.workflow.impl;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
import java.util.Properties;
import io.temporal.failure.ApplicationFailure;
@@ -27,15 +24,11 @@ import io.temporal.workflow.Workflow;
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.runtime.DatasetTaskSummary;
import org.apache.gobblin.temporal.ddm.activity.ActivityType;
import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
import org.apache.gobblin.temporal.ddm.work.CommitStats;
-import org.apache.gobblin.temporal.ddm.work.DatasetStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
-import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
@Slf4j
@@ -45,12 +38,6 @@ public class CommitStepWorkflowImpl implements
CommitStepWorkflow {
public CommitStats commit(WUProcessingSpec workSpec, final Properties props)
{
final CommitActivity activityStub =
Workflow.newActivityStub(CommitActivity.class,
ActivityType.COMMIT.buildActivityOptions(props, true));
CommitStats commitGobblinStats = activityStub.commit(workSpec);
- if (!commitGobblinStats.getOptFailure().isPresent() ||
commitGobblinStats.getNumCommittedWorkUnits() > 0) {
- TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.WithinWorkflowFactory(workSpec.getEventSubmitterContext(),
props);
- timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
- .withMetadataAsJson(TimingEvent.DATASET_TASK_SUMMARIES,
convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats()))
- .submit();// emit job summary info on both full and partial commit
(ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
- }
if (commitGobblinStats.getOptFailure().isPresent()) {
throw ApplicationFailure.newNonRetryableFailureWithCause(
String.format("Failed to commit dataset state for some dataset(s)"),
commitGobblinStats.getOptFailure().get().getClass().toString(),
@@ -58,12 +45,4 @@ public class CommitStepWorkflowImpl implements
CommitStepWorkflow {
}
return commitGobblinStats;
}
- private List<DatasetTaskSummary>
convertDatasetStatsToTaskSummaries(Map<String, DatasetStats> datasetStats) {
- List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
- for (Map.Entry<String, DatasetStats> entry : datasetStats.entrySet()) {
- datasetTaskSummaries.add(new DatasetTaskSummary(entry.getKey(),
entry.getValue().getRecordsWritten(), entry.getValue().getBytesWritten(),
entry.getValue().isSuccessfullyCommitted(),
entry.getValue().getDataQualityCheckStatus()));
- }
- log.info("Converted dataset stats to task summaries: {}",
datasetTaskSummaries);
- return datasetTaskSummaries;
- }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index f1b4444d3b..05fa2a8546 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -102,6 +102,8 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
WorkUnitsSizeSummary wuSizeSummary =
generateWorkUnitResult.getWorkUnitsSizeSummary();
int numWUsGenerated =
safelyCastNumConstituentWorkUnitsOrThrow(wuSizeSummary);
int numWUsCommitted = 0;
+ long recordsWritten = 0;
+ long bytesWritten = 0;
CommitStats commitStats = CommitStats.createEmpty();
if (numWUsGenerated > 0) {
TimeBudget timeBudget =
calcWUProcTimeBudget(jobSuccessTimer.getStartTime(), wuSizeSummary, jobProps);
@@ -124,11 +126,13 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
ProcessWorkUnitsWorkflow processWUsWorkflow =
createProcessWorkUnitsWorkflow(jobProps);
commitStats = processWUsWorkflow.process(wuSpec, temporalJobProps);
numWUsCommitted = commitStats.getNumCommittedWorkUnits();
+ recordsWritten = commitStats.getRecordsWritten();
+ bytesWritten = commitStats.getBytesWritten();
}
jobSuccessTimer.stop();
isSuccessful = true;
- return new ExecGobblinStats(numWUsGenerated, numWUsCommitted,
jobProps.getProperty(Help.USER_TO_PROXY_KEY),
- commitStats.getDatasetStats());
+ return new ExecGobblinStats(numWUsGenerated, numWUsCommitted,
recordsWritten, bytesWritten,
+ jobProps.getProperty(Help.USER_TO_PROXY_KEY));
} catch (Exception e) {
// Emit a failed GobblinTrackingEvent to record job failures
timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit(); //
update GaaS: `ExecutionStatus.FAILED`; `TimingEvent.JOB_END_TIME`
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImplTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImplTest.java
new file mode 100644
index 0000000000..63d6aa1dc1
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImplTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.runtime.DatasetTaskSummary;
+import org.apache.gobblin.temporal.ddm.work.DatasetStats;
+
+
+/**
+ * Tests for {@link CommitActivityImpl}'s conversion of per-dataset {@link DatasetStats} into
+ * {@link DatasetTaskSummary} entries (the metadata of the job summary event), plus a check of the
+ * aggregate-sum arithmetic used when building {@code CommitStats}.
+ *
+ * <p>TestNG's {@code Assert.assertEquals} takes {@code (actual, expected)}; assertions below follow that
+ * order so failure messages report the two values correctly.
+ */
+public class CommitActivityImplTest {
+
+  @Test
+  public void testConvertDatasetStatsToTaskSummaries() throws Exception {
+    Map<String, DatasetStats> datasetStats = new HashMap<>();
+    datasetStats.put("dataset1", new DatasetStats(1000L, 2000L, true, 10, "PASS"));
+    datasetStats.put("dataset2", new DatasetStats(3000L, 4000L, true, 20, "PASS"));
+    datasetStats.put("dataset3", new DatasetStats(5000L, 6000L, false, 0, "FAIL"));
+
+    List<DatasetTaskSummary> result = convertViaReflection(datasetStats);
+    Assert.assertEquals(result.size(), 3);
+
+    // HashMap iteration order is unspecified, so look summaries up by URN rather than by position
+    Map<String, DatasetTaskSummary> summaryByUrn = indexByUrn(result);
+
+    Assert.assertTrue(summaryByUrn.containsKey("dataset1"));
+    Assert.assertEquals(summaryByUrn.get("dataset1").getRecordsWritten(), 1000L);
+    Assert.assertEquals(summaryByUrn.get("dataset1").getBytesWritten(), 2000L);
+    Assert.assertTrue(summaryByUrn.get("dataset1").isSuccessfullyCommitted());
+    Assert.assertEquals(summaryByUrn.get("dataset1").getDataQualityStatus(), "PASS");
+
+    Assert.assertTrue(summaryByUrn.containsKey("dataset2"));
+    Assert.assertEquals(summaryByUrn.get("dataset2").getRecordsWritten(), 3000L);
+    Assert.assertEquals(summaryByUrn.get("dataset2").getBytesWritten(), 4000L);
+    Assert.assertTrue(summaryByUrn.get("dataset2").isSuccessfullyCommitted());
+    Assert.assertEquals(summaryByUrn.get("dataset2").getDataQualityStatus(), "PASS");
+
+    Assert.assertTrue(summaryByUrn.containsKey("dataset3"));
+    Assert.assertEquals(summaryByUrn.get("dataset3").getRecordsWritten(), 5000L);
+    Assert.assertEquals(summaryByUrn.get("dataset3").getBytesWritten(), 6000L);
+    Assert.assertFalse(summaryByUrn.get("dataset3").isSuccessfullyCommitted());
+    Assert.assertEquals(summaryByUrn.get("dataset3").getDataQualityStatus(), "FAIL");
+  }
+
+  @Test
+  public void testConvertDatasetStatsToTaskSummariesWithEmptyMap() throws Exception {
+    List<DatasetTaskSummary> result = convertViaReflection(new HashMap<>());
+
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result.size(), 0);
+  }
+
+  @Test
+  public void testConvertDatasetStatsToTaskSummariesPreservesAllFields() throws Exception {
+    String datasetUrn = "testDataset";
+    long recordsWritten = 123456L;
+    long bytesWritten = 789012L;
+    boolean successfullyCommitted = true;
+    int numCommittedWorkunits = 42;
+    String dataQualityStatus = "PASS";
+
+    Map<String, DatasetStats> datasetStats = new HashMap<>();
+    datasetStats.put(datasetUrn, new DatasetStats(
+        recordsWritten, bytesWritten, successfullyCommitted, numCommittedWorkunits, dataQualityStatus));
+
+    List<DatasetTaskSummary> result = convertViaReflection(datasetStats);
+
+    Assert.assertEquals(result.size(), 1);
+    DatasetTaskSummary summary = result.get(0);
+
+    Assert.assertEquals(summary.getDatasetUrn(), datasetUrn);
+    Assert.assertEquals(summary.getRecordsWritten(), recordsWritten);
+    Assert.assertEquals(summary.getBytesWritten(), bytesWritten);
+    Assert.assertEquals(summary.isSuccessfullyCommitted(), successfullyCommitted);
+    Assert.assertEquals(summary.getDataQualityStatus(), dataQualityStatus);
+  }
+
+  @Test
+  public void testAggregateStatsCalculation() {
+    // NOTE: this mirrors the stream sums CommitActivityImpl uses to build CommitStats; it does not invoke
+    // CommitActivityImpl itself, so it only guards the arithmetic pattern, not the activity wiring.
+    Map<String, DatasetStats> datasetStats = new HashMap<>();
+    datasetStats.put("dataset1", new DatasetStats(1000L, 2000L, true, 10, "PASS"));
+    datasetStats.put("dataset2", new DatasetStats(3000L, 4000L, true, 20, "PASS"));
+    datasetStats.put("dataset3", new DatasetStats(5000L, 6000L, true, 30, "PASS"));
+
+    int totalCommittedWorkUnits = datasetStats.values().stream()
+        .mapToInt(DatasetStats::getNumCommittedWorkunits)
+        .sum();
+    long totalRecordsWritten = datasetStats.values().stream()
+        .mapToLong(DatasetStats::getRecordsWritten)
+        .sum();
+    long totalBytesWritten = datasetStats.values().stream()
+        .mapToLong(DatasetStats::getBytesWritten)
+        .sum();
+
+    Assert.assertEquals(totalCommittedWorkUnits, 60);
+    Assert.assertEquals(totalRecordsWritten, 9000L);
+    Assert.assertEquals(totalBytesWritten, 12000L);
+  }
+
+  /** Invokes the private {@code CommitActivityImpl#convertDatasetStatsToTaskSummaries} via reflection. */
+  @SuppressWarnings("unchecked") // the private method is declared to return List<DatasetTaskSummary>
+  private static List<DatasetTaskSummary> convertViaReflection(Map<String, DatasetStats> datasetStats)
+      throws Exception {
+    Method method = CommitActivityImpl.class.getDeclaredMethod("convertDatasetStatsToTaskSummaries", Map.class);
+    method.setAccessible(true);
+    return (List<DatasetTaskSummary>) method.invoke(new CommitActivityImpl(), datasetStats);
+  }
+
+  /** Indexes summaries by dataset URN so assertions do not depend on list order. */
+  private static Map<String, DatasetTaskSummary> indexByUrn(List<DatasetTaskSummary> summaries) {
+    Map<String, DatasetTaskSummary> byUrn = new HashMap<>();
+    for (DatasetTaskSummary summary : summaries) {
+      byUrn.put(summary.getDatasetUrn(), summary);
+    }
+    return byUrn;
+  }
+}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/CommitStatsTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/CommitStatsTest.java
new file mode 100644
index 0000000000..0040637dab
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/CommitStatsTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;
+
+
+/**
+ * Tests for {@link CommitStats}, which carries only aggregate commit metrics (no per-dataset map) so that it
+ * stays small when returned through Temporal.
+ *
+ * <p>TestNG's {@code Assert.assertEquals} takes {@code (actual, expected)}; assertions below follow that order.
+ */
+public class CommitStatsTest {
+
+  @Test
+  public void testCommitStatsWithAggregateValues() {
+    int numCommittedWorkUnits = 100;
+    long recordsWritten = 50000L;
+    long bytesWritten = 1024000L;
+
+    CommitStats stats = new CommitStats(numCommittedWorkUnits, recordsWritten, bytesWritten, Optional.empty());
+
+    Assert.assertEquals(stats.getNumCommittedWorkUnits(), numCommittedWorkUnits);
+    Assert.assertEquals(stats.getRecordsWritten(), recordsWritten);
+    Assert.assertEquals(stats.getBytesWritten(), bytesWritten);
+    Assert.assertFalse(stats.getOptFailure().isPresent());
+  }
+
+  @Test
+  public void testCommitStatsWithFailure() {
+    int numCommittedWorkUnits = 50;
+    long recordsWritten = 25000L;
+    long bytesWritten = 512000L;
+    FailedDatasetUrnsException failure = new FailedDatasetUrnsException(Collections.singleton("dataset1"));
+
+    CommitStats stats = new CommitStats(numCommittedWorkUnits, recordsWritten, bytesWritten, Optional.of(failure));
+
+    Assert.assertEquals(stats.getNumCommittedWorkUnits(), numCommittedWorkUnits);
+    Assert.assertEquals(stats.getRecordsWritten(), recordsWritten);
+    Assert.assertEquals(stats.getBytesWritten(), bytesWritten);
+    Assert.assertTrue(stats.getOptFailure().isPresent());
+    Assert.assertSame(stats.getOptFailure().get(), failure);
+  }
+
+  @Test
+  public void testCreateEmpty() {
+    CommitStats emptyStats = CommitStats.createEmpty();
+
+    Assert.assertEquals(emptyStats.getNumCommittedWorkUnits(), 0);
+    Assert.assertEquals(emptyStats.getRecordsWritten(), 0L);
+    Assert.assertEquals(emptyStats.getBytesWritten(), 0L);
+    Assert.assertFalse(emptyStats.getOptFailure().isPresent());
+  }
+
+  @Test
+  public void testCommitStatsDoesNotContainDatasetStatsMap() {
+    // A per-dataset map here could exceed Temporal's 2MB payload size limit, so CommitStats must not declare one
+    for (Field field : CommitStats.class.getDeclaredFields()) {
+      Assert.assertFalse(Map.class.isAssignableFrom(field.getType()),
+          "CommitStats should not declare a Map-typed field, found: " + field.getName());
+    }
+
+    CommitStats stats = new CommitStats(10, 1000L, 2000L, Optional.empty());
+    Assert.assertEquals(stats.getNumCommittedWorkUnits(), 10);
+    Assert.assertEquals(stats.getRecordsWritten(), 1000L);
+    Assert.assertEquals(stats.getBytesWritten(), 2000L);
+    Assert.assertFalse(stats.getOptFailure().isPresent());
+  }
+
+  @Test
+  public void testCommitStatsWithZeroValues() {
+    CommitStats stats = new CommitStats(0, 0L, 0L, Optional.empty());
+
+    Assert.assertEquals(stats.getNumCommittedWorkUnits(), 0);
+    Assert.assertEquals(stats.getRecordsWritten(), 0L);
+    Assert.assertEquals(stats.getBytesWritten(), 0L);
+    Assert.assertFalse(stats.getOptFailure().isPresent());
+  }
+
+  @Test
+  public void testCommitStatsWithLargeValues() {
+    // Extreme values must round-trip unchanged (int for work units, long for records/bytes)
+    int numCommittedWorkUnits = Integer.MAX_VALUE;
+    long recordsWritten = Long.MAX_VALUE / 2;
+    long bytesWritten = Long.MAX_VALUE / 2;
+
+    CommitStats stats = new CommitStats(numCommittedWorkUnits, recordsWritten, bytesWritten, Optional.empty());
+
+    Assert.assertEquals(stats.getNumCommittedWorkUnits(), numCommittedWorkUnits);
+    Assert.assertEquals(stats.getRecordsWritten(), recordsWritten);
+    Assert.assertEquals(stats.getBytesWritten(), bytesWritten);
+  }
+}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStatsTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStatsTest.java
new file mode 100644
index 0000000000..09895dedf5
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStatsTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link ExecGobblinStats}, which carries only aggregate job metrics (no per-dataset map) so that it
+ * stays small when returned through Temporal.
+ *
+ * <p>TestNG's {@code Assert.assertEquals} takes {@code (actual, expected)}; assertions below follow that order.
+ */
+public class ExecGobblinStatsTest {
+
+  @Test
+  public void testExecGobblinStatsWithAggregateValues() {
+    int numWorkUnits = 200;
+    int numCommitted = 195;
+    long recordsWritten = 100000L;
+    long bytesWritten = 2048000L;
+    String user = "testUser";
+
+    ExecGobblinStats stats = new ExecGobblinStats(numWorkUnits, numCommitted, recordsWritten, bytesWritten, user);
+
+    Assert.assertEquals(stats.getNumWorkUnits(), numWorkUnits);
+    Assert.assertEquals(stats.getNumCommitted(), numCommitted);
+    Assert.assertEquals(stats.getRecordsWritten(), recordsWritten);
+    Assert.assertEquals(stats.getBytesWritten(), bytesWritten);
+    Assert.assertEquals(stats.getUser(), user);
+  }
+
+  @Test
+  public void testExecGobblinStatsDoesNotContainDatasetStatsMap() {
+    // A per-dataset map here could exceed Temporal's 2MB payload size limit, so ExecGobblinStats must not declare one
+    for (Field field : ExecGobblinStats.class.getDeclaredFields()) {
+      Assert.assertFalse(Map.class.isAssignableFrom(field.getType()),
+          "ExecGobblinStats should not declare a Map-typed field, found: " + field.getName());
+    }
+
+    ExecGobblinStats stats = new ExecGobblinStats(10, 10, 1000L, 2000L, "user1");
+    Assert.assertEquals(stats.getNumWorkUnits(), 10);
+    Assert.assertEquals(stats.getNumCommitted(), 10);
+    Assert.assertEquals(stats.getRecordsWritten(), 1000L);
+    Assert.assertEquals(stats.getBytesWritten(), 2000L);
+    Assert.assertEquals(stats.getUser(), "user1");
+  }
+
+  @Test
+  public void testExecGobblinStatsWithPartialCommit() {
+    // Not every generated work unit was committed
+    int numWorkUnits = 100;
+    int numCommitted = 75;
+    long recordsWritten = 50000L;
+    long bytesWritten = 1024000L;
+    String user = "partialUser";
+
+    ExecGobblinStats stats = new ExecGobblinStats(numWorkUnits, numCommitted, recordsWritten, bytesWritten, user);
+
+    Assert.assertEquals(stats.getNumWorkUnits(), numWorkUnits);
+    Assert.assertEquals(stats.getNumCommitted(), numCommitted);
+    Assert.assertTrue(stats.getNumCommitted() < stats.getNumWorkUnits());
+    Assert.assertEquals(stats.getRecordsWritten(), recordsWritten);
+    Assert.assertEquals(stats.getBytesWritten(), bytesWritten);
+  }
+
+  @Test
+  public void testExecGobblinStatsWithZeroValues() {
+    ExecGobblinStats stats = new ExecGobblinStats(0, 0, 0L, 0L, "emptyUser");
+
+    Assert.assertEquals(stats.getNumWorkUnits(), 0);
+    Assert.assertEquals(stats.getNumCommitted(), 0);
+    Assert.assertEquals(stats.getRecordsWritten(), 0L);
+    Assert.assertEquals(stats.getBytesWritten(), 0L);
+    Assert.assertEquals(stats.getUser(), "emptyUser");
+  }
+
+  @Test
+  public void testExecGobblinStatsWithLargeValues() {
+    // Extreme values must round-trip unchanged (int for work-unit counts, long for records/bytes)
+    int numWorkUnits = Integer.MAX_VALUE;
+    int numCommitted = Integer.MAX_VALUE - 1;
+    long recordsWritten = Long.MAX_VALUE / 2;
+    long bytesWritten = Long.MAX_VALUE / 2;
+    String user = "largeUser";
+
+    ExecGobblinStats stats = new ExecGobblinStats(numWorkUnits, numCommitted, recordsWritten, bytesWritten, user);
+
+    Assert.assertEquals(stats.getNumWorkUnits(), numWorkUnits);
+    Assert.assertEquals(stats.getNumCommitted(), numCommitted);
+    Assert.assertEquals(stats.getRecordsWritten(), recordsWritten);
+    Assert.assertEquals(stats.getBytesWritten(), bytesWritten);
+    Assert.assertEquals(stats.getUser(), user);
+  }
+
+  @Test
+  public void testExecGobblinStatsConstructorFieldOrder() {
+    // Distinct values per position catch any mix-up in the positional constructor order
+    ExecGobblinStats stats = new ExecGobblinStats(100, 95, 50000L, 1024000L, "orderUser");
+
+    Assert.assertEquals(stats.getNumWorkUnits(), 100);
+    Assert.assertEquals(stats.getNumCommitted(), 95);
+    Assert.assertEquals(stats.getRecordsWritten(), 50000L);
+    Assert.assertEquals(stats.getBytesWritten(), 1024000L);
+    Assert.assertEquals(stats.getUser(), "orderUser");
+  }
+}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImplTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImplTest.java
new file mode 100644
index 0000000000..5549749bcb
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImplTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow.impl;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Optional;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
+
+
+/**
+ * Tests for {@link CommitStepWorkflowImpl} verifying that job summary event emission (and the dataset-stats
+ * conversion it needs) has moved out of the workflow into CommitActivity, keeping Temporal payloads small.
+ *
+ * <p>TestNG's {@code Assert.assertEquals} takes {@code (actual, expected)}; assertions below follow that order.
+ */
+public class CommitStepWorkflowImplTest {
+
+  @Test
+  public void testCommitStepWorkflowDoesNotConvertDatasetStats() {
+    // convertDatasetStatsToTaskSummaries should now exist only in CommitActivityImpl
+    boolean hasConvertMethod = Arrays.stream(CommitStepWorkflowImpl.class.getDeclaredMethods())
+        .map(Method::getName)
+        .anyMatch("convertDatasetStatsToTaskSummaries"::equals);
+
+    Assert.assertFalse(hasConvertMethod,
+        "CommitStepWorkflowImpl should not have convertDatasetStatsToTaskSummaries method. "
+            + "Job summary emission should be done in CommitActivity to avoid payload size issues.");
+  }
+
+  @Test
+  public void testCommitStepWorkflowSimplifiedStructure() {
+    // Only the commit method is expected; compiler-generated (synthetic) methods such as lambda bodies are ignored
+    long methodCount = Arrays.stream(CommitStepWorkflowImpl.class.getDeclaredMethods())
+        .filter(method -> !method.isSynthetic())
+        .count();
+
+    Assert.assertTrue(methodCount <= 1,
+        "CommitStepWorkflowImpl should have minimal methods. Found: " + methodCount);
+  }
+
+  @Test
+  public void testCommitStatsStructureForPayloadSizeOptimization() {
+    // CommitStats returned by the workflow carries only aggregate values, keeping it under Temporal's 2MB limit
+    CommitStats stats = new CommitStats(100, 50000L, 1024000L, Optional.empty());
+
+    Assert.assertEquals(stats.getNumCommittedWorkUnits(), 100);
+    Assert.assertEquals(stats.getRecordsWritten(), 50000L);
+    Assert.assertEquals(stats.getBytesWritten(), 1024000L);
+
+    // Proxy check that no large map is embedded: the string form of aggregate-only stats stays tiny
+    String statsString = stats.toString();
+    Assert.assertTrue(statsString.length() < 1000,
+        "CommitStats toString should be small. Length: " + statsString.length());
+  }
+}