This is an automated email from the ASF dual-hosted git repository.

abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ba6c1fc87 [GOBBLIN-2236] Remove dataset urn specific properties from CommitStats (#4151)
8ba6c1fc87 is described below

commit 8ba6c1fc8721fd245101829b232530bc2ccecc7f
Author: abhishekmjain <[email protected]>
AuthorDate: Mon Nov 3 11:10:20 2025 +0530

    [GOBBLIN-2236] Remove dataset urn specific properties from CommitStats (#4151)
    
    * Remove dataset urn specific properties from CommitStats and ExecGobblinStats
---
 .../ddm/activity/impl/CommitActivityImpl.java      |  24 +++-
 .../gobblin/temporal/ddm/work/CommitStats.java     |   7 +-
 .../temporal/ddm/work/ExecGobblinStats.java        |   4 +-
 .../ddm/workflow/impl/CommitStepWorkflowImpl.java  |  21 ---
 .../workflow/impl/ExecuteGobblinWorkflowImpl.java  |   8 +-
 .../ddm/activity/impl/CommitActivityImplTest.java  | 158 +++++++++++++++++++++
 .../gobblin/temporal/ddm/work/CommitStatsTest.java | 112 +++++++++++++++
 .../temporal/ddm/work/ExecGobblinStatsTest.java    | 120 ++++++++++++++++
 .../workflow/impl/CommitStepWorkflowImplTest.java  |  95 +++++++++++++
 9 files changed, 519 insertions(+), 30 deletions(-)

diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index ba44dc6198..10b045232c 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -58,6 +58,8 @@ import 
org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
 import org.apache.gobblin.initializer.Initializer;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.DatasetTaskSummary;
 import org.apache.gobblin.runtime.JobContext;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.SafeDatasetCommit;
@@ -75,6 +77,7 @@ import org.apache.gobblin.temporal.ddm.work.DatasetStats;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;
+import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
 import org.apache.gobblin.util.Either;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.PropertiesUtils;
@@ -146,9 +149,19 @@ public class CommitActivityImpl implements CommitActivity {
       boolean shouldIncludeFailedTasks = 
PropertiesUtils.getPropAsBoolean(jobState.getProperties(), 
ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
 
       Map<String, DatasetStats> datasetTaskSummaries = 
summarizeDatasetOutcomes(datasetStatesByUrns, jobContext, 
shouldIncludeFailedTasks);
+
+      // Emit Job Summary event
+      if (!optFailure.isPresent() || !datasetTaskSummaries.isEmpty()) {
+        TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.WithinActivityFactory(workSpec.getEventSubmitterContext());
+        timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
+            .withMetadataAsJson(TimingEvent.DATASET_TASK_SUMMARIES, 
convertDatasetStatsToTaskSummaries(datasetTaskSummaries))
+            .submit();// emit job summary info on both full and partial commit 
(ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
+      }
+
       return new CommitStats(
-          datasetTaskSummaries,
           
datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum(),
+          
datasetTaskSummaries.values().stream().mapToLong(DatasetStats::getRecordsWritten).sum(),
+          
datasetTaskSummaries.values().stream().mapToLong(DatasetStats::getBytesWritten).sum(),
           optFailure
       );
     } catch (Exception e) {
@@ -290,4 +303,13 @@ public class CommitActivityImpl implements CommitActivity {
   private static WorkUnitStream createEmptyWorkUnitStream() {
     return new BasicWorkUnitStream.Builder(Lists.newArrayList()).build();
   }
+
+  private List<DatasetTaskSummary> 
convertDatasetStatsToTaskSummaries(Map<String, DatasetStats> datasetStats) {
+    List<DatasetTaskSummary> datasetTaskSummaries = Lists.newArrayList();
+    for (Map.Entry<String, DatasetStats> entry : datasetStats.entrySet()) {
+      datasetTaskSummaries.add(new DatasetTaskSummary(entry.getKey(), 
entry.getValue().getRecordsWritten(), entry.getValue().getBytesWritten(), 
entry.getValue().isSuccessfullyCommitted(), 
entry.getValue().getDataQualityCheckStatus()));
+    }
+    log.info("Converted dataset stats to task summaries: {}", 
datasetTaskSummaries);
+    return datasetTaskSummaries;
+  }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
index 3c150689d4..936fc828c4 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
@@ -17,8 +17,6 @@
 
 package org.apache.gobblin.temporal.ddm.work;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Optional;
 
 import lombok.AccessLevel;
@@ -41,11 +39,12 @@ import 
org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;
 @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 @RequiredArgsConstructor
 public class CommitStats {
-  @NonNull private Map<String, DatasetStats> datasetStats;
   @NonNull private int numCommittedWorkUnits;
+  @NonNull private long recordsWritten;
+  @NonNull private long bytesWritten;
   @NonNull private Optional<FailedDatasetUrnsException> optFailure;
 
   public static CommitStats createEmpty() {
-    return new CommitStats(new HashMap<>(), 0, Optional.empty());
+    return new CommitStats(0, 0, 0, Optional.empty());
   }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
index 89beebf153..a9cf4c1114 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.temporal.ddm.work;
 
-import java.util.Map;
 import lombok.AccessLevel;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -34,7 +33,8 @@ import lombok.Setter;
 public class ExecGobblinStats {
   @NonNull private int numWorkUnits;
   @NonNull private int numCommitted;
+  @NonNull private long recordsWritten;
+  @NonNull private long bytesWritten;
   @NonNull private String user;
-  @NonNull private Map<String, DatasetStats> stats;
 }
 
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
index 56f645fc49..50d587c4ef 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
@@ -17,9 +17,6 @@
 
 package org.apache.gobblin.temporal.ddm.workflow.impl;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 
 import io.temporal.failure.ApplicationFailure;
@@ -27,15 +24,11 @@ import io.temporal.workflow.Workflow;
 
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.runtime.DatasetTaskSummary;
 import org.apache.gobblin.temporal.ddm.activity.ActivityType;
 import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
 import org.apache.gobblin.temporal.ddm.work.CommitStats;
-import org.apache.gobblin.temporal.ddm.work.DatasetStats;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
-import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
 
 
 @Slf4j
@@ -45,12 +38,6 @@ public class CommitStepWorkflowImpl implements 
CommitStepWorkflow {
   public CommitStats commit(WUProcessingSpec workSpec, final Properties props) 
{
     final CommitActivity activityStub = 
Workflow.newActivityStub(CommitActivity.class, 
ActivityType.COMMIT.buildActivityOptions(props, true));
     CommitStats commitGobblinStats = activityStub.commit(workSpec);
-    if (!commitGobblinStats.getOptFailure().isPresent() || 
commitGobblinStats.getNumCommittedWorkUnits() > 0) {
-      TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.WithinWorkflowFactory(workSpec.getEventSubmitterContext(), 
props);
-      timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
-          .withMetadataAsJson(TimingEvent.DATASET_TASK_SUMMARIES, 
convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats()))
-          .submit();// emit job summary info on both full and partial commit 
(ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
-    }
     if (commitGobblinStats.getOptFailure().isPresent()) {
       throw ApplicationFailure.newNonRetryableFailureWithCause(
           String.format("Failed to commit dataset state for some dataset(s)"), 
commitGobblinStats.getOptFailure().get().getClass().toString(),
@@ -58,12 +45,4 @@ public class CommitStepWorkflowImpl implements 
CommitStepWorkflow {
     }
     return commitGobblinStats;
   }
-  private List<DatasetTaskSummary> 
convertDatasetStatsToTaskSummaries(Map<String, DatasetStats> datasetStats) {
-    List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
-    for (Map.Entry<String, DatasetStats> entry : datasetStats.entrySet()) {
-      datasetTaskSummaries.add(new DatasetTaskSummary(entry.getKey(), 
entry.getValue().getRecordsWritten(), entry.getValue().getBytesWritten(), 
entry.getValue().isSuccessfullyCommitted(), 
entry.getValue().getDataQualityCheckStatus()));
-    }
-    log.info("Converted dataset stats to task summaries: {}", 
datasetTaskSummaries);
-    return datasetTaskSummaries;
-  }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index f1b4444d3b..05fa2a8546 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -102,6 +102,8 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
       WorkUnitsSizeSummary wuSizeSummary = 
generateWorkUnitResult.getWorkUnitsSizeSummary();
       int numWUsGenerated = 
safelyCastNumConstituentWorkUnitsOrThrow(wuSizeSummary);
       int numWUsCommitted = 0;
+      long recordsWritten = 0;
+      long bytesWritten = 0;
       CommitStats commitStats = CommitStats.createEmpty();
       if (numWUsGenerated > 0) {
         TimeBudget timeBudget = 
calcWUProcTimeBudget(jobSuccessTimer.getStartTime(), wuSizeSummary, jobProps);
@@ -124,11 +126,13 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
         ProcessWorkUnitsWorkflow processWUsWorkflow = 
createProcessWorkUnitsWorkflow(jobProps);
         commitStats = processWUsWorkflow.process(wuSpec, temporalJobProps);
         numWUsCommitted = commitStats.getNumCommittedWorkUnits();
+        recordsWritten = commitStats.getRecordsWritten();
+        bytesWritten = commitStats.getBytesWritten();
       }
       jobSuccessTimer.stop();
       isSuccessful = true;
-      return new ExecGobblinStats(numWUsGenerated, numWUsCommitted, 
jobProps.getProperty(Help.USER_TO_PROXY_KEY),
-          commitStats.getDatasetStats());
+      return new ExecGobblinStats(numWUsGenerated, numWUsCommitted, 
recordsWritten, bytesWritten,
+          jobProps.getProperty(Help.USER_TO_PROXY_KEY));
     } catch (Exception e) {
       // Emit a failed GobblinTrackingEvent to record job failures
       timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit(); // 
update GaaS: `ExecutionStatus.FAILED`; `TimingEvent.JOB_END_TIME`
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImplTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImplTest.java
new file mode 100644
index 0000000000..63d6aa1dc1
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImplTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.runtime.DatasetTaskSummary;
+import org.apache.gobblin.temporal.ddm.work.DatasetStats;
+
+
+/**
+ * Tests for {@link CommitActivityImpl} to verify the job summary event 
emission
+ * and aggregate stats calculation.
+ */
+public class CommitActivityImplTest {
+
+  @Test
+  public void testConvertDatasetStatsToTaskSummaries() throws Exception {
+    CommitActivityImpl commitActivity = new CommitActivityImpl();
+
+    // Create test dataset stats
+    Map<String, DatasetStats> datasetStats = new HashMap<>();
+    datasetStats.put("dataset1", new DatasetStats(1000L, 2000L, true, 10, 
"PASS"));
+    datasetStats.put("dataset2", new DatasetStats(3000L, 4000L, true, 20, 
"PASS"));
+    datasetStats.put("dataset3", new DatasetStats(5000L, 6000L, false, 0, 
"FAIL"));
+
+    // Use reflection to call the private method
+    java.lang.reflect.Method method = 
CommitActivityImpl.class.getDeclaredMethod(
+        "convertDatasetStatsToTaskSummaries", Map.class);
+    method.setAccessible(true);
+
+    @SuppressWarnings("unchecked")
+    List<DatasetTaskSummary> result = (List<DatasetTaskSummary>) 
method.invoke(commitActivity, datasetStats);
+
+    // Verify the conversion
+    Assert.assertEquals(3, result.size());
+
+    // Verify each dataset was converted correctly
+    Map<String, DatasetTaskSummary> summaryMap = new HashMap<>();
+    for (DatasetTaskSummary summary : result) {
+      summaryMap.put(summary.getDatasetUrn(), summary);
+    }
+
+    Assert.assertTrue(summaryMap.containsKey("dataset1"));
+    Assert.assertEquals(1000L, summaryMap.get("dataset1").getRecordsWritten());
+    Assert.assertEquals(2000L, summaryMap.get("dataset1").getBytesWritten());
+    Assert.assertTrue(summaryMap.get("dataset1").isSuccessfullyCommitted());
+    Assert.assertEquals("PASS", 
summaryMap.get("dataset1").getDataQualityStatus());
+
+    Assert.assertTrue(summaryMap.containsKey("dataset2"));
+    Assert.assertEquals(3000L, summaryMap.get("dataset2").getRecordsWritten());
+    Assert.assertEquals(4000L, summaryMap.get("dataset2").getBytesWritten());
+    Assert.assertTrue(summaryMap.get("dataset2").isSuccessfullyCommitted());
+
+    Assert.assertTrue(summaryMap.containsKey("dataset3"));
+    Assert.assertEquals(5000L, summaryMap.get("dataset3").getRecordsWritten());
+    Assert.assertEquals(6000L, summaryMap.get("dataset3").getBytesWritten());
+    Assert.assertFalse(summaryMap.get("dataset3").isSuccessfullyCommitted());
+    Assert.assertEquals("FAIL", 
summaryMap.get("dataset3").getDataQualityStatus());
+  }
+
+  @Test
+  public void testConvertDatasetStatsToTaskSummariesWithEmptyMap() throws 
Exception {
+    CommitActivityImpl commitActivity = new CommitActivityImpl();
+
+    Map<String, DatasetStats> emptyDatasetStats = new HashMap<>();
+
+    // Use reflection to call the private method
+    java.lang.reflect.Method method = 
CommitActivityImpl.class.getDeclaredMethod(
+        "convertDatasetStatsToTaskSummaries", Map.class);
+    method.setAccessible(true);
+
+    @SuppressWarnings("unchecked")
+    List<DatasetTaskSummary> result = (List<DatasetTaskSummary>) 
method.invoke(commitActivity, emptyDatasetStats);
+
+    Assert.assertNotNull(result);
+    Assert.assertEquals(0, result.size());
+  }
+
+  @Test
+  public void testConvertDatasetStatsToTaskSummariesPreservesAllFields() 
throws Exception {
+    CommitActivityImpl commitActivity = new CommitActivityImpl();
+
+    // Create test dataset stats with specific values
+    Map<String, DatasetStats> datasetStats = new HashMap<>();
+    String datasetUrn = "testDataset";
+    long recordsWritten = 123456L;
+    long bytesWritten = 789012L;
+    boolean successfullyCommitted = true;
+    int numCommittedWorkunits = 42;
+    String dataQualityStatus = "PASS";
+
+    datasetStats.put(datasetUrn, new DatasetStats(
+        recordsWritten, bytesWritten, successfullyCommitted, 
numCommittedWorkunits, dataQualityStatus));
+
+    // Use reflection to call the private method
+    java.lang.reflect.Method method = 
CommitActivityImpl.class.getDeclaredMethod(
+        "convertDatasetStatsToTaskSummaries", Map.class);
+    method.setAccessible(true);
+    
+    @SuppressWarnings("unchecked")
+    List<DatasetTaskSummary> result = (List<DatasetTaskSummary>) 
method.invoke(commitActivity, datasetStats);
+
+    Assert.assertEquals(1, result.size());
+    DatasetTaskSummary summary = result.get(0);
+    
+    // Verify all fields are preserved
+    Assert.assertEquals(datasetUrn, summary.getDatasetUrn());
+    Assert.assertEquals(recordsWritten, summary.getRecordsWritten());
+    Assert.assertEquals(bytesWritten, summary.getBytesWritten());
+    Assert.assertEquals(successfullyCommitted, 
summary.isSuccessfullyCommitted());
+    Assert.assertEquals(dataQualityStatus, summary.getDataQualityStatus());
+  }
+
+  @Test
+  public void testAggregateStatsCalculation() {
+    // This test verifies the aggregate calculation logic used in 
CommitActivityImpl
+    Map<String, DatasetStats> datasetStats = new HashMap<>();
+    datasetStats.put("dataset1", new DatasetStats(1000L, 2000L, true, 10, 
"PASS"));
+    datasetStats.put("dataset2", new DatasetStats(3000L, 4000L, true, 20, 
"PASS"));
+    datasetStats.put("dataset3", new DatasetStats(5000L, 6000L, true, 30, 
"PASS"));
+
+    // Calculate aggregates as done in CommitActivityImpl
+    int totalCommittedWorkUnits = datasetStats.values().stream()
+        .mapToInt(DatasetStats::getNumCommittedWorkunits)
+        .sum();
+    long totalRecordsWritten = datasetStats.values().stream()
+        .mapToLong(DatasetStats::getRecordsWritten)
+        .sum();
+    long totalBytesWritten = datasetStats.values().stream()
+        .mapToLong(DatasetStats::getBytesWritten)
+        .sum();
+
+    Assert.assertEquals(60, totalCommittedWorkUnits);
+    Assert.assertEquals(9000L, totalRecordsWritten);
+    Assert.assertEquals(12000L, totalBytesWritten);
+  }
+}
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/CommitStatsTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/CommitStatsTest.java
new file mode 100644
index 0000000000..0040637dab
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/CommitStatsTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.util.Optional;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;
+
+
+/**
+ * Tests for {@link CommitStats} to verify the structure no longer contains 
large dataset maps
+ * and only contains aggregate metrics to avoid Temporal payload size limits.
+ */
+public class CommitStatsTest {
+
+  @Test
+  public void testCommitStatsWithAggregateValues() {
+    int numCommittedWorkUnits = 100;
+    long recordsWritten = 50000L;
+    long bytesWritten = 1024000L;
+    Optional<FailedDatasetUrnsException> optFailure = Optional.empty();
+
+    CommitStats stats = new CommitStats(numCommittedWorkUnits, recordsWritten, 
bytesWritten, optFailure);
+
+    Assert.assertEquals(numCommittedWorkUnits, 
stats.getNumCommittedWorkUnits());
+    Assert.assertEquals(recordsWritten, stats.getRecordsWritten());
+    Assert.assertEquals(bytesWritten, stats.getBytesWritten());
+    Assert.assertFalse(stats.getOptFailure().isPresent());
+  }
+
+  @Test
+  public void testCommitStatsWithFailure() {
+    int numCommittedWorkUnits = 50;
+    long recordsWritten = 25000L;
+    long bytesWritten = 512000L;
+    FailedDatasetUrnsException failure = new 
FailedDatasetUrnsException(java.util.Collections.singleton("dataset1"));
+    Optional<FailedDatasetUrnsException> optFailure = Optional.of(failure);
+
+    CommitStats stats = new CommitStats(numCommittedWorkUnits, recordsWritten, 
bytesWritten, optFailure);
+
+    Assert.assertEquals(numCommittedWorkUnits, 
stats.getNumCommittedWorkUnits());
+    Assert.assertEquals(recordsWritten, stats.getRecordsWritten());
+    Assert.assertEquals(bytesWritten, stats.getBytesWritten());
+    Assert.assertTrue(stats.getOptFailure().isPresent());
+    Assert.assertEquals(failure, stats.getOptFailure().get());
+  }
+
+  @Test
+  public void testCreateEmpty() {
+    CommitStats emptyStats = CommitStats.createEmpty();
+
+    Assert.assertEquals(0, emptyStats.getNumCommittedWorkUnits());
+    Assert.assertEquals(0L, emptyStats.getRecordsWritten());
+    Assert.assertEquals(0L, emptyStats.getBytesWritten());
+    Assert.assertFalse(emptyStats.getOptFailure().isPresent());
+  }
+
+  @Test
+  public void testCommitStatsDoesNotContainDatasetStatsMap() {
+    // Verify that CommitStats no longer has a datasetStats field
+    // This is important to avoid exceeding Temporal's 2MB payload size limit
+    CommitStats stats = new CommitStats(10, 1000L, 2000L, Optional.empty());
+    
+    // Verify only the expected fields exist
+    Assert.assertEquals(10, stats.getNumCommittedWorkUnits());
+    Assert.assertEquals(1000L, stats.getRecordsWritten());
+    Assert.assertEquals(2000L, stats.getBytesWritten());
+    Assert.assertFalse(stats.getOptFailure().isPresent());
+  }
+
+  @Test
+  public void testCommitStatsWithZeroValues() {
+    CommitStats stats = new CommitStats(0, 0L, 0L, Optional.empty());
+
+    Assert.assertEquals(0, stats.getNumCommittedWorkUnits());
+    Assert.assertEquals(0L, stats.getRecordsWritten());
+    Assert.assertEquals(0L, stats.getBytesWritten());
+    Assert.assertFalse(stats.getOptFailure().isPresent());
+  }
+
+  @Test
+  public void testCommitStatsWithLargeValues() {
+    // Test with large values to ensure no overflow issues
+    int numCommittedWorkUnits = Integer.MAX_VALUE;
+    long recordsWritten = Long.MAX_VALUE / 2;
+    long bytesWritten = Long.MAX_VALUE / 2;
+
+    CommitStats stats = new CommitStats(numCommittedWorkUnits, recordsWritten, 
bytesWritten, Optional.empty());
+
+    Assert.assertEquals(numCommittedWorkUnits, 
stats.getNumCommittedWorkUnits());
+    Assert.assertEquals(recordsWritten, stats.getRecordsWritten());
+    Assert.assertEquals(bytesWritten, stats.getBytesWritten());
+  }
+}
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStatsTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStatsTest.java
new file mode 100644
index 0000000000..09895dedf5
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStatsTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link ExecGobblinStats} to verify the structure no longer 
contains large dataset maps
+ * and only contains aggregate metrics to avoid Temporal payload size limits.
+ */
+public class ExecGobblinStatsTest {
+
+  @Test
+  public void testExecGobblinStatsWithAggregateValues() {
+    int numWorkUnits = 200;
+    int numCommitted = 195;
+    long recordsWritten = 100000L;
+    long bytesWritten = 2048000L;
+    String user = "testUser";
+
+    ExecGobblinStats stats = new ExecGobblinStats(numWorkUnits, numCommitted, 
recordsWritten, bytesWritten, user);
+
+    Assert.assertEquals(numWorkUnits, stats.getNumWorkUnits());
+    Assert.assertEquals(numCommitted, stats.getNumCommitted());
+    Assert.assertEquals(recordsWritten, stats.getRecordsWritten());
+    Assert.assertEquals(bytesWritten, stats.getBytesWritten());
+    Assert.assertEquals(user, stats.getUser());
+  }
+
+  @Test
+  public void testExecGobblinStatsDoesNotContainDatasetStatsMap() {
+    // Verify that ExecGobblinStats no longer has a stats (Map<String, 
DatasetStats>) field
+    // This is important to avoid exceeding Temporal's 2MB payload size limit
+    ExecGobblinStats stats = new ExecGobblinStats(10, 10, 1000L, 2000L, 
"user1");
+    
+    // Verify only the expected fields exist
+    Assert.assertEquals(10, stats.getNumWorkUnits());
+    Assert.assertEquals(10, stats.getNumCommitted());
+    Assert.assertEquals(1000L, stats.getRecordsWritten());
+    Assert.assertEquals(2000L, stats.getBytesWritten());
+    Assert.assertEquals("user1", stats.getUser());
+  }
+
+  @Test
+  public void testExecGobblinStatsWithPartialCommit() {
+    // Test scenario where not all work units were committed
+    int numWorkUnits = 100;
+    int numCommitted = 75;
+    long recordsWritten = 50000L;
+    long bytesWritten = 1024000L;
+    String user = "partialUser";
+
+    ExecGobblinStats stats = new ExecGobblinStats(numWorkUnits, numCommitted, 
recordsWritten, bytesWritten, user);
+
+    Assert.assertEquals(numWorkUnits, stats.getNumWorkUnits());
+    Assert.assertEquals(numCommitted, stats.getNumCommitted());
+    Assert.assertTrue(stats.getNumCommitted() < stats.getNumWorkUnits());
+    Assert.assertEquals(recordsWritten, stats.getRecordsWritten());
+    Assert.assertEquals(bytesWritten, stats.getBytesWritten());
+  }
+
+  @Test
+  public void testExecGobblinStatsWithZeroValues() {
+    ExecGobblinStats stats = new ExecGobblinStats(0, 0, 0L, 0L, "emptyUser");
+
+    Assert.assertEquals(0, stats.getNumWorkUnits());
+    Assert.assertEquals(0, stats.getNumCommitted());
+    Assert.assertEquals(0L, stats.getRecordsWritten());
+    Assert.assertEquals(0L, stats.getBytesWritten());
+    Assert.assertEquals("emptyUser", stats.getUser());
+  }
+
+  @Test
+  public void testExecGobblinStatsWithLargeValues() {
+    // Test with large values to ensure no overflow issues
+    int numWorkUnits = Integer.MAX_VALUE;
+    int numCommitted = Integer.MAX_VALUE - 1;
+    long recordsWritten = Long.MAX_VALUE / 2;
+    long bytesWritten = Long.MAX_VALUE / 2;
+    String user = "largeUser";
+
+    ExecGobblinStats stats = new ExecGobblinStats(numWorkUnits, numCommitted, 
recordsWritten, bytesWritten, user);
+
+    Assert.assertEquals(numWorkUnits, stats.getNumWorkUnits());
+    Assert.assertEquals(numCommitted, stats.getNumCommitted());
+    Assert.assertEquals(recordsWritten, stats.getRecordsWritten());
+    Assert.assertEquals(bytesWritten, stats.getBytesWritten());
+    Assert.assertEquals(user, stats.getUser());
+  }
+
+  @Test
+  public void testExecGobblinStatsConstructorFieldOrder() {
+    // Verify the constructor field order matches the new structure
+    ExecGobblinStats stats = new ExecGobblinStats(100, 95, 50000L, 1024000L, 
"orderUser");
+
+    // Verify all fields are set correctly in the expected order
+    Assert.assertEquals(100, stats.getNumWorkUnits());
+    Assert.assertEquals(95, stats.getNumCommitted());
+    Assert.assertEquals(50000L, stats.getRecordsWritten());
+    Assert.assertEquals(1024000L, stats.getBytesWritten());
+    Assert.assertEquals("orderUser", stats.getUser());
+  }
+}
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImplTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImplTest.java
new file mode 100644
index 0000000000..5549749bcb
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImplTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow.impl;
+
+import java.lang.reflect.Method;
+import java.util.Optional;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
+
+
+/**
+ * Tests for {@link CommitStepWorkflowImpl} to verify that job summary event 
emission
+ * has been moved to CommitActivity to avoid Temporal payload size limits.
+ */
+public class CommitStepWorkflowImplTest {
+
+  @Test
+  public void testCommitStepWorkflowDoesNotConvertDatasetStats() throws 
Exception {
+    // Verify that CommitStepWorkflowImpl no longer has the 
convertDatasetStatsToTaskSummaries method
+    // This method should only exist in CommitActivityImpl now
+
+    CommitStepWorkflowImpl workflow = new CommitStepWorkflowImpl();
+    Method[] methods = CommitStepWorkflowImpl.class.getDeclaredMethods();
+
+    boolean hasConvertMethod = false;
+    for (Method method : methods) {
+      if (method.getName().equals("convertDatasetStatsToTaskSummaries")) {
+        hasConvertMethod = true;
+        break;
+      }
+    }
+
+    Assert.assertFalse(hasConvertMethod,
+        "CommitStepWorkflowImpl should not have 
convertDatasetStatsToTaskSummaries method. " +
+        "Job summary emission should be done in CommitActivity to avoid 
payload size issues.");
+  }
+
+  @Test
+  public void testCommitStepWorkflowSimplifiedStructure() {
+    // Verify that CommitStepWorkflowImpl has a simplified structure
+    // It should only have the commit method and no job summary emission logic
+
+    CommitStepWorkflowImpl workflow = new CommitStepWorkflowImpl();
+    Method[] methods = CommitStepWorkflowImpl.class.getDeclaredMethods();
+
+    // Count non-inherited methods (excluding synthetic methods)
+    int methodCount = 0;
+    for (Method method : methods) {
+      if (!method.isSynthetic()) {
+        methodCount++;
+      }
+    }
+
+    // Should only have the commit method
+    Assert.assertTrue(methodCount <= 1,
+        "CommitStepWorkflowImpl should have minimal methods. Found: " + 
methodCount);
+  }
+
+  @Test
+  public void testCommitStatsStructureForPayloadSizeOptimization() {
+    // Verify that CommitStats returned by the workflow has aggregate values 
only
+    // This ensures the payload size stays under Temporal's 2MB limit
+
+    CommitStats stats = new CommitStats(100, 50000L, 1024000L, 
Optional.empty());
+
+    // Verify it has aggregate values
+    Assert.assertEquals(100, stats.getNumCommittedWorkUnits());
+    Assert.assertEquals(50000L, stats.getRecordsWritten());
+    Assert.assertEquals(1024000L, stats.getBytesWritten());
+
+    // Verify the object is small (should be under 1KB for aggregate values)
+    // This is a proxy test to ensure we're not serializing large maps
+    String statsString = stats.toString();
+    Assert.assertTrue(statsString.length() < 1000,
+        "CommitStats toString should be small. Length: " + 
statsString.length());
+  }
+}

Reply via email to