This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new 920a8af46a4 [HUDI-9041] Send commit ack event when reusing current 
instant (#12886)
920a8af46a4 is described below

commit 920a8af46a4d4182257cd2594ab50b8564258363
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri Feb 28 07:33:03 2025 +0800

    [HUDI-9041] Send commit ack event when reusing current instant (#12886)
---
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |  3 ++
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 24 +++++++++
 .../utils/BucketStreamWriteFunctionWrapper.java    |  6 +++
 .../hudi/sink/utils/InsertFunctionWrapper.java     | 15 ++++++
 .../apache/hudi/sink/utils/MockSubtaskGateway.java | 57 ++++++++++++++++++++++
 .../sink/utils/StreamWriteFunctionWrapper.java     |  6 +++
 .../hudi/sink/utils/TestFunctionWrapper.java       | 15 +++++-
 .../org/apache/hudi/sink/utils/TestWriteBase.java  | 13 +++++
 .../apache/hudi/adapter/ExecutionAttemptUtil.java  | 31 ++++++++++++
 .../apache/hudi/adapter/ExecutionAttemptUtil.java  | 31 ++++++++++++
 .../apache/hudi/adapter/ExecutionAttemptUtil.java  | 31 ++++++++++++
 .../apache/hudi/adapter/ExecutionAttemptUtil.java  | 31 ++++++++++++
 .../apache/hudi/adapter/ExecutionAttemptUtil.java  | 31 ++++++++++++
 13 files changed, 293 insertions(+), 1 deletion(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index e96e4f6524f..13b9fd5247a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -445,6 +445,9 @@ public class StreamWriteOperatorCoordinator
         LOG.warn("Reuse current pending Instant {} with {} operationType, "
                 + "ignoring empty bootstrap event.", this.instant, 
WriteOperationType.INSERT.value());
         reset();
+
+        // send commit act event to unblock write tasks
+        sendCommitAckEvents(-1L);
         return;
       }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 83ca930b61d..bd6da22965d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -144,6 +144,30 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
         .end();
   }
 
+  @Test
+  public void testAppendInsertAfterFailoverWithEmptyCheckpoint() throws 
Exception {
+    // open the function and ingest data
+    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 10_000L);
+    conf.setString(FlinkOptions.OPERATION, "INSERT");
+    preparePipeline()
+        .assertEmptyDataFiles()
+        // make an empty snapshot
+        .checkpoint(1)
+        .assertEmptyEvent()
+        // trigger a partial failure
+        .subTaskFails(0, 1)
+        .assertNextEvent()
+        // make sure coordinator send an ack event to unblock the writers.
+        .assertNextSubTaskEvent()
+        // write a set of data and check the result.
+        .consume(TestData.DATA_SET_INSERT)
+        .checkpoint(2)
+        .assertNextEvent()
+        .checkpointComplete(2)
+        .checkWrittenData(EXPECTED1)
+        .end();
+  }
+
   // Only when Job level fails with INSERT operationType can we roll back the 
unfinished instant.
   // Task level failed retry, we should reuse the unfinished Instant with 
INSERT operationType
   @Test
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
index df648df9ac0..acd0375dbb3 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
@@ -24,6 +24,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.StreamWriteFunction;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.bucket.BucketStreamWriteFunction;
+import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
 import org.apache.hudi.util.AvroSchemaConverter;
@@ -182,6 +183,11 @@ public class BucketStreamWriteFunctionWrapper<I> 
implements TestFunctionWrapper<
     return coordinator;
   }
 
+  @Override
+  public AbstractWriteFunction getWriteFunction() {
+    return this.writeFunction;
+  }
+
   public MockOperatorCoordinatorContext getCoordinatorContext() {
     return coordinatorContext;
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
index 15634cc6e72..4fd8df1fe75 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
@@ -23,6 +23,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.append.AppendWriteFunction;
 import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
@@ -58,6 +59,7 @@ public class InsertFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
 
   private final MockStreamingRuntimeContext runtimeContext;
   private final MockOperatorEventGateway gateway;
+  private final MockSubtaskGateway subtaskGateway;
   private final MockOperatorCoordinatorContext coordinatorContext;
   private StreamWriteOperatorCoordinator coordinator;
   private final MockStateInitializationContext stateInitializationContext;
@@ -79,6 +81,7 @@ public class InsertFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
         .build();
     this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, 
environment);
     this.gateway = new MockOperatorEventGateway();
+    this.subtaskGateway = new MockSubtaskGateway();
     this.conf = conf;
     this.rowType = (RowType) 
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
     // one function
@@ -121,6 +124,11 @@ public class InsertFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     return this.gateway.getNextEvent();
   }
 
+  @Override
+  public OperatorEvent getNextSubTaskEvent() {
+    return this.subtaskGateway.getNextEvent();
+  }
+
   public void checkpointFunction(long checkpointId) throws Exception {
     // checkpoint the coordinator first
     this.coordinator.checkpointCoordinator(checkpointId, new 
CompletableFuture<>());
@@ -174,6 +182,11 @@ public class InsertFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     return coordinator;
   }
 
