This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a00b57cda [GOBBLIN-1836] Ensuring Task Reliability: Handling Job Cancellation and Graceful Exits for Error-Free Completion (#3699)
a00b57cda is described below

commit a00b57cda36acf9e30f9856bdc69383528f70486
Author: Zihan Li <[email protected]>
AuthorDate: Fri Jun 2 13:53:58 2023 -0700

    [GOBBLIN-1836] Ensuring Task Reliability: Handling Job Cancellation and Graceful Exits for Error-Free Completion (#3699)
    
    * address comments
    
    * use a ConnectionManager when the HttpClient is not closeable
    
    * [GOBBLIN-1836] Ensuring Task Reliability: Handling Job Cancellation and Graceful Exits for Error-Free Completion
    
    * add unit test
    
    ---------
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../apache/gobblin/cluster/GobblinHelixTask.java   | 10 ++++++++++
 .../gobblin/cluster/GobblinHelixTaskTest.java      | 23 ++++++++++++++++++++++
 .../iceberg/writer/IcebergMetadataWriter.java      |  2 ++
 3 files changed, 35 insertions(+)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index 11a60959a..3e7d2707e 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -90,6 +90,7 @@ public class GobblinHelixTask implements Task {
   private SingleTask task;
   private String helixTaskId;
   private EventBus eventBus;
+  private boolean isCanceled;
 
   public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder,
                           TaskCallbackContext taskCallbackContext,
@@ -161,12 +162,20 @@ public class GobblinHelixTask implements Task {
   @Override
   public TaskResult run() {
     this.taskMetrics.helixTaskTotalRunning.incrementAndGet();
+    this.isCanceled = false;
     long startTime = System.currentTimeMillis();
     log.info("Actual task {} started. [{} {}]", this.taskId, 
this.applicationName, this.instanceName);
     try (Closer closer = Closer.create()) {
       closer.register(MDC.putCloseable(ConfigurationKeys.JOB_NAME_KEY, 
this.jobName));
       closer.register(MDC.putCloseable(ConfigurationKeys.JOB_KEY_KEY, 
this.jobKey));
       this.task.run();
+      // Since we enable gracefully cancel, when task get cancelled, we might 
not see any exception,
+      // so we check the isCanceled flag to make sure we return the correct 
task status
+      if (this.isCanceled) {
+        log.error("Actual task {} canceled.", this.taskId);
+        this.taskMetrics.helixTaskTotalCancelled.incrementAndGet();
+        return new TaskResult(TaskResult.Status.CANCELED, "");
+      }
       log.info("Actual task {} completed.", this.taskId);
       this.taskMetrics.helixTaskTotalCompleted.incrementAndGet();
       return new TaskResult(TaskResult.Status.COMPLETED, "");
@@ -219,6 +228,7 @@ public class GobblinHelixTask implements Task {
     log.info("Gobblin helix task cancellation invoked for jobId {}.", jobId);
     if (this.task != null ) {
       try {
+        this.isCanceled = true;
         this.task.cancel();
         log.info("Gobblin helix task cancellation completed for jobId {}.", 
jobId);
       } catch (Throwable t) {
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
index 8cdbbef75..e7a8295dc 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
@@ -94,6 +94,8 @@ public class GobblinHelixTaskTest {
 
   private GobblinHelixTask gobblinHelixTask;
 
+  private GobblinHelixTask gobblinHelixTaskForCancel;
+
   private HelixManager helixManager;
 
   private FileSystem localFs;
@@ -194,6 +196,8 @@ public class GobblinHelixTaskTest {
     // Expect to go through.
     this.gobblinHelixTask = (GobblinHelixTask) 
gobblinHelixTaskFactory.createNewTask(taskCallbackContext);
 
+    this.gobblinHelixTaskForCancel = (GobblinHelixTask) 
gobblinHelixTaskFactory.createNewTask(taskCallbackContext);
+
     // Mock the method getFs() which get called in SingleTask constructor, so 
that SingleTask could fail and trigger retry,
     // which would also fail eventually with timeout.
     TaskRunnerSuiteBase.Builder builderSpy = Mockito.spy(builder);
@@ -256,6 +260,25 @@ public class GobblinHelixTaskTest {
     TestHelper.assertGenericRecords(outputAvroFile, schema);
   }
 
+  @Test(dependsOnMethods = "testRun")
+  public void testCancel() throws IOException, InterruptedException {
+
+    final TaskResult[] taskResult = new TaskResult[1];
+    Thread thread = new Thread(){
+      @Override
+      public void run() {
+        taskResult[0] = gobblinHelixTaskForCancel.run();
+      }
+    };
+    thread.start();
+    Thread.sleep(3);
+    gobblinHelixTaskForCancel.cancel();
+    thread.join();
+    System.out.println(taskResult[0].getInfo());
+    //We can see task failure or task cancelled as task status
+    Assert.assertNotEquals(taskResult[0].getStatus(), 
TaskResult.Status.COMPLETED);
+  }
+
   @AfterClass
   public void tearDown() throws IOException {
     try {
diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 4cfb0bea0..9c3d30e9e 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -521,6 +521,8 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
     try (Timer.Context context = 
metricContext.timer(CREATE_TABLE_TIME).time()) {
       icebergTable =
           catalog.createTable(tid, tableSchema, partitionSpec, tableLocation, 
IcebergUtils.getTableProperties(table));
+      // We should set the avro schema literal when creating the table.
+      
icebergTable.updateProperties().set(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
 schema).commit();
       log.info("Created table {}, schema: {} partition spec: {}", tid, 
tableSchema, partitionSpec);
     } catch (AlreadyExistsException e) {
       log.warn("table {} already exist, there may be some other process try to 
create table concurrently", tid);

Reply via email to