This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a00b57cda [GOBBLIN-1836] Ensuring Task Reliability: Handling Job
Cancellation and Graceful Exits for Error-Free Completion (#3699)
a00b57cda is described below
commit a00b57cda36acf9e30f9856bdc69383528f70486
Author: Zihan Li <[email protected]>
AuthorDate: Fri Jun 2 13:53:58 2023 -0700
[GOBBLIN-1836] Ensuring Task Reliability: Handling Job Cancellation and
Graceful Exits for Error-Free Completion (#3699)
* address comments
* use connectionmanager when httpclient is not closeable
* [GOBBLIN-1836] Ensuring Task Reliability: Handling Job Cancellation and
Graceful Exits for Error-Free Completion
* add unit test
---------
Co-authored-by: Zihan Li <[email protected]>
---
.../apache/gobblin/cluster/GobblinHelixTask.java | 10 ++++++++++
.../gobblin/cluster/GobblinHelixTaskTest.java | 23 ++++++++++++++++++++++
.../iceberg/writer/IcebergMetadataWriter.java | 2 ++
3 files changed, 35 insertions(+)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index 11a60959a..3e7d2707e 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -90,6 +90,7 @@ public class GobblinHelixTask implements Task {
private SingleTask task;
private String helixTaskId;
private EventBus eventBus;
+ private boolean isCanceled;
public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder,
TaskCallbackContext taskCallbackContext,
@@ -161,12 +162,20 @@ public class GobblinHelixTask implements Task {
@Override
public TaskResult run() {
this.taskMetrics.helixTaskTotalRunning.incrementAndGet();
+ this.isCanceled = false;
long startTime = System.currentTimeMillis();
log.info("Actual task {} started. [{} {}]", this.taskId,
this.applicationName, this.instanceName);
try (Closer closer = Closer.create()) {
closer.register(MDC.putCloseable(ConfigurationKeys.JOB_NAME_KEY,
this.jobName));
closer.register(MDC.putCloseable(ConfigurationKeys.JOB_KEY_KEY,
this.jobKey));
this.task.run();
+ // Since we enable gracefully cancel, when task get cancelled, we might not see any exception,
+ // so we check the isCanceled flag to make sure we return the correct task status
+ if (this.isCanceled) {
+ log.error("Actual task {} canceled.", this.taskId);
+ this.taskMetrics.helixTaskTotalCancelled.incrementAndGet();
+ return new TaskResult(TaskResult.Status.CANCELED, "");
+ }
log.info("Actual task {} completed.", this.taskId);
this.taskMetrics.helixTaskTotalCompleted.incrementAndGet();
return new TaskResult(TaskResult.Status.COMPLETED, "");
@@ -219,6 +228,7 @@ public class GobblinHelixTask implements Task {
log.info("Gobblin helix task cancellation invoked for jobId {}.", jobId);
if (this.task != null ) {
try {
+ this.isCanceled = true;
this.task.cancel();
log.info("Gobblin helix task cancellation completed for jobId {}.",
jobId);
} catch (Throwable t) {
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
index 8cdbbef75..e7a8295dc 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
@@ -94,6 +94,8 @@ public class GobblinHelixTaskTest {
private GobblinHelixTask gobblinHelixTask;
+ private GobblinHelixTask gobblinHelixTaskForCancel;
+
private HelixManager helixManager;
private FileSystem localFs;
@@ -194,6 +196,8 @@ public class GobblinHelixTaskTest {
// Expect to go through.
this.gobblinHelixTask = (GobblinHelixTask)
gobblinHelixTaskFactory.createNewTask(taskCallbackContext);
+ this.gobblinHelixTaskForCancel = (GobblinHelixTask)
gobblinHelixTaskFactory.createNewTask(taskCallbackContext);
+
// Mock the method getFs() which get called in SingleTask constructor, so
that SingleTask could fail and trigger retry,
// which would also fail eventually with timeout.
TaskRunnerSuiteBase.Builder builderSpy = Mockito.spy(builder);
@@ -256,6 +260,25 @@ public class GobblinHelixTaskTest {
TestHelper.assertGenericRecords(outputAvroFile, schema);
}
+ @Test(dependsOnMethods = "testRun")
+ public void testCancel() throws IOException, InterruptedException {
+
+ final TaskResult[] taskResult = new TaskResult[1];
+ Thread thread = new Thread(){
+ @Override
+ public void run() {
+ taskResult[0] = gobblinHelixTaskForCancel.run();
+ }
+ };
+ thread.start();
+ Thread.sleep(3);
+ gobblinHelixTaskForCancel.cancel();
+ thread.join();
+ System.out.println(taskResult[0].getInfo());
+ //We can see task failure or task cancelled as task status
+ Assert.assertNotEquals(taskResult[0].getStatus(),
TaskResult.Status.COMPLETED);
+ }
+
@AfterClass
public void tearDown() throws IOException {
try {
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 4cfb0bea0..9c3d30e9e 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -521,6 +521,8 @@ public class IcebergMetadataWriter implements
MetadataWriter {
try (Timer.Context context =
metricContext.timer(CREATE_TABLE_TIME).time()) {
icebergTable =
catalog.createTable(tid, tableSchema, partitionSpec, tableLocation,
IcebergUtils.getTableProperties(table));
+ // We should set the avro schema literal when creating the table.
+
icebergTable.updateProperties().set(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
schema).commit();
log.info("Created table {}, schema: {} partition spec: {}", tid,
tableSchema, partitionSpec);
} catch (AlreadyExistsException e) {
log.warn("table {} already exist, there may be some other process try to
create table concurrently", tid);