+  @Override
+  public AbstractWriteFunction getWriteFunction() {
+    return this.writeFunction;
+  }
+
   @Override
   public void close() throws Exception {
     this.coordinator.close();
@@ -196,5 +209,7 @@ public class InsertFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     writeFunction.setOperatorEventGateway(gateway);
     writeFunction.initializeState(this.stateInitializationContext);
     writeFunction.open(conf);
+    // set up subtask gateway
+    coordinator.subtaskReady(0, subtaskGateway);
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockSubtaskGateway.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockSubtaskGateway.java
new file mode 100644
index 00000000000..da4a31b4fb8
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockSubtaskGateway.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.adapter.ExecutionAttemptUtil;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+
+import java.util.LinkedList;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A mock {@link OperatorCoordinator.SubtaskGateway} for unit tests.
+ */
+public class MockSubtaskGateway implements OperatorCoordinator.SubtaskGateway {
+
+  private final LinkedList<OperatorEvent> events = new LinkedList<>();
+
+  @Override
+  public CompletableFuture<Acknowledge> sendEvent(OperatorEvent operatorEvent) 
{
+    events.add(operatorEvent);
+    return CompletableFuture.completedFuture(Acknowledge.get());
+  }
+
+  @Override
+  public ExecutionAttemptID getExecution() {
+    return ExecutionAttemptUtil.randomId();
+  }
+
+  @Override
+  public int getSubtask() {
+    return 0;
+  }
+
+  public OperatorEvent getNextEvent() {
+    return this.events.isEmpty() ? null : this.events.removeFirst();
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index c65e42f1521..9ef775d14b6 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -26,6 +26,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.StreamWriteFunction;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.bootstrap.BootstrapOperator;
+import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
@@ -259,6 +260,11 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     return coordinator;
   }
 
+  @Override
+  public AbstractWriteFunction getWriteFunction() {
+    return this.writeFunction;
+  }
+
   public MockOperatorCoordinatorContext getCoordinatorContext() {
     return coordinatorContext;
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
index faee168bf25..85ac461daf3 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink.utils;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 
 import 
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
@@ -49,10 +50,17 @@ public interface TestFunctionWrapper<I> {
   WriteMetadataEvent[] getEventBuffer();
 
   /**
-   * Returns the next event.
+   * Returns the next event sent to Coordinator.
    */
   OperatorEvent getNextEvent();
 
+  /**
+   * Returns the next event sent to subtask.
+   */
+  default OperatorEvent getNextSubTaskEvent() {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * Snapshot all the functions in the wrapper.
    */
@@ -95,6 +103,11 @@ public interface TestFunctionWrapper<I> {
    */
   StreamWriteOperatorCoordinator getCoordinator();
 
+  /**
+   * Returns the write function.
+   */
+  AbstractWriteFunction getWriteFunction();
+
   /**
    * Returns the data buffer of the write task.
    */
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index a0d769c9983..76ad7dcba5c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.event.CommitAckEvent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.meta.CkpMetadata;
 import org.apache.hudi.storage.HoodieStorage;
@@ -172,6 +173,18 @@ public class TestWriteBase {
       return this;
     }
 
+    /**
+     * Assert the next event exists and handle over it to the coordinator.
+     */
+    public TestHarness assertNextSubTaskEvent() {
+      final OperatorEvent nextEvent = this.pipeline.getNextSubTaskEvent();
+      if (nextEvent != null) {
+        MatcherAssert.assertThat("The Coordinator expect to send an event", 
nextEvent, instanceOf(CommitAckEvent.class));
+        this.pipeline.getWriteFunction().handleOperatorEvent(nextEvent);
+      }
+      return this;
+    }
+
     /**
      * Assert the next event exists and handle over it to the coordinator.
      */
diff --git 
a/hudi-flink-datasource/hudi-flink1.14.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
 
b/hudi-flink-datasource/hudi-flink1.14.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
new file mode 100644
index 00000000000..19556dc5e48
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.14.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * Adapter utils for execution attempt.
+ */
+public class ExecutionAttemptUtil {
+
+  public static ExecutionAttemptID randomId() {
+    return new ExecutionAttemptID();
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.15.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
 
b/hudi-flink-datasource/hudi-flink1.15.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
new file mode 100644
index 00000000000..19556dc5e48
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.15.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * Adapter utils for execution attempt.
+ */
+public class ExecutionAttemptUtil {
+
+  public static ExecutionAttemptID randomId() {
+    return new ExecutionAttemptID();
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.16.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
 
b/hudi-flink-datasource/hudi-flink1.16.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
new file mode 100644
index 00000000000..866e1ca8c6a
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.16.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * Adapter utils for execution attempt.
+ */
+public class ExecutionAttemptUtil {
+
+  public static ExecutionAttemptID randomId() {
+    return ExecutionAttemptID.randomId();
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.17.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
 
b/hudi-flink-datasource/hudi-flink1.17.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
new file mode 100644
index 00000000000..866e1ca8c6a
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.17.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * Adapter utils for execution attempt.
+ */
+public class ExecutionAttemptUtil {
+
+  public static ExecutionAttemptID randomId() {
+    return ExecutionAttemptID.randomId();
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
 
b/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
new file mode 100644
index 00000000000..866e1ca8c6a
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * Adapter utils for execution attempt.
+ */
+public class ExecutionAttemptUtil {
+
+  public static ExecutionAttemptID randomId() {
+    return ExecutionAttemptID.randomId();
+  }
+}

Reply via email to